This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new dcca8f26 [hotfix] CommitterOperator#endInput should also commit snapshots for batch jobs even if checkpoint interval is set
dcca8f26 is described below

commit dcca8f265142b4f9b6f6b14a01d51132c8ebaea3
Author: tsreaper <[email protected]>
AuthorDate: Thu Jul 7 17:35:10 2022 +0800

    [hotfix] CommitterOperator#endInput should also commit snapshots for batch jobs even if checkpoint interval is set
    
    This closes #204
---
 .../table/store/connector/sink/CommitterOperator.java      |  8 ++++----
 .../flink/table/store/connector/sink/FlinkSinkBuilder.java |  1 -
 .../apache/flink/table/store/connector/sink/StoreSink.java | 14 +++++++++-----
 3 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index 0f16adc2..2ae924aa 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -59,7 +59,7 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
      * checkpoint is not enabled we need to commit remaining data in {@link
      * CommitterOperator#endInput}.
      */
-    private final boolean checkpointEnabled;
+    private final boolean streamingCheckpointEnabled;
 
     /** Group the committable by the checkpoint id. */
     private final NavigableMap<Long, ManifestCommittable> committablesPerCheckpoint;
@@ -80,11 +80,11 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
     private Committer committer;
 
     public CommitterOperator(
-            boolean checkpointEnabled,
+            boolean streamingCheckpointEnabled,
             SerializableFunction<String, Committer> committerFactory,
             SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
                     committableSerializer) {
-        this.checkpointEnabled = checkpointEnabled;
+        this.streamingCheckpointEnabled = streamingCheckpointEnabled;
         this.committableSerializer = committableSerializer;
         this.committablesPerCheckpoint = new TreeMap<>();
         this.committerFactory = checkNotNull(committerFactory);
@@ -165,7 +165,7 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
 
     @Override
     public void endInput() throws Exception {
-        if (checkpointEnabled) {
+        if (streamingCheckpointEnabled) {
             return;
         }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index bac731dc..6195a238 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -106,7 +106,6 @@ public class FlinkSinkBuilder {
                 new StoreSink(
                         tableIdentifier,
                         table,
-                        env.getCheckpointConfig().isCheckpointingEnabled(),
                         conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED),
                         getCompactPartSpec(),
                         lockFactory,
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 24542bd6..544675a6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.table.store.connector.sink;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -50,8 +53,6 @@ public class StoreSink implements Serializable {
 
     private final FileStoreTable table;
 
-    private final boolean checkpointEnabled;
-
     private final boolean compactionTask;
 
     @Nullable private final Map<String, String> compactPartitionSpec;
@@ -65,7 +66,6 @@ public class StoreSink implements Serializable {
     public StoreSink(
             ObjectIdentifier tableIdentifier,
             FileStoreTable table,
-            boolean checkpointEnabled,
             boolean compactionTask,
             @Nullable Map<String, String> compactPartitionSpec,
             @Nullable CatalogLock.Factory lockFactory,
@@ -73,7 +73,6 @@ public class StoreSink implements Serializable {
             @Nullable LogSinkFunction logSinkFunction) {
         this.tableIdentifier = tableIdentifier;
         this.table = table;
-        this.checkpointEnabled = checkpointEnabled;
         this.compactionTask = compactionTask;
         this.compactPartitionSpec = compactPartitionSpec;
         this.lockFactory = lockFactory;
@@ -119,12 +118,17 @@ public class StoreSink implements Serializable {
                 input.transform(WRITER_NAME, typeInfo, createWriteOperator())
                         .setParallelism(input.getParallelism());
 
+        StreamExecutionEnvironment env = input.getExecutionEnvironment();
+        boolean streamingCheckpointEnabled =
+                env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                                == RuntimeExecutionMode.STREAMING
+                        && env.getCheckpointConfig().isCheckpointingEnabled();
         SingleOutputStreamOperator<?> committed =
                 written.transform(
                                 GLOBAL_COMMITTER_NAME,
                                 typeInfo,
                                 new CommitterOperator(
-                                        checkpointEnabled,
+                                        streamingCheckpointEnabled,
                                         this::createCommitter,
                                         ManifestCommittableSerializer::new))
                         .setParallelism(1)

Reply via email to