Reo-LEI opened a new pull request #4274: URL: https://github.com/apache/iceberg/pull/4274
Port PR #2898 to flink 1.13 for this [comment](https://github.com/apache/iceberg/pull/2898#pullrequestreview-894608167). The following is the diff of `git diff --no-index flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink`, and I confirmed that there are no modifications relevant to PR 2898. ``` diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index b00018b3b..d20859377 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -61,10 +61,11 @@ class FlinkManifestUtil { } } - static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId, - long attemptNumber) { + static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId, + int subTaskId, long attemptNumber) { TableOperations ops = ((HasTableOperations) table).operations(); - return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber); + return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId, + subTaskId, attemptNumber); } static DeltaManifests writeCompletedFiles(WriteResult result, diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 5e8085d67..d15a5f208 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -54,6 +54,7 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; 
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.PropertyUtil; @@ -134,6 +135,7 @@ public class FlinkSink { private boolean upsert = false; private List<String> equalityFieldColumns = null; private String uidPrefix = null; + private final Map<String, String> snapshotProperties = Maps.newHashMap(); private Builder() { } @@ -271,6 +273,16 @@ public class FlinkSink { return this; } + public Builder setSnapshotProperties(Map<String, String> properties) { + snapshotProperties.putAll(properties); + return this; + } + + public Builder setSnapshotProperty(String property, String value) { + snapshotProperties.put(property, value); + return this; + } + private <T> DataStreamSink<T> chainIcebergOperators() { Preconditions.checkArgument(inputCreator != null, "Please use forRowData() or forMapperOutputType() to initialize the input DataStream."); @@ -369,7 +381,7 @@ public class FlinkSink { } private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) { - IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite); + IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite, snapshotProperties); SingleOutputStreamOperator<Void> committerStream = writerStream .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter) .setParallelism(1) diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 010df8cf5..1dfebb12a 100644 --- 
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -78,6 +78,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void> // TableLoader to load iceberg table lazily. private final TableLoader tableLoader; private final boolean replacePartitions; + private final Map<String, String> snapshotProperties; // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed // to iceberg table). We need a sorted map here because there's possible that few checkpoints snapshot failed, for @@ -109,9 +110,10 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void> private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor(); private transient ListState<SortedMap<Long, byte[]>> checkpointsState; - IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) { + IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions, Map<String, String> snapshotProperties) { this.tableLoader = tableLoader; this.replacePartitions = replacePartitions; + this.snapshotProperties = snapshotProperties; } @Override @@ -129,7 +131,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void> int subTaskId = getRuntimeContext().getIndexOfThisSubtask(); int attemptId = getRuntimeContext().getAttemptNumber(); - this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId); + String operatorUniqueId = getRuntimeContext().getOperatorUniqueID(); + this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId, + subTaskId, attemptId); this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID; this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR); @@ -305,6 +309,8 @@ class IcebergFilesCommitter 
extends AbstractStreamOperator<Void> String newFlinkJobId, long checkpointId) { LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles, numDeleteFiles, table); + snapshotProperties.forEach(operation::set); + // custom snapshot metadata properties will be overridden if they conflict with internal ones used by the sink. operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); operation.set(FLINK_JOB_ID, newFlinkJobId); @@ -345,7 +351,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void> } @Override - public void dispose() throws Exception { + public void close() throws Exception { if (tableLoader != null) { tableLoader.close(); } diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java index 6d12310dd..cc8e6ce82 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java @@ -73,8 +73,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> } @Override - public void dispose() throws Exception { - super.dispose(); + public void close() throws Exception { + super.close(); if (writer != null) { writer.close(); writer = null; diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java index fca86080b..b7d575bb4 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java @@ -35,23 +35,25 @@ class ManifestOutputFileFactory { private final FileIO io; private final Map<String, String> props; private final String flinkJobId; + private 
final String operatorUniqueId; private final int subTaskId; private final long attemptNumber; private final AtomicInteger fileCount = new AtomicInteger(0); ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props, - String flinkJobId, int subTaskId, long attemptNumber) { + String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) { this.ops = ops; this.io = io; this.props = props; this.flinkJobId = flinkJobId; + this.operatorUniqueId = operatorUniqueId; this.subTaskId = subTaskId; this.attemptNumber = attemptNumber; } private String generatePath(long checkpointId) { - return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId, - attemptNumber, checkpointId, fileCount.incrementAndGet())); + return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId, + subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet())); } OutputFile create(long checkpointId) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
