This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 56ae374 Flink: Add an optional uidPrefix to FlinkSink#Builder to
explicitly set operator uid. (#2745)
56ae374 is described below
commit 56ae3748945add6044671ddd30cde6a815c6ddb4
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Tue Jun 29 00:35:59 2021 -0700
Flink: Add an optional uidPrefix to FlinkSink#Builder to explicitly set
operator uid. (#2745)
---
.../org/apache/iceberg/flink/sink/FlinkSink.java | 45 ++++++++++++++++++++--
1 file changed, 42 insertions(+), 3 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index f679e23..ed65528 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
@@ -124,6 +125,7 @@ public class FlinkSink {
private DistributionMode distributionMode = null;
private Integer writeParallelism = null;
private List<String> equalityFieldColumns = null;
+ private String uidPrefix = null;
private Builder() {
}
@@ -205,6 +207,30 @@ public class FlinkSink {
return this;
}
+ /**
+ * Set the uid prefix for FlinkSink operators. Note that FlinkSink
internally consists of multiple operators (like
+ * writer, committer, dummy sink etc.) The actual operator uid will be
this prefix followed by a suffix such as "-writer".
+ * <p>
+ * Flink auto generates operator uids if not set explicitly. It is a
recommended
+ * <a
href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/">
+ * best-practice to set uid for all operators</a> before deploying to
production. Flink has an option {@code
+ * pipeline.auto-generate-uids=false} to disable auto-generation and force
explicit setting of all operator uids.
+ * <p>
+ * Be careful with setting this for an existing job, because now we are
changing the operator uid from an
+ * auto-generated one to this new value. When deploying the change with a
checkpoint, Flink won't be able to restore
+ * the previous Flink sink operator state (more specifically the committer
operator state). You need to use {@code
+ * --allowNonRestoredState} to ignore the previous sink state. During
restore Flink sink state is used to check if
+ * checkpointed files were actually committed or not. {@code
--allowNonRestoredState} can lead to data loss if the
+ * Iceberg commit failed in the last completed checkpoints.
+ *
+ * @param newPrefix UID prefix for Flink sink operators
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public Builder uidPrefix(String newPrefix) {
+ this.uidPrefix = newPrefix;
+ return this;
+ }
+
@SuppressWarnings("unchecked")
public DataStreamSink<RowData> build() {
Preconditions.checkArgument(rowDataInput != null,
@@ -243,16 +269,29 @@ public class FlinkSink {
this.writeParallelism = writeParallelism == null ?
rowDataInput.getParallelism() : writeParallelism;
- DataStream<Void> returnStream = rowDataInput
+ SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
.transform(ICEBERG_STREAM_WRITER_NAME,
TypeInformation.of(WriteResult.class), streamWriter)
- .setParallelism(writeParallelism)
+ .setParallelism(writeParallelism);
+ if (uidPrefix != null) {
+ writerStream = writerStream.uid(uidPrefix + "-writer");
+ }
+
+ SingleOutputStreamOperator<Void> committerStream = writerStream
.transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
.setParallelism(1)
.setMaxParallelism(1);
+ if (uidPrefix != null) {
+ committerStream = committerStream.uid(uidPrefix + "-committer");
+ }
- return returnStream.addSink(new DiscardingSink())
+ DataStreamSink<RowData> resultStream = committerStream
+ .addSink(new DiscardingSink())
.name(String.format("IcebergSink %s", table.name()))
.setParallelism(1);
+ if (uidPrefix != null) {
+ resultStream = resultStream.uid(uidPrefix + "-dummysink");
+ }
+ return resultStream;
}
private DataStream<RowData> distributeDataStream(DataStream<RowData> input,