This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 56ae374  Flink: Add an optional uidPrefix to FlinkSink#Builder to 
explicitly set operator uid. (#2745)
56ae374 is described below

commit 56ae3748945add6044671ddd30cde6a815c6ddb4
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Tue Jun 29 00:35:59 2021 -0700

    Flink: Add an optional uidPrefix to FlinkSink#Builder to explicitly set 
operator uid. (#2745)
---
 .../org/apache/iceberg/flink/sink/FlinkSink.java   | 45 ++++++++++++++++++++--
 1 file changed, 42 insertions(+), 3 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java 
b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index f679e23..ed65528 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
@@ -124,6 +125,7 @@ public class FlinkSink {
     private DistributionMode distributionMode = null;
     private Integer writeParallelism = null;
     private List<String> equalityFieldColumns = null;
+    private String uidPrefix = null;
 
     private Builder() {
     }
@@ -205,6 +207,30 @@ public class FlinkSink {
       return this;
     }
 
+    /**
+     * Set the uid prefix for FlinkSink operators. Note that FlinkSink 
internally consists of multiple operators (like
+     * writer, committer, dummy sink etc.) Actual operator uid will be 
appended with a suffix like "uid-writer".
+     * <p>
+     * Flink auto generates operator uids if not set explicitly. It is a 
recommended
+     * <a 
href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/">
+     * best-practice to set uid for all operators</a> before deploying to 
production. Flink has an option to {@code
+     * pipeline.auto-generate-uids=false} to disable auto-generation and force 
explicit setting of all operator uids.
+     * <p>
+     * Be careful with setting this for an existing job, because now we are 
changing the operator uid from an
+     * auto-generated one to this new value. When deploying the change with a 
checkpoint, Flink won't be able to restore
+     * the previous Flink sink operator state (more specifically the committer 
operator state). You need to use {@code
+     * --allowNonRestoredState} to ignore the previous sink state. During 
restore Flink sink state is used to check if
+     * checkpointed files were actually committed or not. {@code 
--allowNonRestoredState} can lead to data loss if the
+     * Iceberg commit failed in the last completed checkpoints.
+     *
+     * @param newPrefix UID prefix for Flink sink operators
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder uidPrefix(String newPrefix) {
+      this.uidPrefix = newPrefix;
+      return this;
+    }
+
     @SuppressWarnings("unchecked")
     public DataStreamSink<RowData> build() {
       Preconditions.checkArgument(rowDataInput != null,
@@ -243,16 +269,29 @@ public class FlinkSink {
 
       this.writeParallelism = writeParallelism == null ? 
rowDataInput.getParallelism() : writeParallelism;
 
-      DataStream<Void> returnStream = rowDataInput
+      SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
           .transform(ICEBERG_STREAM_WRITER_NAME, 
TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism)
+          .setParallelism(writeParallelism);
+      if (uidPrefix != null) {
+        writerStream = writerStream.uid(uidPrefix + "-writer");
+      }
+
+      SingleOutputStreamOperator<Void> committerStream = writerStream
           .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
           .setParallelism(1)
           .setMaxParallelism(1);
+      if (uidPrefix != null) {
+        committerStream = committerStream.uid(uidPrefix + "-committer");
+      }
 
-      return returnStream.addSink(new DiscardingSink())
+      DataStreamSink<RowData> resultStream = committerStream
+          .addSink(new DiscardingSink())
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
+      if (uidPrefix != null) {
+        resultStream = resultStream.uid(uidPrefix + "-dummysink");
+      }
+      return resultStream;
     }
 
     private DataStream<RowData> distributeDataStream(DataStream<RowData> input,

Reply via email to