Jiayi-Liao commented on a change in pull request #856: URL: https://github.com/apache/incubator-iceberg/pull/856#discussion_r426222173
########## File path: flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergCommitter.java ########## @@ -0,0 +1,633 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector.sink; + +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.connector.IcebergConnectorConstant; +import org.apache.iceberg.flink.connector.model.CommitMetadata; +import org.apache.iceberg.flink.connector.model.CommitMetadataUtil; +import org.apache.iceberg.flink.connector.model.FlinkManifestFile; +import org.apache.iceberg.flink.connector.model.FlinkManifestFileUtil; +import org.apache.iceberg.flink.connector.model.GenericFlinkManifestFile; +import org.apache.iceberg.flink.connector.model.ManifestFileState; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hive.HiveCatalogs; +import org.apache.iceberg.io.FileIO; +import 
org.apache.iceberg.io.OutputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This operator commit data files to Iceberg table. + * <p> + * This operator should always run with parallelism of 1. + * Because Iceberg lib perform optimistic concurrency control, + * this can help reduce contention and retries + * when committing files to Iceberg table. + * <p> + */ +@SuppressWarnings("checkstyle:HiddenField") +public class IcebergCommitter extends RichSinkFunction<FlinkDataFile> + implements CheckpointedFunction, CheckpointListener { + private static final Logger LOG = LoggerFactory.getLogger(IcebergCommitter.class); + + private static final String COMMIT_MANIFEST_HASHES_KEY = "flink.commit.manifest.hashes"; + private static final String WATERMARK_PROP_KEY_PREFIX = "flink.watermark"; + + private Configuration config; + private final String namespace; + private final String tableName; + + private final boolean watermarkEnabled; + private final String watermarkPropKey; + private final long snapshotRetentionHours; + private final boolean commitRestoredManifestFiles; + private final String icebergManifestFileDir; + private final PartitionSpec spec; + private final FileIO io; + private final String flinkJobId; + + private transient Table table; + private transient List<FlinkDataFile> pendingDataFiles; + private transient List<FlinkManifestFile> flinkManifestFiles; + private transient ListState<ManifestFileState> manifestFileState; + private transient CommitMetadata metadata; + private transient ListState<CommitMetadata> commitMetadataState; + + public IcebergCommitter(Table table, Configuration config) { + this.config = config; + + // current Iceberg sink implementation can't work with concurrent checkpoints. + // We disable concurrent checkpoints by default as min pause is set to 60s by default. + // Add an assertion to fail explicit in case job enables concurrent checkpoints. + CheckpointConfig checkpointConfig = StreamExecutionEnvironment.getExecutionEnvironment().getCheckpointConfig(); + if (checkpointConfig.getMaxConcurrentCheckpoints() > 1) { + throw new IllegalArgumentException("Iceberg sink doesn't support concurrent checkpoints"); + } + + namespace = config.getString(IcebergConnectorConstant.NAMESPACE, ""); + tableName = config.getString(IcebergConnectorConstant.TABLE, ""); + + watermarkEnabled = !Strings.isNullOrEmpty( + config.getString(IcebergConnectorConstant.WATERMARK_TIMESTAMP_FIELD, "")); + watermarkPropKey = WATERMARK_PROP_KEY_PREFIX; + snapshotRetentionHours = config.getLong(IcebergConnectorConstant.SNAPSHOT_RETENTION_HOURS, + IcebergConnectorConstant.DEFAULT_SNAPSHOT_RETENTION_HOURS); + commitRestoredManifestFiles = config.getBoolean(IcebergConnectorConstant.COMMIT_RESTORED_MANIFEST_FILES, + IcebergConnectorConstant.DEFAULT_COMMIT_RESTORED_MANIFEST_FILES); + icebergManifestFileDir = getIcebergManifestFileDir(config); + + // The only final fields yielded by table inputted + spec = table.spec(); + io = table.io(); + + final JobExecutionResult jobExecutionResult + = ExecutionEnvironment.getExecutionEnvironment().getLastJobExecutionResult(); + if (jobExecutionResult != null) { + flinkJobId = jobExecutionResult.getJobID().toString(); + LOG.info("Get Flink job ID from execution environment: {}", flinkJobId); + } else { + flinkJobId = new JobID().toString(); + LOG.info("Execution environment doesn't have executed job. 
Generate a random job ID : {}", flinkJobId); + } + LOG.info("Iceberg committer {}.{} created with sink config", namespace, tableName); + LOG.info("Iceberg committer {}.{} loaded table partition spec: {}", namespace, tableName, spec); + } + + @VisibleForTesting + List<FlinkDataFile> getPendingDataFiles() { + return pendingDataFiles; + } + + @VisibleForTesting + List<FlinkManifestFile> getFlinkManifestFiles() { + return flinkManifestFiles; + } + + @VisibleForTesting + CommitMetadata getMetadata() { + return metadata; + } + + private String getIcebergManifestFileDir(Configuration config) { + final String checkpointDir = config.getString( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, null); + if (null == checkpointDir) { + throw new IllegalArgumentException("checkpoint dir is null"); + } + + return String.format("%s/iceberg/manifest/", checkpointDir); + } + + @Override + public void close() throws Exception { + super.close(); + } + + void init() { + // TODO: duplicate logic, to extract + org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); + String catalogType = config.getString(IcebergConnectorConstant.CATALOG_TYPE, + IcebergConnectorConstant.CATALOG_TYPE_DEFAULT); + Catalog catalog = null; + switch (catalogType.toUpperCase()) { + case IcebergConnectorConstant.HIVE_CATALOG: + hadoopConf.set(ConfVars.METASTOREURIS.varname, config.getString(ConfVars.METASTOREURIS.varname, "")); + catalog = HiveCatalogs.loadCatalog(hadoopConf); + break; + + case IcebergConnectorConstant.HADOOP_CATALOG: + catalog = new HadoopCatalog(hadoopConf, + config.getString(IcebergConnectorConstant.HADOOP_CATALOG_WAREHOUSE_LOCATION, "")); + break; + + default: + throw new UnsupportedOperationException("Unknown catalog type or not set: " + catalogType); + } + + this.table = catalog.loadTable(TableIdentifier.parse(namespace + "." + tableName)); + + pendingDataFiles = new ArrayList<>(); + flinkManifestFiles = new ArrayList<>(); + metadata = CommitMetadata.newBuilder() + .setLastCheckpointId(0) + .setLastCheckpointTimestamp(0) + .setLastCommitTimestamp(System.currentTimeMillis()) Review comment: How about making these attributes the default values in CommitMetadata? Otherwise it looks a bit odd, because there is no clue as to why these values are set manually here. (See the builder sketch at the end of this message.) ########## File path: flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergCommitter.java ########## @@ -0,0 +1,642 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.flink.connector.sink; + +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestWriter; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.connector.IcebergConnectorConstant; +import org.apache.iceberg.flink.connector.model.CommitMetadata; +import org.apache.iceberg.flink.connector.model.CommitMetadataUtil; +import org.apache.iceberg.flink.connector.model.FlinkManifestFile; +import org.apache.iceberg.flink.connector.model.FlinkManifestFileUtil; +import org.apache.iceberg.flink.connector.model.GenericFlinkManifestFile; +import org.apache.iceberg.flink.connector.model.ManifestFileState; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hive.HiveCatalogs; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This operator commit data files to Iceberg table. + * <p> + * This operator should always run with parallelism of 1. + * Because Iceberg lib perform optimistic concurrency control, + * this can help reduce contention and retries + * when committing files to Iceberg table. + * <p> + * Here are some known contentions + * 1) Flink jobs running in multiple regions, + * since Iceberg metadata service and commit only happens in us-east-1. + * 2) auto tuning and auto lift services may update Iceberg table infrequently. 
+ */ +@SuppressWarnings("checkstyle:HiddenField") +public class IcebergCommitter extends RichSinkFunction<FlinkDataFile> + implements CheckpointedFunction, CheckpointListener { + private static final Logger LOG = LoggerFactory.getLogger(IcebergCommitter.class); + + private static final String COMMIT_REGION_KEY = "flink.commit.region"; + private static final String COMMIT_MANIFEST_HASHES_KEY = "flink.commit.manifest.hashes"; + private static final String VTTS_WATERMARK_PROP_KEY_PREFIX = "flink.watermark."; + + private Configuration config; + private final String namespace; + private final String tableName; + + private final String region; + private final boolean vttsWatermarkEnabled; + private final String vttsWatermarkPropKey; + private final long snapshotRetentionHours; + private final boolean commitRestoredManifestFiles; + private final String icebergManifestFileDir; + private final PartitionSpec spec; + private final FileIO io; + private final String flinkJobId; + + private transient Table table; + private transient List<FlinkDataFile> pendingDataFiles; + private transient List<FlinkManifestFile> flinkManifestFiles; + private transient ListState<ManifestFileState> manifestFilesState; + private transient CommitMetadata metadata; + private transient ListState<CommitMetadata> metadataState; + + public IcebergCommitter(Table table, Configuration config) { + this.config = config; + + // current Iceberg sink implementation can't work with concurrent checkpoints. + // We disable concurrent checkpoints by default as min pause is set to 60s by default. + // Add an assertion to fail explicit in case job enables concurrent checkpoints. + CheckpointConfig checkpointConfig = StreamExecutionEnvironment.getExecutionEnvironment().getCheckpointConfig(); + if (checkpointConfig.getMaxConcurrentCheckpoints() > 1) { + throw new IllegalArgumentException("Iceberg sink doesn't support concurrent checkpoints"); + } + + namespace = config.getString(IcebergConnectorConstant.NAMESPACE, ""); + tableName = config.getString(IcebergConnectorConstant.TABLE, ""); + + String region = System.getenv("EC2_REGION"); + if (Strings.isNullOrEmpty(region)) { + region = "us-east-1"; + LOG.info("Iceberg committer {}.{} default region to us-east-1", namespace, tableName); + } + this.region = region; + + vttsWatermarkEnabled = !Strings.isNullOrEmpty( + config.getString(IcebergConnectorConstant.VTTS_WATERMARK_TIMESTAMP_FIELD, "")); + vttsWatermarkPropKey = VTTS_WATERMARK_PROP_KEY_PREFIX + region; + snapshotRetentionHours = config.getLong(IcebergConnectorConstant.SNAPSHOT_RETENTION_HOURS, + IcebergConnectorConstant.DEFAULT_SNAPSHOT_RETENTION_HOURS); + commitRestoredManifestFiles = config.getBoolean(IcebergConnectorConstant.COMMIT_RESTORED_MANIFEST_FILES, + IcebergConnectorConstant.DEFAULT_COMMIT_RESTORED_MANIFEST_FILES); + icebergManifestFileDir = getIcebergManifestFileDir(config); + + // The only final fields yielded by table inputted + spec = table.spec(); + io = table.io(); + + final JobExecutionResult jobExecutionResult + = ExecutionEnvironment.getExecutionEnvironment().getLastJobExecutionResult(); + if (jobExecutionResult != null) { + flinkJobId = jobExecutionResult.getJobID().toString(); + LOG.info("Get Flink job ID from execution environment: {}", flinkJobId); + } else { + flinkJobId = new JobID().toString(); + LOG.info("Execution environment doesn't have executed job. 
Generate a random job ID : {}", flinkJobId); + } + LOG.info("Iceberg committer {}.{} created with sink config", namespace, tableName); + LOG.info("Iceberg committer {}.{} loaded table partition spec: {}", namespace, tableName, spec); + } + + @VisibleForTesting + List<FlinkDataFile> getPendingDataFiles() { + return pendingDataFiles; + } + + @VisibleForTesting + List<FlinkManifestFile> getFlinkManifestFiles() { + return flinkManifestFiles; + } + + @VisibleForTesting + CommitMetadata getMetadata() { + return metadata; + } + + private String getIcebergManifestFileDir(Configuration config) { + final String checkpointDir = config.getString( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, null); + if (null == checkpointDir) { + throw new IllegalArgumentException("checkpoint dir is null"); + } + + return String.format("%s/iceberg/manifest/", checkpointDir); + } + + @Override + public void close() throws Exception { + super.close(); + } + + void init() { + // TODO: duplicate logic, to extract + org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); + String catalogType = config.getString(IcebergConnectorConstant.CATALOG_TYPE, + IcebergConnectorConstant.CATALOG_TYPE_DEFAULT); + Catalog catalog = null; + switch (catalogType.toUpperCase()) { + case IcebergConnectorConstant.HIVE_CATALOG: + hadoopConf.set(ConfVars.METASTOREURIS.varname, config.getString(ConfVars.METASTOREURIS.varname, "")); + catalog = HiveCatalogs.loadCatalog(hadoopConf); + break; + + case IcebergConnectorConstant.HADOOP_CATALOG: + catalog = new HadoopCatalog(hadoopConf, + config.getString(IcebergConnectorConstant.HADOOP_CATALOG_WAREHOUSE_LOCATION, "")); + break; + + default: + throw new UnsupportedOperationException("Unknown catalog type or not set: " + catalogType); + } + + this.table = catalog.loadTable(TableIdentifier.parse(namespace + "." + tableName)); + + pendingDataFiles = new ArrayList<>(); + flinkManifestFiles = new ArrayList<>(); + metadata = CommitMetadata.newBuilder() + .setLastCheckpointId(0) + .setLastCheckpointTimestamp(0) + .setLastCommitTimestamp(System.currentTimeMillis()) + .build(); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + init(); + + Preconditions.checkState(manifestFilesState == null, + "checkpointedFilesState has already been initialized."); + Preconditions.checkState(metadataState == null, + "metadataState has already been initialized."); + manifestFilesState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>( + "iceberg-committer-manifest-files-state", ManifestFileState.class)); + metadataState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>( + "iceberg-committer-metadata-state", CommitMetadata.class)); + + if (context.isRestored()) { + final Iterable<CommitMetadata> restoredMetadata = metadataState.get(); + if (null != restoredMetadata) { + LOG.info("Iceberg committer {}.{} restoring metadata", namespace, tableName); + List<CommitMetadata> metadataList = new ArrayList<>(); + for (CommitMetadata entry : restoredMetadata) { + metadataList.add(entry); + } + Preconditions.checkState(1 == metadataList.size(), + "metadata list size should be 1. 
got " + metadataList.size()); + metadata = metadataList.get(0); + LOG.info("Iceberg committer {}.{} restored metadata: {}", + namespace, tableName, CommitMetadataUtil.getInstance().encodeAsJson(metadata)); + } else { + LOG.info("Iceberg committer {}.{} has nothing to restore for metadata", namespace, tableName); + } + + Iterable<ManifestFileState> restoredManifestFileStates = manifestFilesState.get(); + if (null != restoredManifestFileStates) { + LOG.info("Iceberg committer {}.{} restoring manifest files", + namespace, tableName); + for (ManifestFileState manifestFileState : restoredManifestFileStates) { + flinkManifestFiles.add(GenericFlinkManifestFile.fromState(manifestFileState)); + } + LOG.info("Iceberg committer {}.{} restored {} manifest files: {}", + namespace, tableName, flinkManifestFiles.size(), flinkManifestFiles); + final long now = System.currentTimeMillis(); + if (now - metadata.getLastCheckpointTimestamp() > TimeUnit.HOURS.toMillis(snapshotRetentionHours)) { + flinkManifestFiles.clear(); + LOG.info("Iceberg committer {}.{} cleared restored manifest files as checkpoint timestamp is too old: " + + "checkpointTimestamp = {}, now = {}, snapshotRetentionHours = {}", + namespace, tableName, metadata.getLastCheckpointTimestamp(), now, snapshotRetentionHours); + } else { + flinkManifestFiles = removeCommittedManifests(flinkManifestFiles); + if (flinkManifestFiles.isEmpty()) { + LOG.info("Iceberg committer {}.{} has zero uncommitted manifest files from restored state", + namespace, tableName); + } else { + if (commitRestoredManifestFiles) { + commitRestoredManifestFiles(); + } else { + LOG.info("skip commit of restored manifest files"); + } + } + } + } else { + LOG.info("Iceberg committer {}.{} has nothing to restore for manifest files", namespace, tableName); + } + } + } + + @VisibleForTesting + void commitRestoredManifestFiles() throws Exception { + LOG.info("Iceberg committer {}.{} committing last uncompleted transaction upon recovery: " + + "metadata = {}, flink manifest files ({}) = {}", namespace, tableName, + CommitMetadataUtil.getInstance().encodeAsJson(metadata), + flinkManifestFiles.size(), flinkManifestFiles); + commit(); + LOG.info("Iceberg committer {}.{} committed last uncompleted transaction upon recovery: " + + "metadata = {}, flink manifest files ({}) = {}", namespace, tableName, + CommitMetadataUtil.getInstance().encodeAsJson(metadata), + flinkManifestFiles.size(), flinkManifestFiles); + postCommitSuccess(); + } + + private List<FlinkManifestFile> removeCommittedManifests(List<FlinkManifestFile> flinkManifestFiles) { + int snapshotCount = 0; + String result = "succeeded"; + final long start = System.currentTimeMillis(); + try { + final Set<String> manifestHashes = flinkManifestFiles.stream() + .map(f -> f.hash()) + .collect(Collectors.toSet()); + final Set<String> committedHashes = new HashSet<>(flinkManifestFiles.size()); + final Iterable<Snapshot> snapshots = table.snapshots(); + for (Snapshot snapshot : snapshots) { + ++snapshotCount; + final Map<String, String> summary = snapshot.summary(); + final List<String> hashes = FlinkManifestFileUtil.hashesStringToList(summary.get(COMMIT_MANIFEST_HASHES_KEY)); + for (String hash : hashes) { + if (manifestHashes.contains(hash)) { + committedHashes.add(hash); + } + } + } + final List<FlinkManifestFile> uncommittedManifestFiles = flinkManifestFiles.stream() + .filter(f -> !committedHashes.contains(f.hash())) + .collect(Collectors.toList()); + return uncommittedManifestFiles; + } catch (Throwable t) { + result = "failed"; + 
//LOG.error(String.format("Iceberg committer %s.%s failed to check transaction completed", database, tableName), + // t); + LOG.error("Iceberg committer {}.{} failed to check transaction completed. Throwable = {}", + namespace, tableName, t); + throw t; + } finally { + final long duration = System.currentTimeMillis() - start; + LOG.info("Iceberg committer {}.{} {} to check transaction completed" + + " after iterating {} snapshots and {} milli-seconds", + namespace, tableName, result, snapshotCount, duration); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + LOG.info("Iceberg committer {}.{} snapshot state: checkpointId = {}, triggerTime = {}", + namespace, tableName, context.getCheckpointId(), context.getCheckpointTimestamp()); + Preconditions.checkState(null != manifestFilesState, + "manifest files state has not been properly initialized."); + Preconditions.checkState(null != metadataState, + "metadata state has not been properly initialized."); + + // set transaction to null to indicate a start of a new checkpoint/commit/transaction + synchronized (this) { + snapshot(context, pendingDataFiles); + checkpointState(flinkManifestFiles, metadata); + postSnapshotSuccess(); + } + } + + @VisibleForTesting + void snapshot(FunctionSnapshotContext context, List<FlinkDataFile> pendingDataFiles) throws Exception { + FlinkManifestFile flinkManifestFile = null; + if (!pendingDataFiles.isEmpty()) { + flinkManifestFile = createManifestFile(context, pendingDataFiles); + flinkManifestFiles.add(flinkManifestFile); + } + metadata = updateMetadata(metadata, context, flinkManifestFile); + } + + private FlinkManifestFile createManifestFile( + FunctionSnapshotContext context, List<FlinkDataFile> pendingDataFiles) throws Exception { + LOG.info("Iceberg committer {}.{} checkpointing {} pending data files}", + namespace, tableName, pendingDataFiles.size()); + String result = "succeeded"; + final long start = System.currentTimeMillis(); + try { + final String manifestFileName = Joiner.on("_") + .join(flinkJobId, context.getCheckpointId(), context.getCheckpointTimestamp()); + // Iceberg requires file format suffix right now + final String manifestFileNameWithSuffix = manifestFileName + ".avro"; + OutputFile outputFile = io.newOutputFile(icebergManifestFileDir + manifestFileNameWithSuffix); + ManifestWriter manifestWriter = ManifestWriter.write(spec, outputFile); + + // stats + long recordCount = 0; + long byteCount = 0; + long lowWatermark = Long.MAX_VALUE; + long highWatermark = Long.MIN_VALUE; + for (FlinkDataFile flinkDataFile : pendingDataFiles) { + DataFile dataFile = flinkDataFile.getIcebergDataFile(); + manifestWriter.add(dataFile); + // update stas + recordCount += dataFile.recordCount(); + byteCount += dataFile.fileSizeInBytes(); + if (flinkDataFile.getLowWatermark() < lowWatermark) { + lowWatermark = flinkDataFile.getLowWatermark(); + } + if (flinkDataFile.getHighWatermark() > highWatermark) { + highWatermark = flinkDataFile.getHighWatermark(); + } + LOG.debug("Data file with size of {} bytes added to manifest", dataFile.fileSizeInBytes()); + } + manifestWriter.close(); + ManifestFile manifestFile = manifestWriter.toManifestFile(); + + FlinkManifestFile flinkManifestFile = GenericFlinkManifestFile.builder() + .setPath(manifestFile.path()) + .setLength(manifestFile.length()) + .setSpecId(manifestFile.partitionSpecId()) + .setCheckpointId(context.getCheckpointId()) + .setCheckpointTimestamp(context.getCheckpointTimestamp()) + 
.setDataFileCount(pendingDataFiles.size()) + .setRecordCount(recordCount) + .setByteCount(byteCount) + .setLowWatermark(lowWatermark) + .setHighWatermark(highWatermark) + .build(); + + // don't want to log a giant list at one line. + // split the complete list into smaller chunks with 50 files. + final AtomicInteger counter = new AtomicInteger(0); + Collection<List<String>> listOfFileList = pendingDataFiles.stream() + .map(flinkDataFile -> flinkDataFile.toCompactDump()) + .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / 50)) + .values(); + for (List<String> fileList : listOfFileList) { + LOG.info("Iceberg committer {}.{} created manifest file {} for {}/{} data files: {}", + namespace, tableName, manifestFile.path(), fileList.size(), pendingDataFiles.size(), fileList); + } + return flinkManifestFile; + } catch (Throwable t) { + result = "failed"; + //LOG.error(String.format("Iceberg committer %s.%s failed to create manifest file for %d pending data files", + // database, tableName, pendingDataFiles.size()), t); + LOG.error("Iceberg committer {}.{} failed to create manifest file for {} pending data files. Throwable={}", + namespace, tableName, pendingDataFiles.size(), t); + throw t; + } finally { + final long duration = System.currentTimeMillis() - start; + LOG.info("Iceberg committer {}.{} {} to create manifest file with {} data files after {} milli-seconds", + namespace, tableName, result, pendingDataFiles.size(), duration); + } + } + + private CommitMetadata updateMetadata( + CommitMetadata oldMetadata, + FunctionSnapshotContext context, + @Nullable FlinkManifestFile flinkManifestFile) { + LOG.info("Iceberg committer {}.{} updating metadata {} with manifest file {}", + namespace, tableName, CommitMetadataUtil.getInstance().encodeAsJson(oldMetadata), flinkManifestFile); + CommitMetadata.Builder metadataBuilder = CommitMetadata.newBuilder(oldMetadata) + .setLastCheckpointId(context.getCheckpointId()) + .setLastCheckpointTimestamp(context.getCheckpointTimestamp()); + if (vttsWatermarkEnabled) { + Long vttsWatermark = oldMetadata.getVttsWatermark(); + if (null == flinkManifestFile) { + // DPS-412: when there is no data to be committed + // use elapsed wall clock time to move the VTTS watermark forward. + if (null != vttsWatermark) { + final long elapsedTimeMs = System.currentTimeMillis() - oldMetadata.getLastCommitTimestamp(); + vttsWatermark += elapsedTimeMs; + } else { + vttsWatermark = System.currentTimeMillis(); + } + } else { + // use lowWatermark. + if (null == flinkManifestFile.lowWatermark()) { + throw new IllegalArgumentException("VTTS is enabled but lowWatermark is null"); + } + // in case one container/slot is lagging behind, + // we want to move watermark forward based on the slowest. 
+ final long newWatermark = flinkManifestFile.lowWatermark(); + // make sure VTTS watermark doesn't go back in time + if (null == vttsWatermark || newWatermark > vttsWatermark) { + vttsWatermark = newWatermark; + } + } + metadataBuilder.setVttsWatermark(vttsWatermark); + } + CommitMetadata metadata = metadataBuilder.build(); + LOG.info("Iceberg committer {}.{} updated metadata {} with manifest file {}", + namespace, tableName, CommitMetadataUtil.getInstance().encodeAsJson(metadata), flinkManifestFile); + return metadata; + } + + private void checkpointState(List<FlinkManifestFile> flinkManifestFiles, CommitMetadata metadata) throws Exception { + LOG.info("Iceberg committer {}.{} checkpointing state", namespace, tableName); + List<ManifestFileState> manifestFileStates = flinkManifestFiles.stream() + .map(f -> f.toState()) + .collect(Collectors.toList()); + manifestFilesState.clear(); + manifestFilesState.addAll(manifestFileStates); + metadataState.clear(); + metadataState.add(metadata); + LOG.info("Iceberg committer {}.{} checkpointed state: metadata = {}, flinkManifestFiles({}) = {}", + namespace, tableName, CommitMetadataUtil.getInstance().encodeAsJson(metadata), + flinkManifestFiles.size(), flinkManifestFiles); + } + + private void postSnapshotSuccess() { + pendingDataFiles.clear(); + LOG.debug("Un-committed manifest file count {}, containing data file count {}, record count {} and byte count {}", + flinkManifestFiles.size(), + FlinkManifestFileUtil.getDataFileCount(flinkManifestFiles), + FlinkManifestFileUtil.getRecordCount(flinkManifestFiles), + FlinkManifestFileUtil.getByteCount(flinkManifestFiles) + ); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + LOG.info("Iceberg committer {}.{} checkpoint {} completed", namespace, tableName, checkpointId); + synchronized (this) { + if (checkpointId == metadata.getLastCheckpointId()) { + try { + commit(); + postCommitSuccess(); + } catch (Exception t) { + // swallow the exception to avoid job restart in case of commit failure + //LOG.error(String.format("Iceberg committer %s.%s failed to do post checkpoint commit", database, tableName), + // t); + LOG.error("Iceberg committer {}.{} failed to do post checkpoint commit. Throwable = ", + namespace, tableName, t); + } + } else { + // TODO: it would be nice to fix this and allow concurrent checkpoint + LOG.info("Iceberg committer {}.{} skip committing transaction: " Review comment: Actually this is not only about concurrent checkpoints. Say a user sets a short checkpoint interval: `IcebergCommitter` may then invoke `snapshotState` twice before receiving the notification of the first checkpoint's completion from the `JobManager`. In that scenario, if I understand correctly, the data is discarded because `pendingDataFiles` is cleared during `snapshotState`. (See the per-checkpoint tracking sketch at the end of this message.) ########## File path: flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergWriter.java ########## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector.sink; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.flink.connector.IcebergConnectorConstant; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + +@SuppressWarnings("checkstyle:ClassTypeParameterName") +public class IcebergWriter<IN> extends AbstractStreamOperator<FlinkDataFile> + implements OneInputStreamOperator<IN, FlinkDataFile> { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class); + + private final RecordSerializer<IN> serializer; + private final String namespace; + private final String tableName; + private final FileFormat format; + private final boolean skipIncompatibleRecord; + private final Schema schema; + private final PartitionSpec spec; + private final LocationProvider locations; + private final FileIO io; + private final Map<String, String> properties; + private final String timestampFeild; + private final TimeUnit timestampUnit; + private final long maxFileSize; + + private transient org.apache.hadoop.conf.Configuration hadoopConf; + private transient Map<String, FileWriter> openPartitionFiles; + private transient int subtaskId; + private transient ProcessingTimeService timerService; + private transient Partitioner<Record> partitioner; + + public IcebergWriter(Table table, @Nullable RecordSerializer<IN> serializer, Configuration config) { + this.serializer = serializer; + skipIncompatibleRecord = config.getBoolean(IcebergConnectorConstant.SKIP_INCOMPATIBLE_RECORD, + IcebergConnectorConstant.DEFAULT_SKIP_INCOMPATIBLE_RECORD); + timestampFeild = 
config.getString(IcebergConnectorConstant.WATERMARK_TIMESTAMP_FIELD, ""); + timestampUnit = TimeUnit.valueOf(config.getString(IcebergConnectorConstant.WATERMARK_TIMESTAMP_UNIT, + IcebergConnectorConstant.DEFAULT_WATERMARK_TIMESTAMP_UNIT)); + maxFileSize = config.getLong(IcebergConnectorConstant.MAX_FILE_SIZE, + IcebergConnectorConstant.DEFAULT_MAX_FILE_SIZE); + + namespace = config.getString(IcebergConnectorConstant.NAMESPACE, ""); + tableName = config.getString(IcebergConnectorConstant.TABLE, ""); + + schema = table.schema(); + spec = table.spec(); + locations = table.locationProvider(); + io = table.io(); + properties = table.properties(); + format = FileFormat.valueOf( + properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT).toUpperCase(Locale.ENGLISH)); + + LOG.info("Iceberg writer {}.{} data file location: {}", + namespace, tableName, locations.newDataLocation("")); + LOG.info("Iceberg writer {}.{} created with sink config", namespace, tableName); + LOG.info("Iceberg writer {}.{} loaded table: schema = {}\npartition spec = {}", + namespace, tableName, schema, spec); + + // default ChainingStrategy is set to HEAD + // we prefer chaining to avoid the huge serialization and deserializatoin overhead. + super.setChainingStrategy(ChainingStrategy.ALWAYS); + } + + @Override + public void open() throws Exception { + super.open(); + + hadoopConf = new org.apache.hadoop.conf.Configuration(); + subtaskId = getRuntimeContext().getIndexOfThisSubtask(); + timerService = getProcessingTimeService(); + openPartitionFiles = new HashMap<>(); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + LOG.info("Iceberg writer {}.{} subtask {} begin preparing for checkpoint {}", + namespace, tableName, subtaskId, checkpointId); + // close all open files and emit files to downstream committer operator + flush(true); + LOG.info("Iceberg writer {}.{} subtask {} completed preparing for checkpoint {}", + namespace, tableName, subtaskId, checkpointId); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + } + + @VisibleForTesting + List<FlinkDataFile> flush(boolean emit) throws IOException { + List<FlinkDataFile> dataFiles = new ArrayList<>(openPartitionFiles.size()); + for (Map.Entry<String, FileWriter> entry : openPartitionFiles.entrySet()) { + FileWriter writer = entry.getValue(); + FlinkDataFile flinkDataFile = closeWriter(writer); + dataFiles.add(flinkDataFile); + if (emit) { + emit(flinkDataFile); + } + } + LOG.info("Iceberg writer {}.{} subtask {} flushed {} open files", + namespace, tableName, subtaskId, openPartitionFiles.size()); + openPartitionFiles.clear(); + return dataFiles; + } + + FlinkDataFile closeWriter(FileWriter writer) throws IOException { + FlinkDataFile flinkDataFile = writer.close(); + LOG.info( + "Iceberg writer {}.{} subtask {} uploaded to Iceberg table {}.{} with {} records and {} bytes on this path: {}", + namespace, tableName, subtaskId, namespace, tableName, flinkDataFile.getIcebergDataFile().recordCount(), + flinkDataFile.getIcebergDataFile().fileSizeInBytes(), flinkDataFile.getIcebergDataFile().path()); + return flinkDataFile; + } + + void emit(FlinkDataFile flinkDataFile) { + output.collect(new StreamRecord<>(flinkDataFile)); + LOG.debug("Iceberg writer {}.{} subtask {} emitted uploaded file to committer for Iceberg table {}.{}" + + " with {} records and {} bytes on this path: {}", + namespace, tableName, subtaskId, namespace, tableName, 
flinkDataFile.getIcebergDataFile().recordCount(), + flinkDataFile.getIcebergDataFile().fileSizeInBytes(), flinkDataFile.getIcebergDataFile().path()); + } + + @Override + public void close() throws Exception { + super.close(); + + LOG.info("Iceberg writer {}.{} subtask {} begin close", namespace, tableName, subtaskId); + // close all open files without emitting to downstream committer + flush(false); + LOG.info("Iceberg writer {}.{} subtask {} completed close", namespace, tableName, subtaskId); + } + + @Override + public void dispose() throws Exception { + super.dispose(); + + LOG.info("Iceberg writer {}.{} subtask {} begin dispose", namespace, tableName, subtaskId); + abort(); + LOG.info("Iceberg writer {}.{} subtask {} completed dispose", namespace, tableName, subtaskId); + } + + private void abort() { + LOG.info("Iceberg writer {}.{} subtask {} has {} open files to abort", + namespace, tableName, subtaskId, openPartitionFiles.size()); + // close all open files without sending DataFile list to downstream committer operator. + // because there are not checkpointed, + // we don't want to commit these files. + for (Map.Entry<String, FileWriter> entry : openPartitionFiles.entrySet()) { + final FileWriter writer = entry.getValue(); + final Path path = writer.getPath(); + try { + LOG.debug("Iceberg writer {}.{} subtask {} start to abort file: {}", + namespace, tableName, subtaskId, path); + writer.abort(); + LOG.info("Iceberg writer {}.{} subtask {} completed aborting file: {}", + namespace, tableName, subtaskId, path); + } catch (Throwable t) { +// LOG.error(String.format("Iceberg writer %s.%s subtask %d failed to abort open file: %s", +// namespace, tableName, subtaskId, path.toString()), t); + LOG.error("Iceberg writer {}.{} subtask {} failed to abort open file: {}. Throwable = {}", + namespace, tableName, subtaskId, path.toString(), t); + continue; + } + + try { + LOG.debug("Iceberg writer {}.{} subtask {} deleting aborted file: {}", + namespace, tableName, subtaskId, path); + io.deleteFile(path.toString()); + LOG.info("Iceberg writer {}.{} subtask {} deleted aborted file: {}", + namespace, tableName, subtaskId, path); + } catch (Throwable t) { +// LOG.error(String.format( +// "Iceberg writer %s.%s subtask %d failed to delete aborted file: %s", +// namespace, tableName, subtaskId, path.toString()), t); + LOG.error("Iceberg writer {}.{} subtask {} failed to delete aborted file: {}. 
Throwable = {}", + namespace, tableName, subtaskId, path.toString(), t); + } + } + LOG.info("Iceberg writer {}.{} subtask {} aborted {} open files", + namespace, tableName, subtaskId, openPartitionFiles.size()); + openPartitionFiles.clear(); + } + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + IN value = element.getValue(); + try { + processInternal(value); + } catch (Exception t) { + if (!skipIncompatibleRecord) { + throw t; + } + } + } + + @VisibleForTesting + void processInternal(IN value) throws Exception { + Record record = serializer.serialize(value, schema); + processRecord(record); + } + + /** + * process element as {@link Record} + */ + private void processRecord(Record record) throws Exception { + if (partitioner == null) { + partitioner = new RecordPartitioner(spec); + } + partitioner.partition(record); + final String partitionPath = locations.newDataLocation(spec, partitioner, ""); + if (!openPartitionFiles.containsKey(partitionPath)) { + final Path path = new Path(partitionPath, generateFileName()); Review comment: I wonder whether we have a maximum number of `openPartitionFiles` here, because this may introduce a performance regression when too many files are processed (random reads/writes). (See the bounded-writer sketch at the end of this message.) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
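To illustrate the first comment (moving the initial values into the `CommitMetadata` builder), here is a minimal sketch of what that could look like; the field set and builder API are simplified placeholders, not the actual classes in this PR:

```java
// Hypothetical, simplified CommitMetadata: the "no commit yet" initial state lives in the
// builder defaults, so IcebergCommitter.init() can just call CommitMetadata.newBuilder().build().
public class CommitMetadata {
  private final long lastCheckpointId;
  private final long lastCheckpointTimestamp;
  private final long lastCommitTimestamp;

  private CommitMetadata(Builder builder) {
    this.lastCheckpointId = builder.lastCheckpointId;
    this.lastCheckpointTimestamp = builder.lastCheckpointTimestamp;
    this.lastCommitTimestamp = builder.lastCommitTimestamp;
  }

  public static Builder newBuilder() {
    return new Builder();
  }

  public long getLastCheckpointId() { return lastCheckpointId; }
  public long getLastCheckpointTimestamp() { return lastCheckpointTimestamp; }
  public long getLastCommitTimestamp() { return lastCommitTimestamp; }

  public static class Builder {
    // defaults document the initial state instead of leaving it to every caller
    private long lastCheckpointId = 0L;
    private long lastCheckpointTimestamp = 0L;
    private long lastCommitTimestamp = System.currentTimeMillis();

    public Builder setLastCheckpointId(long id) { this.lastCheckpointId = id; return this; }
    public Builder setLastCheckpointTimestamp(long ts) { this.lastCheckpointTimestamp = ts; return this; }
    public Builder setLastCommitTimestamp(long ts) { this.lastCommitTimestamp = ts; return this; }

    public CommitMetadata build() { return new CommitMetadata(this); }
  }
}
```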
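For the second comment (a `notifyCheckpointComplete` that arrives only after a newer `snapshotState` call), one possible direction is to keep the uncommitted manifests keyed by checkpoint ID and commit everything up to the completed checkpoint. This is only a sketch with placeholder types, not code from this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical tracker: snapshotState() stashes the manifests it produced under the
// checkpoint ID, and notifyCheckpointComplete() drains every checkpoint up to (and
// including) the completed one, so a "late" notification no longer skips a commit.
class PerCheckpointCommitTracker<M> {
  private final NavigableMap<Long, List<M>> uncommitted = new TreeMap<>();

  void onSnapshot(long checkpointId, List<M> manifests) {
    uncommitted.put(checkpointId, new ArrayList<>(manifests));
  }

  List<M> drainCompleted(long completedCheckpointId) {
    List<M> toCommit = new ArrayList<>();
    // the headMap view covers all checkpoints <= completedCheckpointId; clearing it
    // removes them from the backing map once they are handed over for commit
    NavigableMap<Long, List<M>> completed = uncommitted.headMap(completedCheckpointId, true);
    completed.values().forEach(toCommit::addAll);
    completed.clear();
    return toCommit;
  }
}
```

With something like this, `notifyCheckpointComplete(n)` could still commit the manifests of checkpoint n even if checkpoint n+1 has already gone through `snapshotState`.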
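And for the last comment (capping `openPartitionFiles` in `IcebergWriter`), a minimal LRU-style sketch; `BoundedWriterPool` and `Evictor` are hypothetical names and the writer type is a placeholder:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bound on simultaneously open partition writers: once the cap is exceeded,
// the least-recently-used writer is closed (and its completed data file emitted) before
// the map grows, keeping the number of open files and the resulting random I/O in check.
class BoundedWriterPool<W> {
  interface Evictor<W> {
    void closeAndEmit(W writer);
  }

  private final Map<String, W> openWriters;

  BoundedWriterPool(int maxOpenWriters, Evictor<W> evictor) {
    // access-ordered LinkedHashMap gives LRU eviction via removeEldestEntry
    this.openWriters = new LinkedHashMap<String, W>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, W> eldest) {
        if (size() > maxOpenWriters) {
          evictor.closeAndEmit(eldest.getValue());
          return true;
        }
        return false;
      }
    };
  }

  W get(String partitionPath) {
    return openWriters.get(partitionPath);
  }

  void put(String partitionPath, W writer) {
    openWriters.put(partitionPath, writer);
  }
}
```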