This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 7a8322506 [FLINK-39056][pipeline-connector][iceberg] Fix Duplicate Data Issue in Iceberg Sink During Two-Phase Commit (#4269)
7a8322506 is described below
commit 7a8322506318cdeb12c0384b1301e8dde24d0bee
Author: fcfangcc <[email protected]>
AuthorDate: Wed Feb 11 16:17:26 2026 +0800
[FLINK-39056][pipeline-connector][iceberg] Fix Duplicate Data Issue in Iceberg Sink During Two-Phase Commit (#4269)
Co-authored-by: Copilot <[email protected]>
Co-authored-by: lvyanquan <[email protected]>
Co-authored-by: Kunni <[email protected]>
---
.../connectors/iceberg/sink/IcebergDataSink.java | 9 +-
.../iceberg/sink/IcebergDataSinkFactory.java | 6 +-
.../iceberg/sink/IcebergDataSinkOptions.java | 8 ++
.../iceberg/sink/v2/IcebergCommitter.java | 71 ++++++++++++++-
.../connectors/iceberg/sink/v2/IcebergSink.java | 61 ++++++++++++-
.../connectors/iceberg/sink/v2/IcebergWriter.java | 46 +++++++++-
.../iceberg/sink/v2/IcebergWriterState.java | 72 +++++++++++++++
.../sink/v2/IcebergWriterStateSerializer.java | 57 ++++++++++++
.../iceberg/sink/v2/WriteResultWrapper.java | 34 ++++++-
.../iceberg/sink/IcebergDataSinkFactoryTest.java | 2 +
.../iceberg/sink/v2/CompactionOperatorTest.java | 17 +++-
.../iceberg/sink/v2/IcebergSinkITCase.java | 7 +-
.../sink/v2/IcebergWriterStateSerializerTest.java | 37 ++++++++
.../iceberg/sink/v2/IcebergWriterTest.java | 101 ++++++++++++++++++++-
14 files changed, 506 insertions(+), 22 deletions(-)
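Editor's note before the diff: the fix makes the Iceberg commit idempotent by stamping each committed snapshot's summary with the Flink job id, operator id, and max committed checkpoint id, and by skipping a retried commit whose checkpoint id is already recorded in the table's snapshot ancestry. Below is a minimal illustrative sketch of that check, not taken verbatim from the patch; it assumes the SinkUtil summary keys and the SnapshotUtil helper that the diff imports, and it simplifies the operator-id comparison. The authoritative logic is in IcebergCommitter further down.

import java.util.Map;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.util.SnapshotUtil;

/** Illustrative sketch only; the real implementation is IcebergCommitter in this commit. */
final class CommitDeduplicationSketch {

    /** Newest checkpoint id this job/operator has recorded in the table's snapshot ancestry, or -1. */
    static long lastCommittedCheckpointId(Table table, String jobId, String operatorId) {
        Snapshot current = table.currentSnapshot();
        if (current == null) {
            return -1L;
        }
        for (Snapshot ancestor : SnapshotUtil.ancestorsOf(current.snapshotId(), table::snapshot)) {
            Map<String, String> summary = ancestor.summary();
            if (jobId.equals(summary.get(SinkUtil.FLINK_JOB_ID))
                    && operatorId.equals(summary.get(SinkUtil.OPERATOR_ID))) {
                String value = summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID);
                if (value != null) {
                    return Long.parseLong(value);
                }
            }
        }
        return -1L;
    }

    /** A retried commit is skipped when its checkpoint id was already stamped on a snapshot. */
    static boolean shouldSkip(Table table, String jobId, String operatorId, long checkpointId) {
        return lastCommittedCheckpointId(table, jobId, operatorId) == checkpointId;
    }
}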
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
index 0581858da..96c2b5f76 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
@@ -47,25 +47,30 @@ public class IcebergDataSink implements DataSink, Serializable {
public final CompactionOptions compactionOptions;
+ public final String jobIdPrefix;
+
public IcebergDataSink(
Map<String, String> catalogOptions,
Map<String, String> tableOptions,
Map<TableId, List<String>> partitionMaps,
ZoneId zoneId,
String schemaOperatorUid,
- CompactionOptions compactionOptions) {
+ CompactionOptions compactionOptions,
+ String jobIdPrefix) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.partitionMaps = partitionMaps;
this.zoneId = zoneId;
this.schemaOperatorUid = schemaOperatorUid;
this.compactionOptions = compactionOptions;
+ this.jobIdPrefix = jobIdPrefix;
}
@Override
public EventSinkProvider getEventSinkProvider() {
IcebergSink icebergEventSink =
- new IcebergSink(catalogOptions, tableOptions, zoneId, compactionOptions);
+ new IcebergSink(
+ catalogOptions, tableOptions, zoneId, compactionOptions, jobIdPrefix);
return FlinkSinkProvider.of(icebergEventSink);
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index 80f1df659..c08565294 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -106,6 +106,8 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
}
}
}
+ String jobIdPrefix =
+ context.getFactoryConfiguration().get(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX);
return new IcebergDataSink(
catalogOptions,
@@ -113,7 +115,8 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
partitionMaps,
zoneId,
schemaOperatorUid,
- compactionOptions);
+ compactionOptions,
+ jobIdPrefix);
}
private CompactionOptions getCompactionStrategy(Configuration configuration) {
@@ -144,6 +147,7 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED);
options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL);
options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM);
+ options.add(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX);
return options;
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
index 517e1c8eb..f989909fa 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
@@ -78,4 +78,12 @@ public class IcebergDataSinkOptions {
.defaultValue(-1)
.withDescription(
"The parallelism for file compaction, default
value is -1, which means that compaction parallelism is equal to sink writer
parallelism.");
+
+ @Experimental
+ public static final ConfigOption<String> SINK_JOB_ID_PREFIX =
+ key("sink.job.id.prefix")
+ .stringType()
+ .defaultValue("cdc")
+ .withDescription(
+ "The prefix of job id, which is used to
distinguish different jobs.");
}
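Usage sketch for the new option (values are hypothetical and the Configuration import path is assumed from the flink-cdc common module; the factory test further below exercises the same setter):

import java.util.HashMap;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions;

/** Hypothetical configuration snippet; mirrors how IcebergDataSinkFactoryTest sets the option. */
final class JobIdPrefixConfigExample {

    static Configuration icebergSinkConfig() {
        Configuration conf = Configuration.fromMap(new HashMap<>());
        conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse");
        // Prefix for the generated sink job id; defaults to "cdc" when left unset.
        conf.set(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX, "order-sync-");
        return conf;
    }
}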
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
index 9d61a8e43..6cadc3d91 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
@@ -28,10 +28,14 @@ import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.SnapshotUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +48,7 @@ import java.util.Map;
import java.util.Optional;
import static java.util.stream.Collectors.toList;
+import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
/** A {@link Committer} for Apache Iceberg. */
public class IcebergCommitter implements Committer<WriteResultWrapper> {
@@ -83,6 +88,14 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
}
private void commit(List<WriteResultWrapper> writeResultWrappers) {
+ if (writeResultWrappers.isEmpty()) {
+ return;
+ }
+ // all write results in one commit call share the same checkpoint id
+ long checkpointId = writeResultWrappers.get(0).getCheckpointId();
+ String newFlinkJobId = writeResultWrappers.get(0).getJobId();
+ String operatorId = writeResultWrappers.get(0).getOperatorId();
+
Map<TableId, List<WriteResult>> tableMap = new HashMap<>();
for (WriteResultWrapper writeResultWrapper : writeResultWrappers) {
List<WriteResult> writeResult =
@@ -93,11 +106,29 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
}
for (Map.Entry<TableId, List<WriteResult>> entry : tableMap.entrySet()) {
TableId tableId = entry.getKey();
- Optional<TableMetric> tableMetric = getTableMetric(tableId);
- tableMetric.ifPresent(TableMetric::increaseCommitTimes);
+
Table table =
catalog.loadTable(
TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
+
+ Snapshot snapshot = table.currentSnapshot();
+ if (snapshot != null) {
+ Iterable<Snapshot> ancestors =
+ SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
+ long lastCheckpointId =
+ getMaxCommittedCheckpointId(ancestors, newFlinkJobId, operatorId);
+ if (lastCheckpointId == checkpointId) {
+ LOGGER.warn(
+ "Checkpoint id {} has been committed to table {}, skipping",
+ checkpointId,
+ tableId.identifier());
+ continue;
+ }
+ }
+
+ Optional<TableMetric> tableMetric = getTableMetric(tableId);
+ tableMetric.ifPresent(TableMetric::increaseCommitTimes);
+
List<WriteResult> results = entry.getValue();
List<DataFile> dataFiles =
results.stream()
@@ -117,15 +148,47 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
if (deleteFiles.isEmpty()) {
AppendFiles append = table.newAppend();
dataFiles.forEach(append::appendFile);
- append.commit();
+ commitOperation(append, newFlinkJobId, operatorId, checkpointId);
} else {
RowDelta delta = table.newRowDelta();
dataFiles.forEach(delta::addRows);
deleteFiles.forEach(delta::addDeletes);
- delta.commit();
+ commitOperation(delta, newFlinkJobId, operatorId, checkpointId);
+ }
+ }
+ }
+ }
+
+ private static long getMaxCommittedCheckpointId(
+ Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
+ long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID - 1;
+
+ for (Snapshot ancestor : ancestors) {
+ Map<String, String> summary = ancestor.summary();
+ String snapshotFlinkJobId = summary.get(SinkUtil.FLINK_JOB_ID);
+ String snapshotOperatorId = summary.get(SinkUtil.OPERATOR_ID);
+ if (flinkJobId.equals(snapshotFlinkJobId)
+ && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
+ String value = summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID);
+ if (value != null) {
+ lastCommittedCheckpointId = Long.parseLong(value);
+ break;
}
}
}
+
+ return lastCommittedCheckpointId;
+ }
+
+ private static void commitOperation(
+ SnapshotUpdate<?> operation,
+ String newFlinkJobId,
+ String operatorId,
+ long checkpointId) {
+ operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
+ operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
+ operation.set(SinkUtil.OPERATOR_ID, operatorId);
+ operation.commit();
}
private Optional<TableMetric> getTableMetric(TableId tableId) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
index 2d3269f55..db43adc3a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.cdc.common.event.Event;
@@ -30,6 +32,7 @@ import org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction.CompactionOper
import org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction.CompactionOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
@@ -39,8 +42,10 @@ import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.time.ZoneId;
+import java.util.Collection;
import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
/** A {@link Sink} implementation for Apache Iceberg. */
public class IcebergSink
@@ -48,7 +53,8 @@ public class IcebergSink
WithPreWriteTopology<Event>,
WithPreCommitTopology<Event, WriteResultWrapper>,
TwoPhaseCommittingSink<Event, WriteResultWrapper>,
- WithPostCommitTopology<Event, WriteResultWrapper> {
+ WithPostCommitTopology<Event, WriteResultWrapper>,
+ SupportsWriterState<Event, IcebergWriterState> {
protected final Map<String, String> catalogOptions;
protected final Map<String, String> tableOptions;
@@ -57,15 +63,22 @@ public class IcebergSink
private final CompactionOptions compactionOptions;
+ private String jobId;
+
+ private String operatorId;
+
public IcebergSink(
Map<String, String> catalogOptions,
Map<String, String> tableOptions,
ZoneId zoneId,
- CompactionOptions compactionOptions) {
+ CompactionOptions compactionOptions,
+ String jobIdPrefix) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.zoneId = zoneId;
this.compactionOptions = compactionOptions;
+ this.jobId = jobIdPrefix + UUID.randomUUID();
+ this.operatorId = UUID.randomUUID().toString();
}
@Override
@@ -92,20 +105,60 @@ public class IcebergSink
@Override
public SinkWriter<Event> createWriter(InitContext context) {
+ long lastCheckpointId =
+ context.getRestoredCheckpointId()
+ .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
return new IcebergWriter(
catalogOptions,
context.getTaskInfo().getIndexOfThisSubtask(),
context.getTaskInfo().getAttemptNumber(),
- zoneId);
+ zoneId,
+ lastCheckpointId,
+ jobId,
+ operatorId);
}
@Override
public SinkWriter<Event> createWriter(WriterInitContext context) {
+ long lastCheckpointId =
+ context.getRestoredCheckpointId()
+ .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+ return new IcebergWriter(
+ catalogOptions,
+ context.getTaskInfo().getIndexOfThisSubtask(),
+ context.getTaskInfo().getAttemptNumber(),
+ zoneId,
+ lastCheckpointId,
+ jobId,
+ operatorId);
+ }
+
+ @Override
+ public StatefulSinkWriter<Event, IcebergWriterState> restoreWriter(
+ WriterInitContext context, Collection<IcebergWriterState> writerStates) {
+ // No need to read checkpointId from state
+ long lastCheckpointId =
+ context.getRestoredCheckpointId()
+ .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+ if (writerStates != null && !writerStates.isEmpty()) {
+ IcebergWriterState icebergWriterState = writerStates.iterator().next();
+ jobId = icebergWriterState.getJobId();
+ operatorId = icebergWriterState.getOperatorId();
+ }
+
return new IcebergWriter(
catalogOptions,
context.getTaskInfo().getIndexOfThisSubtask(),
context.getTaskInfo().getAttemptNumber(),
- zoneId);
+ zoneId,
+ lastCheckpointId,
+ jobId,
+ operatorId);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<IcebergWriterState> getWriterStateSerializer() {
+ return new IcebergWriterStateSerializer();
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
index 2914f3bca..62e47d897 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
@@ -46,12 +47,15 @@ import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** A {@link SinkWriter} for Apache Iceberg. */
-public class IcebergWriter implements CommittingSinkWriter<Event, WriteResultWrapper> {
+public class IcebergWriter
+ implements CommittingSinkWriter<Event, WriteResultWrapper>,
+ StatefulSinkWriter<Event, IcebergWriterState> {
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergWriter.class);
@@ -75,8 +79,20 @@ public class IcebergWriter implements CommittingSinkWriter<Event, WriteResultWra
private final ZoneId zoneId;
+ private long lastCheckpointId;
+
+ private final String jobId;
+
+ private final String operatorId;
+
public IcebergWriter(
- Map<String, String> catalogOptions, int taskId, int attemptId, ZoneId zoneId) {
+ Map<String, String> catalogOptions,
+ int taskId,
+ int attemptId,
+ ZoneId zoneId,
+ long lastCheckpointId,
+ String jobId,
+ String operatorId) {
catalog =
CatalogUtil.buildIcebergCatalog(
this.getClass().getSimpleName(), catalogOptions, new Configuration());
@@ -87,14 +103,30 @@ public class IcebergWriter implements CommittingSinkWriter<Event, WriteResultWra
this.taskId = taskId;
this.attemptId = attemptId;
this.zoneId = zoneId;
+ this.lastCheckpointId = lastCheckpointId;
+ this.jobId = jobId;
+ this.operatorId = operatorId;
+ LOGGER.info(
+ "IcebergWriter created, taskId: {}, attemptId: {},
lastCheckpointId: {}, jobId: {}, operatorId: {}",
+ taskId,
+ attemptId,
+ lastCheckpointId,
+ jobId,
+ operatorId);
+ }
+
+ @Override
+ public List<IcebergWriterState> snapshotState(long checkpointId) {
+ return Collections.singletonList(new IcebergWriterState(jobId, operatorId));
}
@Override
- public Collection<WriteResultWrapper> prepareCommit() throws IOException, InterruptedException {
+ public Collection<WriteResultWrapper> prepareCommit() throws IOException {
List<WriteResultWrapper> list = new ArrayList<>();
list.addAll(temporaryWriteResult);
list.addAll(getWriteResult());
temporaryWriteResult.clear();
+ lastCheckpointId++;
return list;
}
@@ -149,10 +181,16 @@ public class IcebergWriter implements CommittingSinkWriter<Event, WriteResultWra
}
private List<WriteResultWrapper> getWriteResult() throws IOException {
+ long currentCheckpointId = lastCheckpointId + 1;
List<WriteResultWrapper> writeResults = new ArrayList<>();
for (Map.Entry<TableId, TaskWriter<RowData>> entry : writerMap.entrySet()) {
WriteResultWrapper writeResultWrapper =
- new WriteResultWrapper(entry.getValue().complete(), entry.getKey());
+ new WriteResultWrapper(
+ entry.getValue().complete(),
+ entry.getKey(),
+ currentCheckpointId,
+ jobId,
+ operatorId);
writeResults.add(writeResultWrapper);
LOGGER.info(writeResultWrapper.buildDescription());
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterState.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterState.java
new file mode 100644
index 000000000..767e3dca5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterState.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.v2;
+
+import java.util.Objects;
+
+/** The state of the {@link IcebergWriter}. */
+public class IcebergWriterState {
+
+ // The job ID associated with this writer state
+ private final String jobId;
+
+ // The operator ID associated with this writer state
+ private final String operatorId;
+
+ public IcebergWriterState(String jobId, String operatorId) {
+ this.jobId = jobId;
+ this.operatorId = operatorId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getOperatorId() {
+ return operatorId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobId, operatorId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ IcebergWriterState that = (IcebergWriterState) obj;
+ return Objects.equals(jobId, that.jobId) && Objects.equals(operatorId, that.operatorId);
+ }
+
+ @Override
+ public String toString() {
+ return "IcebergWriterState{"
+ + "jobId='"
+ + jobId
+ + '\''
+ + ", operatorId='"
+ + operatorId
+ + '\''
+ + '}';
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializer.java
new file mode 100644
index 000000000..139d725fd
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.v2;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import java.io.IOException;
+
+/** A {@link SimpleVersionedSerializer} for {@link IcebergWriterState}. */
+public class IcebergWriterStateSerializer implements SimpleVersionedSerializer<IcebergWriterState> {
+
+ private static final int VERSION = 0;
+
+ private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+ ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public byte[] serialize(IcebergWriterState icebergWriterState) throws IOException {
+ final DataOutputSerializer out = SERIALIZER_CACHE.get();
+ out.writeUTF(icebergWriterState.getJobId());
+ out.writeUTF(icebergWriterState.getOperatorId());
+ final byte[] result = out.getCopyOfBuffer();
+ out.clear();
+ return result;
+ }
+
+ @Override
+ public IcebergWriterState deserialize(int version, byte[] serialized) throws IOException {
+ if (version != VERSION) {
+ throw new IOException("Unknown version: " + version);
+ }
+ final DataInputDeserializer in = new DataInputDeserializer(serialized);
+ return new IcebergWriterState(in.readUTF(), in.readUTF());
+ }
+}
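A short, illustrative round trip through the serializer above (it mirrors what the new IcebergWriterStateSerializerTest later in this commit asserts; the main method and example values are just for demonstration):

import java.io.IOException;

import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergWriterState;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergWriterStateSerializer;

/** Illustrative round trip through the writer-state serializer. */
final class WriterStateRoundTrip {
    public static void main(String[] args) throws IOException {
        IcebergWriterStateSerializer serializer = new IcebergWriterStateSerializer();
        IcebergWriterState state = new IcebergWriterState("cdc-1234", "operator-5678");

        byte[] bytes = serializer.serialize(state);
        IcebergWriterState restored = serializer.deserialize(serializer.getVersion(), bytes);

        // jobId and operatorId survive the round trip, which lets a restored writer keep
        // committing under the same identity after a failover.
        assert state.equals(restored);
    }
}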
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
index 5aae210c9..e64cc5535 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
@@ -34,9 +34,23 @@ public class WriteResultWrapper implements Serializable {
private final TableId tableId;
- public WriteResultWrapper(WriteResult writeResult, TableId tableId) {
+ private final long checkpointId;
+
+ private final String jobId;
+
+ private final String operatorId;
+
+ public WriteResultWrapper(
+ WriteResult writeResult,
+ TableId tableId,
+ long checkpointId,
+ String jobId,
+ String operatorId) {
this.writeResult = writeResult;
this.tableId = tableId;
+ this.checkpointId = checkpointId;
+ this.jobId = jobId;
+ this.operatorId = operatorId;
}
public WriteResult getWriteResult() {
@@ -47,6 +61,18 @@ public class WriteResultWrapper implements Serializable {
return tableId;
}
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getOperatorId() {
+ return operatorId;
+ }
+
/** Build a simple description for the write result. */
public String buildDescription() {
long addCount = 0;
@@ -63,6 +89,12 @@ public class WriteResultWrapper implements Serializable {
}
return "WriteResult of "
+ tableId
+ + ", CheckpointId: "
+ + checkpointId
+ + ", JobId: "
+ + jobId
+ + ", OperatorId: "
+ + operatorId
+ ", AddCount: "
+ addCount
+ ", DeleteCount: "
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index dff510a41..848fd2584 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -57,11 +57,13 @@ public class IcebergDataSinkFactoryTest {
Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse");
conf.set(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM, 4);
+ conf.set(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX, "FlinkCDC");
DataSink dataSink =
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf, conf, Thread.currentThread().getContextClassLoader()));
Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+ Assertions.assertThat(((IcebergDataSink) dataSink).jobIdPrefix).isEqualTo("FlinkCDC");
}
@Test
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
index f73a8c921..6d0fce3eb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
@@ -60,6 +60,7 @@ public class CompactionOperatorTest {
@Test
public void testCompationOperator() throws IOException, InterruptedException {
+ long checkpointId = 0;
Map<String, String> catalogOptions = new HashMap<>();
String warehouse =
new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
@@ -69,8 +70,17 @@ public class CompactionOperatorTest {
Catalog catalog =
CatalogUtil.buildIcebergCatalog(
"cdc-iceberg-catalog", catalogOptions, new
Configuration());
+ String jobId = UUID.randomUUID().toString();
+ String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
- new IcebergWriter(catalogOptions, 1, 1, ZoneId.systemDefault());
+ new IcebergWriter(
+ catalogOptions,
+ 1,
+ 1,
+ ZoneId.systemDefault(),
+ checkpointId,
+ jobId,
+ operatorId);
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
@@ -155,7 +165,10 @@ public class CompactionOperatorTest {
compactionOperator.processElement(
new StreamRecord<>(
new CommittableWithLineage<>(
- new WriteResultWrapper(null, tableId), 0L, 0)));
+ new WriteResultWrapper(
+ null, tableId, checkpointId, jobId, operatorId),
+ 0L,
+ 0)));
Map<String, String> summary =
catalog.loadTable(TableIdentifier.parse(tableId.identifier()))
.currentSnapshot()
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
index cb8daccba..3c36f12cb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
@@ -100,7 +100,12 @@ public class IcebergSinkITCase {
DataStream<Event> stream = env.fromData(events, TypeInformation.of(Event.class));
Sink<Event> icebergSink =
- new IcebergSink(catalogOptions, null, null, CompactionOptions.builder().build());
+ new IcebergSink(
+ catalogOptions,
+ null,
+ null,
+ CompactionOptions.builder().build(),
+ "FlinkCDC");
String[] expected = new String[] {"21, 1.732, Disenchanted", "17,
6.28, Doris Day"};
stream.sinkTo(icebergSink);
env.execute("Values to Iceberg Sink");
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializerTest.java
new file mode 100644
index 000000000..591c9709f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializerTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.v2;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+/** Tests for {@link IcebergWriterState} and {@link IcebergWriterStateSerializer}. */
+public class IcebergWriterStateSerializerTest {
+
+ @Test
+ public void testSerializer() throws IOException {
+ IcebergWriterState icebergWriterState = new IcebergWriterState("jobId", "operatorId");
+ IcebergWriterStateSerializer icebergWriterStateSerializer =
+ new IcebergWriterStateSerializer();
+ byte[] bytes = icebergWriterStateSerializer.serialize(icebergWriterState);
+ Assertions.assertThat(icebergWriterStateSerializer.deserialize(0, bytes))
+ .isEqualTo(icebergWriterState);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index 2e28a39d0..b16b88931 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
@@ -88,8 +89,11 @@ public class IcebergWriterTest {
Catalog catalog =
CatalogUtil.buildIcebergCatalog(
"cdc-iceberg-catalog", catalogOptions, new
Configuration());
+ String jobId = UUID.randomUUID().toString();
+ String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
- new IcebergWriter(catalogOptions, 1, 1, ZoneId.systemDefault());
+ new IcebergWriter(
+ catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
@@ -277,7 +281,10 @@ public class IcebergWriterTest {
CatalogUtil.buildIcebergCatalog(
"cdc-iceberg-catalog", catalogOptions, new
Configuration());
ZoneId pipelineZoneId = ZoneId.systemDefault();
- IcebergWriter icebergWriter = new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId);
+ String jobId = UUID.randomUUID().toString();
+ String operatorId = UUID.randomUUID().toString();
+ IcebergWriter icebergWriter =
+ new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId, 0, jobId, operatorId);
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
@@ -382,8 +389,11 @@ public class IcebergWriterTest {
Catalog catalog =
CatalogUtil.buildIcebergCatalog(
"cdc-iceberg-catalog", catalogOptions, new
Configuration());
+ String jobId = UUID.randomUUID().toString();
+ String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
- new IcebergWriter(catalogOptions, 1, 1, ZoneId.systemDefault());
+ new IcebergWriter(
+ catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
TableId tableId = TableId.parse("test.iceberg_table");
Map<TableId, List<String>> partitionMaps = new HashMap<>();
@@ -457,6 +467,91 @@ public class IcebergWriterTest {
Assertions.assertThat(result.size()).isEqualTo(2);
}
+ @Test
+ public void testWithRepeatCommit() throws Exception {
+ Map<String, String> catalogOptions = new HashMap<>();
+ String warehouse =
+ new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ catalogOptions.put("type", "hadoop");
+ catalogOptions.put("warehouse", warehouse);
+ catalogOptions.put("cache-enabled", "false");
+ Catalog catalog =
+ CatalogUtil.buildIcebergCatalog(
+ "cdc-iceberg-catalog", catalogOptions, new
Configuration());
+ ZoneId pipelineZoneId = ZoneId.systemDefault();
+ String jobId = UUID.randomUUID().toString();
+ String operatorId = UUID.randomUUID().toString();
+ IcebergWriter icebergWriter =
+ new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId, 0, jobId, operatorId);
+ IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
+ TableId tableId = TableId.parse("test.iceberg_table");
+ TableIdentifier tableIdentifier =
+ TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName());
+ // Create Table.
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ tableId,
+ Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ DataTypes.BIGINT().notNull(),
+ "column for id",
+ "AUTO_DECREMENT()")
+ .physicalColumn(
+ "name", DataTypes.VARCHAR(100),
"column for name", null)
+ .primaryKey("id")
+ .build());
+ icebergMetadataApplier.applySchemaChange(createTableEvent);
+ icebergWriter.write(createTableEvent, null);
+ BinaryRecordDataGenerator dataGenerator =
+ new BinaryRecordDataGenerator(
+ createTableEvent.getSchema().getColumnDataTypes().toArray(new DataType[0]));
+ BinaryRecordData record1 =
+ dataGenerator.generate(
+ new Object[] {
+ 1L, BinaryStringData.fromString("char1"),
+ });
+ DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, record1);
+ icebergWriter.write(dataChangeEvent, null);
+ Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
+ IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
+ Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+ writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+ icebergCommitter.commit(collection);
+ List<String> result = fetchTableContent(catalog, tableId, null);
+ Assertions.assertThat(result.size()).isEqualTo(1);
+ Assertions.assertThat(result).containsExactlyInAnyOrder("1, char1");
+ Map<String, String> summary =
+ catalog.loadTable(tableIdentifier).currentSnapshot().summary();
+ Assertions.assertThat(summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID)).isEqualTo("1");
+ Assertions.assertThat(summary.get(SinkUtil.FLINK_JOB_ID)).isEqualTo(jobId);
+ Assertions.assertThat(summary.get(SinkUtil.OPERATOR_ID)).isEqualTo(operatorId);
+
+ // repeat commit with same committables, should not cause duplicate data.
+ BinaryRecordData record2 =
+ dataGenerator.generate(
+ new Object[] {
+ 2L, BinaryStringData.fromString("char2"),
+ });
+ DataChangeEvent dataChangeEvent2 = DataChangeEvent.insertEvent(tableId, record2);
+ icebergWriter.write(dataChangeEvent2, null);
+ writeResults = icebergWriter.prepareCommit();
+ collection =
+ writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+ icebergCommitter.commit(collection);
+ icebergCommitter.commit(collection);
+ summary = catalog.loadTable(tableIdentifier).currentSnapshot().summary();
+ Assertions.assertThat(summary.get("total-data-files")).isEqualTo("2");
+ Assertions.assertThat(summary.get("added-records")).isEqualTo("1");
+ Assertions.assertThat(summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID)).isEqualTo("2");
+ Assertions.assertThat(summary.get(SinkUtil.FLINK_JOB_ID)).isEqualTo(jobId);
+ Assertions.assertThat(summary.get(SinkUtil.OPERATOR_ID)).isEqualTo(operatorId);
+
+ result = fetchTableContent(catalog, tableId, null);
+ Assertions.assertThat(result.size()).isEqualTo(2);
+ Assertions.assertThat(result).containsExactlyInAnyOrder("1, char1",
"2, char2");
+ }
+
/** Mock CommitRequestImpl. */
public static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> {