This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new dcca8f26 [hotfix] CommitterOperator#endInput should also commit
snapshots for batch jobs even if checkpoint interval is set
dcca8f26 is described below
commit dcca8f265142b4f9b6f6b14a01d51132c8ebaea3
Author: tsreaper <[email protected]>
AuthorDate: Thu Jul 7 17:35:10 2022 +0800
[hotfix] CommitterOperator#endInput should also commit snapshots for batch
jobs even if checkpoint interval is set
This closes #204
---
.../table/store/connector/sink/CommitterOperator.java | 8 ++++----
.../flink/table/store/connector/sink/FlinkSinkBuilder.java | 1 -
.../apache/flink/table/store/connector/sink/StoreSink.java | 14 +++++++++-----
3 files changed, 13 insertions(+), 10 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index 0f16adc2..2ae924aa 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -59,7 +59,7 @@ public class CommitterOperator extends
AbstractStreamOperator<Committable>
* checkpoint is not enabled we need to commit remaining data in {@link
* CommitterOperator#endInput}.
*/
- private final boolean checkpointEnabled;
+ private final boolean streamingCheckpointEnabled;
/** Group the committable by the checkpoint id. */
private final NavigableMap<Long, ManifestCommittable>
committablesPerCheckpoint;
@@ -80,11 +80,11 @@ public class CommitterOperator extends
AbstractStreamOperator<Committable>
private Committer committer;
public CommitterOperator(
- boolean checkpointEnabled,
+ boolean streamingCheckpointEnabled,
SerializableFunction<String, Committer> committerFactory,
SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
committableSerializer) {
- this.checkpointEnabled = checkpointEnabled;
+ this.streamingCheckpointEnabled = streamingCheckpointEnabled;
this.committableSerializer = committableSerializer;
this.committablesPerCheckpoint = new TreeMap<>();
this.committerFactory = checkNotNull(committerFactory);
@@ -165,7 +165,7 @@ public class CommitterOperator extends
AbstractStreamOperator<Committable>
@Override
public void endInput() throws Exception {
- if (checkpointEnabled) {
+ if (streamingCheckpointEnabled) {
return;
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index bac731dc..6195a238 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -106,7 +106,6 @@ public class FlinkSinkBuilder {
new StoreSink(
tableIdentifier,
table,
- env.getCheckpointConfig().isCheckpointingEnabled(),
conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED),
getCompactPartSpec(),
lockFactory,
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 24542bd6..544675a6 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -18,9 +18,12 @@
package org.apache.flink.table.store.connector.sink;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -50,8 +53,6 @@ public class StoreSink implements Serializable {
private final FileStoreTable table;
- private final boolean checkpointEnabled;
-
private final boolean compactionTask;
@Nullable private final Map<String, String> compactPartitionSpec;
@@ -65,7 +66,6 @@ public class StoreSink implements Serializable {
public StoreSink(
ObjectIdentifier tableIdentifier,
FileStoreTable table,
- boolean checkpointEnabled,
boolean compactionTask,
@Nullable Map<String, String> compactPartitionSpec,
@Nullable CatalogLock.Factory lockFactory,
@@ -73,7 +73,6 @@ public class StoreSink implements Serializable {
@Nullable LogSinkFunction logSinkFunction) {
this.tableIdentifier = tableIdentifier;
this.table = table;
- this.checkpointEnabled = checkpointEnabled;
this.compactionTask = compactionTask;
this.compactPartitionSpec = compactPartitionSpec;
this.lockFactory = lockFactory;
@@ -119,12 +118,17 @@ public class StoreSink implements Serializable {
input.transform(WRITER_NAME, typeInfo, createWriteOperator())
.setParallelism(input.getParallelism());
+ StreamExecutionEnvironment env = input.getExecutionEnvironment();
+ boolean streamingCheckpointEnabled =
+ env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+ == RuntimeExecutionMode.STREAMING
+ && env.getCheckpointConfig().isCheckpointingEnabled();
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME,
typeInfo,
new CommitterOperator(
- checkpointEnabled,
+ streamingCheckpointEnabled,
this::createCommitter,
ManifestCommittableSerializer::new))
.setParallelism(1)