Reo-LEI opened a new pull request #4274:
URL: https://github.com/apache/iceberg/pull/4274


   Port PR #2898 to Flink 1.13, per this [comment](https://github.com/apache/iceberg/pull/2898#pullrequestreview-894608167).
   
   The following is the output of `git diff --no-index flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink`, and I confirmed that there are no modifications relevant to PR #2898.
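   
   As a quick illustration of one of the ported changes, `ManifestOutputFileFactory#generatePath` (shown in the diff below) now embeds the operator unique id in manifest file names, so multiple Iceberg sinks in one job cannot collide. A minimal sketch of the resulting name; the id values below are hypothetical placeholders:
   
   ```java
   import org.apache.iceberg.FileFormat;
   
   public class ManifestNameExample {
     public static void main(String[] args) {
       // Format string taken from the v1.14 generatePath in the diff below;
       // the job and operator ids here are hypothetical placeholders.
       String name = FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d",
           "2a5f1c9e",   // flinkJobId (hypothetical)
           "cbc357cc",   // operatorUniqueId (hypothetical, new in this change)
           3,            // subTaskId
           0L,           // attemptNumber
           17L,          // checkpointId
           1));          // fileCount
       System.out.println(name);  // prints 2a5f1c9e-cbc357cc-00003-0-17-00001.avro
     }
   }
   ```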
   
   ```
   diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
   index b00018b3b..d20859377 100644
   --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
   +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
   @@ -61,10 +61,11 @@ class FlinkManifestUtil {
        }
      }
    
   -  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
   -                                                           long attemptNumber) {
   +  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId,
   +                                                           int subTaskId, long attemptNumber) {
        TableOperations ops = ((HasTableOperations) table).operations();
   -    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
   +    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId,
   +        subTaskId, attemptNumber);
      }
    
      static DeltaManifests writeCompletedFiles(WriteResult result,
   diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
   index 5e8085d67..d15a5f208 100644
   --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
   +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
   @@ -54,6 +54,7 @@ import org.apache.iceberg.io.WriteResult;
    import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
    import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
   +import org.apache.iceberg.relocated.com.google.common.collect.Maps;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.util.PropertyUtil;
   @@ -134,6 +135,7 @@ public class FlinkSink {
        private boolean upsert = false;
        private List<String> equalityFieldColumns = null;
        private String uidPrefix = null;
   +    private final Map<String, String> snapshotProperties = Maps.newHashMap();
    
        private Builder() {
        }
   @@ -271,6 +273,16 @@ public class FlinkSink {
          return this;
        }
    
   +    public Builder setSnapshotProperties(Map<String, String> properties) {
   +      snapshotProperties.putAll(properties);
   +      return this;
   +    }
   +
   +    public Builder setSnapshotProperty(String property, String value) {
   +      snapshotProperties.put(property, value);
   +      return this;
   +    }
   +
        private <T> DataStreamSink<T> chainIcebergOperators() {
          Preconditions.checkArgument(inputCreator != null,
              "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
   @@ -369,7 +381,7 @@ public class FlinkSink {
        }
    
        private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
   -      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
   +      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite, snapshotProperties);
          SingleOutputStreamOperator<Void> committerStream = writerStream
              .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
              .setParallelism(1)
   diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
   index 010df8cf5..1dfebb12a 100644
   --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
   +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
   @@ -78,6 +78,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
      // TableLoader to load iceberg table lazily.
      private final TableLoader tableLoader;
      private final boolean replacePartitions;
   +  private final Map<String, String> snapshotProperties;
    
      // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
      // to iceberg table). We need a sorted map here because there's possible that few checkpoints snapshot failed, for
   @@ -109,9 +110,10 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
      private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
      private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
    
   -  IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
   +  IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions, Map<String, String> snapshotProperties) {
        this.tableLoader = tableLoader;
        this.replacePartitions = replacePartitions;
   +    this.snapshotProperties = snapshotProperties;
      }
    
      @Override
   @@ -129,7 +131,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
    
        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
        int attemptId = getRuntimeContext().getAttemptNumber();
   -    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
   +    String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
   +    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId,
   +        subTaskId, attemptId);
        this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
    
        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
   @@ -305,6 +309,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
                                   String newFlinkJobId, long checkpointId) {
        LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles,
            numDeleteFiles, table);
   +    snapshotProperties.forEach(operation::set);
   +    // custom snapshot metadata properties will be overridden if they conflict with internal ones used by the sink.
        operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
        operation.set(FLINK_JOB_ID, newFlinkJobId);
    
   @@ -345,7 +351,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
      }
    
      @Override
   -  public void dispose() throws Exception {
   +  public void close() throws Exception {
        if (tableLoader != null) {
          tableLoader.close();
        }
   diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
   index 6d12310dd..cc8e6ce82 100644
   --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
   +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
   @@ -73,8 +73,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
      }
    
      @Override
   -  public void dispose() throws Exception {
   -    super.dispose();
   +  public void close() throws Exception {
   +    super.close();
        if (writer != null) {
          writer.close();
          writer = null;
   diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
   index fca86080b..b7d575bb4 100644
   --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
   +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
   @@ -35,23 +35,25 @@ class ManifestOutputFileFactory {
      private final FileIO io;
      private final Map<String, String> props;
      private final String flinkJobId;
   +  private final String operatorUniqueId;
      private final int subTaskId;
      private final long attemptNumber;
      private final AtomicInteger fileCount = new AtomicInteger(0);
    
      ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
   -                            String flinkJobId, int subTaskId, long attemptNumber) {
   +                            String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) {
        this.ops = ops;
        this.io = io;
        this.props = props;
        this.flinkJobId = flinkJobId;
   +    this.operatorUniqueId = operatorUniqueId;
        this.subTaskId = subTaskId;
        this.attemptNumber = attemptNumber;
      }
    
      private String generatePath(long checkpointId) {
   -    return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
   -        attemptNumber, checkpointId, fileCount.incrementAndGet()));
   +    return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId,
   +        subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet()));
      }
    
      OutputFile create(long checkpointId) {
   ```
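   
   For reviewers, here is a rough usage sketch of the new snapshot-property hooks on `FlinkSink.Builder`. It is a minimal sketch, not part of this PR: the table location, schema, and property key/value are hypothetical, and a real job would read `RowData` from an actual source connector.
   
   ```java
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.data.GenericRowData;
   import org.apache.flink.table.data.RowData;
   import org.apache.flink.table.data.StringData;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.sink.FlinkSink;
   
   public class SnapshotPropertiesExample {
     public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
       // Stand-in one-element stream; a real job would use a source connector.
       DataStream<RowData> rows = env.fromElements(
           (RowData) GenericRowData.of(1, StringData.fromString("a")));
   
       // Hypothetical Hadoop table location; its schema must match the rows above.
       TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/t");
   
       FlinkSink.forRowData(rows)
           .tableLoader(tableLoader)
           // New in this port: attach custom metadata to every snapshot the committer produces.
           .setSnapshotProperty("pipeline.name", "example-job")  // hypothetical key/value
           .append();
   
       env.execute("iceberg-snapshot-properties-example");
     }
   }
   ```
   
   As the `IcebergFilesCommitter` diff shows, the custom properties are applied before the internal `MAX_COMMITTED_CHECKPOINT_ID` and `FLINK_JOB_ID` keys, so the sink's own metadata wins on any conflict.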


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


