This is an automated email from the ASF dual-hosted git repository.
lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 15288b3d3 [FLINK-38450][iceberg] Fix duplicate records when schema change splits writes within a checkpoint (#4360)
15288b3d3 is described below
commit 15288b3d32989bf8c7ebaa6c4d8703ca1771d757
Author: Spoorthi Basu <[email protected]>
AuthorDate: Tue Jun 2 21:30:57 2026 -0400
[FLINK-38450][iceberg] Fix duplicate records when schema change splits writes within a checkpoint (#4360)
---
.../iceberg/sink/v2/IcebergCommitter.java | 172 +++-
.../connectors/iceberg/sink/v2/IcebergWriter.java | 50 +-
.../iceberg/sink/v2/WriteResultWrapper.java | 22 +-
.../sink/v2/WriteResultWrapperSerializer.java | 6 +-
.../iceberg/sink/v2/IcebergWriterTest.java | 980 +++++++++++++++++++++
.../pipeline/tests/MySqlToIcebergE2eITCase.java | 20 +-
6 files changed, 1202 insertions(+), 48 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
index 759777440..847292c5b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TreeMap;
import static java.util.stream.Collectors.toList;
import static
org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
@@ -62,6 +63,15 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
public static final String TABLE_GROUP_KEY = "table";
+ // Use a flink-cdc. prefix so these don't clash with the flink. namespace
reserved by the
+ // Iceberg Flink connector.
+
+ /** Snapshot summary key for the batch index; used to resume partial
commits on retry. */
+ static final String FLINK_BATCH_INDEX = "flink-cdc.batch-index";
+
+ /** Snapshot summary key for the checkpoint ID on intermediate batch
commits. */
+ static final String FLINK_CHECKPOINT_ID_PROP = "flink-cdc.checkpoint-id";
+
private final Catalog catalog;
private final SinkCommitterMetricGroup metricGroup;
@@ -96,74 +106,140 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
if (writeResultWrappers.isEmpty()) {
return;
}
- // all commits a same checkpoint-id
long checkpointId = writeResultWrappers.get(0).getCheckpointId();
String newFlinkJobId = writeResultWrappers.get(0).getJobId();
String operatorId = writeResultWrappers.get(0).getOperatorId();
- Map<TableId, List<WriteResult>> tableMap = new HashMap<>();
- for (WriteResultWrapper writeResultWrapper : writeResultWrappers) {
- List<WriteResult> writeResult =
- tableMap.getOrDefault(writeResultWrapper.getTableId(), new
ArrayList<>());
- writeResult.add(writeResultWrapper.getWriteResult());
- tableMap.put(writeResultWrapper.getTableId(), writeResult);
- LOGGER.info(writeResultWrapper.buildDescription());
+ Map<TableId, List<WriteResultWrapper>> tableMap = new HashMap<>();
+ for (WriteResultWrapper w : writeResultWrappers) {
+ tableMap.computeIfAbsent(w.getTableId(), k -> new
ArrayList<>()).add(w);
}
- for (Map.Entry<TableId, List<WriteResult>> entry :
tableMap.entrySet()) {
+
+ for (Map.Entry<TableId, List<WriteResultWrapper>> entry :
tableMap.entrySet()) {
TableId tableId = entry.getKey();
+ // Group by batchIndex so wrappers from different subtasks for the
same batch
+ // are merged into one snapshot, not committed separately.
+ TreeMap<Integer, List<WriteResultWrapper>> batchGroups = new
TreeMap<>();
+ for (WriteResultWrapper w : entry.getValue()) {
+ batchGroups.computeIfAbsent(w.getBatchIndex(), k -> new
ArrayList<>()).add(w);
+ LOGGER.info(w.buildDescription());
+ }
+
Table table =
catalog.loadTable(
TableIdentifier.of(tableId.getSchemaName(),
tableId.getTableName()));
+ int startBatchIndex = 0;
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
Iterable<Snapshot> ancestors =
SnapshotUtil.ancestorsOf(snapshot.snapshotId(),
table::snapshot);
- long lastCheckpointId =
+ long lastCommittedCheckpointId =
getMaxCommittedCheckpointId(ancestors, newFlinkJobId,
operatorId);
- if (lastCheckpointId == checkpointId) {
+ if (lastCommittedCheckpointId >= checkpointId) {
LOGGER.warn(
"Checkpoint id {} has been committed to table {},
skipping",
checkpointId,
tableId.identifier());
continue;
}
+ ancestors = SnapshotUtil.ancestorsOf(snapshot.snapshotId(),
table::snapshot);
+ startBatchIndex =
+ getLastCommittedBatchIndex(
+ ancestors, newFlinkJobId, operatorId,
checkpointId)
+ + 1;
}
Optional<TableMetric> tableMetric = getTableMetric(tableId);
tableMetric.ifPresent(TableMetric::increaseCommitTimes);
- List<WriteResult> results = entry.getValue();
- List<DataFile> dataFiles =
- results.stream()
- .filter(payload -> payload.dataFiles() != null)
- .flatMap(payload ->
Arrays.stream(payload.dataFiles()))
- .filter(dataFile -> dataFile.recordCount() > 0)
- .collect(toList());
- List<DeleteFile> deleteFiles =
- results.stream()
- .filter(payload -> payload.deleteFiles() != null)
- .flatMap(payload ->
Arrays.stream(payload.deleteFiles()))
- .filter(deleteFile -> deleteFile.recordCount() > 0)
- .collect(toList());
- if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
- LOGGER.info(String.format("Nothing to commit to table %s,
skipping", table.name()));
- } else {
+ int lastNonEmptyBatchIndex = -1;
+ for (Map.Entry<Integer, List<WriteResultWrapper>> g :
batchGroups.entrySet()) {
+ List<DataFile> df = collectDataFilesFromGroup(g.getValue());
+ List<DeleteFile> del =
collectDeleteFilesFromGroup(g.getValue());
+ if (!df.isEmpty() || !del.isEmpty()) {
+ lastNonEmptyBatchIndex = g.getKey();
+ }
+ }
+
+ // Commit each batch as a separate snapshot so sequence numbers
increase per batch.
+ for (Map.Entry<Integer, List<WriteResultWrapper>> g :
batchGroups.entrySet()) {
+ int batchIdx = g.getKey();
+ if (batchIdx < startBatchIndex) {
+ LOGGER.info(
+ "Batch {} for checkpoint {} of table {} already
committed, skipping",
+ batchIdx,
+ checkpointId,
+ tableId.identifier());
+ continue;
+ }
+
+ List<DataFile> dataFiles =
collectDataFilesFromGroup(g.getValue());
+ List<DeleteFile> deleteFiles =
collectDeleteFilesFromGroup(g.getValue());
+
+ if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
+ LOGGER.info(
+ "Batch {} for checkpoint {} of table {} has
nothing to commit, skipping",
+ batchIdx,
+ checkpointId,
+ tableId.identifier());
+ continue;
+ }
+
+ SnapshotUpdate<?> operation;
if (deleteFiles.isEmpty()) {
AppendFiles append = table.newAppend();
dataFiles.forEach(append::appendFile);
- commitOperation(append, newFlinkJobId, operatorId,
checkpointId);
+ operation = append;
} else {
RowDelta delta = table.newRowDelta();
dataFiles.forEach(delta::addRows);
deleteFiles.forEach(delta::addDeletes);
- commitOperation(delta, newFlinkJobId, operatorId,
checkpointId);
+ operation = delta;
}
+
+ operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
+ operation.set(SinkUtil.OPERATOR_ID, operatorId);
+ operation.set(FLINK_BATCH_INDEX, String.valueOf(batchIdx));
+ operation.set(FLINK_CHECKPOINT_ID_PROP,
String.valueOf(checkpointId));
+ if (batchIdx == lastNonEmptyBatchIndex) {
+ operation.set(
+ SinkUtil.MAX_COMMITTED_CHECKPOINT_ID,
String.valueOf(checkpointId));
+ }
+ operation.commit();
}
}
}
+ private static List<DataFile>
collectDataFilesFromGroup(List<WriteResultWrapper> group) {
+ return group.stream()
+ .flatMap(w -> collectDataFiles(w.getWriteResult()).stream())
+ .collect(toList());
+ }
+
+ private static List<DeleteFile>
collectDeleteFilesFromGroup(List<WriteResultWrapper> group) {
+ return group.stream()
+ .flatMap(w -> collectDeleteFiles(w.getWriteResult()).stream())
+ .collect(toList());
+ }
+
+ private static List<DataFile> collectDataFiles(WriteResult result) {
+ if (result.dataFiles() == null) {
+ return new ArrayList<>();
+ }
+ return Arrays.stream(result.dataFiles()).filter(f -> f.recordCount() >
0).collect(toList());
+ }
+
+ private static List<DeleteFile> collectDeleteFiles(WriteResult result) {
+ if (result.deleteFiles() == null) {
+ return new ArrayList<>();
+ }
+ return Arrays.stream(result.deleteFiles())
+ .filter(f -> f.recordCount() > 0)
+ .collect(toList());
+ }
+
private static long getMaxCommittedCheckpointId(
Iterable<Snapshot> ancestors, String flinkJobId, String
operatorId) {
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID - 1;
@@ -185,15 +261,35 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
return lastCommittedCheckpointId;
}
- private static void commitOperation(
- SnapshotUpdate<?> operation,
- String newFlinkJobId,
- String operatorId,
- long checkpointId) {
- operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID,
Long.toString(checkpointId));
- operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
- operation.set(SinkUtil.OPERATOR_ID, operatorId);
- operation.commit();
+ /**
+ * Returns the highest batch index already committed for the given
checkpoint, or -1 if none.
+ * Used to skip already-persisted batches on retry.
+ */
+ private static int getLastCommittedBatchIndex(
+ Iterable<Snapshot> ancestors, String flinkJobId, String
operatorId, long checkpointId) {
+ for (Snapshot ancestor : ancestors) {
+ Map<String, String> summary = ancestor.summary();
+ if (!flinkJobId.equals(summary.get(SinkUtil.FLINK_JOB_ID))) {
+ continue;
+ }
+ String snapshotOperatorId = summary.get(SinkUtil.OPERATOR_ID);
+ if (snapshotOperatorId != null &&
!snapshotOperatorId.equals(operatorId)) {
+ continue;
+ }
+ // Stop once we pass a fully-committed earlier checkpoint;
intermediate batch
+ // snapshots for the current checkpoint lie between it and the
current tip.
+ String maxCommittedStr =
summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID);
+ if (maxCommittedStr != null && Long.parseLong(maxCommittedStr) <
checkpointId) {
+ break;
+ }
+ String snapshotCheckpointId =
summary.get(FLINK_CHECKPOINT_ID_PROP);
+ if (snapshotCheckpointId != null
+ && Long.parseLong(snapshotCheckpointId) == checkpointId) {
+ String batchIndexStr = summary.get(FLINK_BATCH_INDEX);
+ return batchIndexStr != null ? Integer.parseInt(batchIndexStr)
: 0;
+ }
+ }
+ return -1;
}
private Optional<TableMetric> getTableMetric(TableId tableId) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
index cbcd3b98e..9af29d88c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
@@ -72,6 +72,11 @@ public class IcebergWriter
private final List<WriteResultWrapper> temporaryWriteResult;
+ /**
+ * Per-table batch index; incremented on each schema-change flush, even
when no writer exists.
+ */
+ private Map<TableId, Integer> tableBatchIndexMap;
+
private Catalog catalog;
private final int taskId;
@@ -102,6 +107,7 @@ public class IcebergWriter
writerFactoryMap = new HashMap<>();
writerMap = new HashMap<>();
schemaMap = new HashMap<>();
+ tableBatchIndexMap = new HashMap<>();
temporaryWriteResult = new ArrayList<>();
this.taskId = taskId;
this.attemptId = attemptId;
@@ -129,6 +135,7 @@ public class IcebergWriter
list.addAll(temporaryWriteResult);
list.addAll(getWriteResult());
temporaryWriteResult.clear();
+ tableBatchIndexMap.clear();
lastCheckpointId++;
return list;
}
@@ -166,6 +173,11 @@ public class IcebergWriter
} else {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
TableId tableId = schemaChangeEvent.tableId();
+ // Flush only when the table is already known; skip on initial
CreateTableEvent since
+ // no data has been written yet and there is nothing to split.
+ if (schemaMap.containsKey(tableId)) {
+ flushTableWriter(tableId);
+ }
TableSchemaWrapper tableSchemaWrapper = schemaMap.get(tableId);
Schema newSchema =
@@ -179,21 +191,46 @@ public class IcebergWriter
@Override
public void flush(boolean flush) throws IOException {
- // Notice: flush method may be called many times during one checkpoint.
- temporaryWriteResult.addAll(getWriteResult());
+ // Clear the factory cache so the next write picks up the latest
catalog schema.
+ // Writers keep running; schema-change splits are handled in
flushTableWriter.
+ writerFactoryMap.clear();
+ }
+
+ private void flushTableWriter(TableId tableId) throws IOException {
+ TaskWriter<RowData> writer = writerMap.remove(tableId);
+ // Advance even when no writer exists, to keep batchIndex in sync
across subtasks.
+ int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
+ tableBatchIndexMap.put(tableId, batchIndex + 1);
+ if (writer == null) {
+ return;
+ }
+ WriteResultWrapper writeResultWrapper =
+ new WriteResultWrapper(
+ writer.complete(),
+ tableId,
+ lastCheckpointId + 1,
+ jobId,
+ operatorId,
+ batchIndex);
+ temporaryWriteResult.add(writeResultWrapper);
+ LOGGER.info(writeResultWrapper.buildDescription());
+ writerFactoryMap.remove(tableId);
}
private List<WriteResultWrapper> getWriteResult() throws IOException {
long currentCheckpointId = lastCheckpointId + 1;
List<WriteResultWrapper> writeResults = new ArrayList<>();
for (Map.Entry<TableId, TaskWriter<RowData>> entry :
writerMap.entrySet()) {
+ TableId tableId = entry.getKey();
+ int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
WriteResultWrapper writeResultWrapper =
new WriteResultWrapper(
entry.getValue().complete(),
- entry.getKey(),
+ tableId,
currentCheckpointId,
jobId,
- operatorId);
+ operatorId,
+ batchIndex);
writeResults.add(writeResultWrapper);
LOGGER.info(writeResultWrapper.buildDescription());
}
@@ -225,6 +262,11 @@ public class IcebergWriter
writerFactoryMap = null;
}
+ if (tableBatchIndexMap != null) {
+ tableBatchIndexMap.clear();
+ tableBatchIndexMap = null;
+ }
+
catalog = null;
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
index e64cc5535..3e8d733c5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
@@ -40,17 +40,31 @@ public class WriteResultWrapper implements Serializable {
private final String operatorId;
+ /** Batch index within the checkpoint for this table; increments on each
schema-change flush. */
+ private final int batchIndex;
+
public WriteResultWrapper(
WriteResult writeResult,
TableId tableId,
long checkpointId,
String jobId,
- String operatorId) {
+ String operatorId,
+ int batchIndex) {
this.writeResult = writeResult;
this.tableId = tableId;
this.checkpointId = checkpointId;
this.jobId = jobId;
this.operatorId = operatorId;
+ this.batchIndex = batchIndex;
+ }
+
+ public WriteResultWrapper(
+ WriteResult writeResult,
+ TableId tableId,
+ long checkpointId,
+ String jobId,
+ String operatorId) {
+ this(writeResult, tableId, checkpointId, jobId, operatorId, 0);
}
public WriteResult getWriteResult() {
@@ -73,6 +87,10 @@ public class WriteResultWrapper implements Serializable {
return operatorId;
}
+ public int getBatchIndex() {
+ return batchIndex;
+ }
+
/** Build a simple description for the write result. */
public String buildDescription() {
long addCount = 0;
@@ -95,6 +113,8 @@ public class WriteResultWrapper implements Serializable {
+ jobId
+ ", OperatorId: "
+ operatorId
+ + ", BatchIndex: "
+ + batchIndex
+ ", AddCount: "
+ addCount
+ ", DeleteCount: "
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapperSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapperSerializer.java
index ca8775d3e..26b5b942d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapperSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapperSerializer.java
@@ -29,7 +29,8 @@ import java.io.IOException;
/** Serializer for {@link WriteResultWrapper}. */
class WriteResultWrapperSerializer implements
SimpleVersionedSerializer<WriteResultWrapper> {
- private static final int VERSION = 1;
+ // v2 added the batchIndex field. v1 payloads still read back: the missing
field defaults to 0.
+ private static final int VERSION = 2;
@Override
public int getVersion() {
@@ -47,7 +48,8 @@ class WriteResultWrapperSerializer implements SimpleVersionedSerializer<WriteRes
@Override
public WriteResultWrapper deserialize(int version, byte[] serialized)
throws IOException {
- if (version == 1) {
+ // v1 and v2 share the same byte layout, so both deserialize the same
way.
+ if (version == 1 || version == 2) {
DataInputDeserializer view = new DataInputDeserializer(serialized);
byte[] resultBuf = new byte[serialized.length];
view.read(resultBuf);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index 745aea81f..71e08de4c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -42,7 +42,11 @@ import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequ
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -66,6 +70,7 @@ import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -596,6 +601,981 @@ public class IcebergWriterTest {
Assertions.assertThat(result).containsExactlyInAnyOrder("1, char1",
"2, char2");
}
+ @Test
+ public void testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint()
throws Exception {
+ Map<String, String> catalogOptions = new HashMap<>();
+ String warehouse =
+ new File(temporaryFolder.toFile(),
UUID.randomUUID().toString()).toString();
+ catalogOptions.put("type", "hadoop");
+ catalogOptions.put("warehouse", warehouse);
+ catalogOptions.put("cache-enabled", "false");
+ Catalog catalog =
+ CatalogUtil.buildIcebergCatalog(
+ "cdc-iceberg-catalog", catalogOptions, new
Configuration());
+
+ String jobId = UUID.randomUUID().toString();
+ String operatorId = UUID.randomUUID().toString();
+ IcebergWriter icebergWriter =
+ new IcebergWriter(
+ catalogOptions,
+ 1,
+ 1,
+ ZoneId.systemDefault(),
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
+ IcebergMetadataApplier icebergMetadataApplier = new
IcebergMetadataApplier(catalogOptions);
+
+ TableId tableId = TableId.parse("test.iceberg_table");
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ tableId,
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.BIGINT().notNull())
+ .physicalColumn("name", DataTypes.VARCHAR(100))
+ .primaryKey("id")
+ .build());
+ icebergMetadataApplier.applySchemaChange(createTableEvent);
+ icebergWriter.write(createTableEvent, null);
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(
+
createTableEvent.getSchema().getColumnDataTypes().toArray(new DataType[0]));
+
+ RecordData recordA =
+ generator.generate(new Object[] {1L,
BinaryStringData.fromString("a")});
+ RecordData recordB =
+ generator.generate(new Object[] {1L,
BinaryStringData.fromString("b")});
+
+ icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA),
null);
+ // flush resets the factory cache but keeps the writer running, so the
update
+ // lands in the same writer as the insert and uses a position delete.
+ icebergWriter.flush(false);
+ icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordA,
recordB), null);
+
+ Collection<WriteResultWrapper> writeResults =
icebergWriter.prepareCommit();
+
+ // Both writes went through the same writer (batchIndex 0) since flush
only
+ // cleared the factory. Position delete handles dedup within the
snapshot.
+ List<Integer> batchIndexes =
+ writeResults.stream()
+ .map(WriteResultWrapper::getBatchIndex)
+ .sorted()
+ .collect(Collectors.toList());
+ Assertions.assertThat(batchIndexes).containsExactly(0);
+
+ IcebergCommitter icebergCommitter = new
IcebergCommitter(catalogOptions, new HashMap<>());
+ Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+ icebergCommitter.commit(collection);
+
+ List<String> result = fetchTableContent(catalog, tableId, null);
+ Assertions.assertThat(result).containsExactly("1, b");
+ }
+
+ /**
+ * A schema change mid-checkpoint splits writes into two batches for the
same table. Both
+ * batches must not land in the same Iceberg snapshot: files in one
snapshot share the same
+ * seq_num, so the equality-delete from batch 1 would silently miss the
insert from batch 0.
+ * Each batch gets its own snapshot so the delete's seq_num is strictly
higher than the data it
+ * targets.
+ */
+ @Test
+ public void testNoDuplicateWhenSchemaChangeFlushSplitsSamePkUpdates()
throws Exception {
+ Map<String, String> catalogOptions = new HashMap<>();
+ String warehouse =
+ new File(temporaryFolder.toFile(),
UUID.randomUUID().toString()).toString();
+ catalogOptions.put("type", "hadoop");
+ catalogOptions.put("warehouse", warehouse);
+ catalogOptions.put("cache-enabled", "false");
+ Catalog catalog =
+ CatalogUtil.buildIcebergCatalog(
+ "cdc-iceberg-catalog", catalogOptions, new
Configuration());
+
+ String jobId = UUID.randomUUID().toString();
+ String operatorId = UUID.randomUUID().toString();
+ IcebergWriter icebergWriter =
+ new IcebergWriter(
+ catalogOptions,
+ 1,
+ 1,
+ ZoneId.systemDefault(),
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
+ IcebergMetadataApplier icebergMetadataApplier = new
IcebergMetadataApplier(catalogOptions);
+
+ TableId tableId = TableId.parse("test.iceberg_table");
+ Schema initialSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.BIGINT().notNull())
+ .physicalColumn("name", DataTypes.VARCHAR(100))
+ .primaryKey("id")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId,
initialSchema);
+ icebergMetadataApplier.applySchemaChange(createTableEvent);
+ icebergWriter.write(createTableEvent, null);
+
+ BinaryRecordDataGenerator oldGenerator =
+ new BinaryRecordDataGenerator(
+ initialSchema.getColumnDataTypes().toArray(new
DataType[0]));
+
+ // Insert (id=1, name="a") — goes into writer batch 0.
+ RecordData recordA =
+ oldGenerator.generate(new Object[] {1L,
BinaryStringData.fromString("a")});
+ icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA),
null);
+
+ // Schema change: AddColumn triggers flushTableWriter, completing
batch 0.
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ tableId,
+ Arrays.asList(
+ AddColumnEvent.last(
+ new PhysicalColumn(
+ "extra", DataTypes.STRING(),
null, null))));
+ icebergMetadataApplier.applySchemaChange(addColumnEvent);
+ icebergWriter.write(addColumnEvent, null);
+
+ // Update (id=1) with the new schema — goes into writer batch 1.
+ Schema newSchema = SchemaUtils.applySchemaChangeEvent(initialSchema,
addColumnEvent);
+ BinaryRecordDataGenerator newGenerator =
+ new BinaryRecordDataGenerator(
+ newSchema.getColumnDataTypes().toArray(new
DataType[0]));
+ RecordData recordANew =
+ newGenerator.generate(new Object[] {1L,
BinaryStringData.fromString("a"), null});
+ RecordData recordB =
+ newGenerator.generate(new Object[] {1L,
BinaryStringData.fromString("b"), null});
+ icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordANew,
recordB), null);
+
+ Collection<WriteResultWrapper> writeResults =
icebergWriter.prepareCommit();
+ IcebergCommitter icebergCommitter = new
IcebergCommitter(catalogOptions, new HashMap<>());
+ Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+ icebergCommitter.commit(collection);
+
+ // Expect only (1, b, null) — batch 0's stale (1, a, null) must be
deleted.
+ List<String> result = fetchTableContent(catalog, tableId, null);
+ Assertions.assertThat(result).containsExactly("1, b, null");
+ }
+
+ /**
+ * Verifies idempotency when the committer crashes after committing batch
0 but before
+ * committing batch 1 of the same checkpoint.
+ *
+ * <p>On retry, Flink re-delivers the full collection of committables for
that checkpoint. The
+ * committer must detect that batch 0's snapshot is already present in the
table history (via
+ * {@code flink-cdc.batch-index} and {@code flink-cdc.checkpoint-id}
snapshot properties) and
+ * skip it, then commit only batch 1. Without this skip, batch 0's files
would be added a second
+ * time, causing duplicate records.
+ */
+ @Test
+ public void testRetryAfterPartialBatchCommit() throws Exception {
+ Map<String, String> catalogOptions = new HashMap<>();
+ String warehouse =
+ new File(temporaryFolder.toFile(),
UUID.randomUUID().toString()).toString();
+ catalogOptions.put("type", "hadoop");
+ catalogOptions.put("warehouse", warehouse);
+ catalogOptions.put("cache-enabled", "false");
+ Catalog catalog =
+ CatalogUtil.buildIcebergCatalog(
+ "cdc-iceberg-catalog", catalogOptions, new
Configuration());
+
+ String jobId = UUID.randomUUID().toString();
+ String operatorId = UUID.randomUUID().toString();
+ IcebergWriter icebergWriter =
+ new IcebergWriter(
+ catalogOptions,
+ 1,
+ 1,
+ ZoneId.systemDefault(),
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
+ IcebergMetadataApplier icebergMetadataApplier = new
IcebergMetadataApplier(catalogOptions);
+
+ TableId tableId = TableId.parse("test.iceberg_table");
+ Schema initialSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.BIGINT().notNull())
+ .physicalColumn("name", DataTypes.VARCHAR(100))
+ .primaryKey("id")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId,
initialSchema);
+ icebergMetadataApplier.applySchemaChange(createTableEvent);
+ icebergWriter.write(createTableEvent, null);
+
+ BinaryRecordDataGenerator oldGenerator =
+ new BinaryRecordDataGenerator(
+ initialSchema.getColumnDataTypes().toArray(new
DataType[0]));
+ RecordData recordA =
+ oldGenerator.generate(new Object[] {1L,
BinaryStringData.fromString("a")});
+ icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA),
null);
+
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ tableId,
+ Arrays.asList(
+ AddColumnEvent.last(
+ new PhysicalColumn(
+ "extra", DataTypes.STRING(),
null, null))));
+ icebergMetadataApplier.applySchemaChange(addColumnEvent);
+ icebergWriter.write(addColumnEvent, null);
+
+ Schema newSchema = SchemaUtils.applySchemaChangeEvent(initialSchema,
addColumnEvent);
+ BinaryRecordDataGenerator newGenerator =
+ new BinaryRecordDataGenerator(
+ newSchema.getColumnDataTypes().toArray(new
DataType[0]));
+ RecordData recordANew =
+ newGenerator.generate(new Object[] {1L,
BinaryStringData.fromString("a"), null});
+ RecordData recordB =
+ newGenerator.generate(new Object[] {1L,
BinaryStringData.fromString("b"), null});
+ icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordANew,
recordB), null);
+
+ Collection<WriteResultWrapper> writeResults =
icebergWriter.prepareCommit();
+ List<WriteResultWrapper> sortedBatches =
+ writeResults.stream()
+
.sorted(Comparator.comparingInt(WriteResultWrapper::getBatchIndex))
+ .collect(Collectors.toList());
+ Assertions.assertThat(sortedBatches).hasSize(2);
+
Assertions.assertThat(sortedBatches.get(0).getBatchIndex()).isEqualTo(0);
+
Assertions.assertThat(sortedBatches.get(1).getBatchIndex()).isEqualTo(1);
+
+ // Simulate a partial commit: manually commit only batch 0 using the
Iceberg API,
+ // setting the intermediate batch properties but NOT
MAX_COMMITTED_CHECKPOINT_ID.
+ // This replicates the on-disk state left behind when the committer
crashes mid-checkpoint.
+ long checkpointId = sortedBatches.get(0).getCheckpointId();
+ Table table =
+ catalog.loadTable(
+ TableIdentifier.of(tableId.getSchemaName(),
tableId.getTableName()));
+ RowDelta partialDelta = table.newRowDelta();
+ WriteResultWrapper batch0 = sortedBatches.get(0);
+ if (batch0.getWriteResult().dataFiles() != null) {
+ for (DataFile f : batch0.getWriteResult().dataFiles()) {
+ partialDelta.addRows(f);
+ }
+ }
+ if (batch0.getWriteResult().deleteFiles() != null) {
+ for (DeleteFile f : batch0.getWriteResult().deleteFiles()) {
+ partialDelta.addDeletes(f);
+ }
+ }
+ partialDelta.set(SinkUtil.FLINK_JOB_ID, jobId);
+ partialDelta.set(SinkUtil.OPERATOR_ID, operatorId);
+ partialDelta.set(IcebergCommitter.FLINK_BATCH_INDEX, "0");
+ partialDelta.set(IcebergCommitter.FLINK_CHECKPOINT_ID_PROP,
String.valueOf(checkpointId));
+ // Intentionally omitting MAX_COMMITTED_CHECKPOINT_ID — this is an
incomplete checkpoint.
+ partialDelta.commit();
+
+ // Retry: Flink re-delivers all committables for the checkpoint.
+ IcebergCommitter icebergCommitter = new
IcebergCommitter(catalogOptions, new HashMap<>());
+ Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+ icebergCommitter.commit(collection);
+
+ // Batch 0 must be skipped (its snapshot is already present); only
batch 1 is committed.
+ // Batch 1's eqDelete (higher sequence number) supersedes batch 0's
data file.
+ List<String> result = fetchTableContent(catalog, tableId, null);
+ Assertions.assertThat(result).containsExactly("1, b, null");
+
+ // Verify the final snapshot carries MAX_COMMITTED_CHECKPOINT_ID.
+ Map<String, String> finalSummary =
+ catalog.loadTable(
+ TableIdentifier.of(tableId.getSchemaName(),
tableId.getTableName()))
+ .currentSnapshot()
+ .summary();
+
Assertions.assertThat(finalSummary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID))
+ .isEqualTo(String.valueOf(checkpointId));
+ }
+
+    /**
+     * Checks that two schema changes inside one checkpoint, which split the writes for a single
+     * table into three batches, still leave exactly one row per primary key after commit.
+     *
+     * <p>Event order: INSERT(id=1,"a") → AddColumn "extra1" (flush) → UPDATE(id=1,"a"→"b") →
+     * AddColumn "extra2" (flush) → UPDATE(id=1,"b"→"c") → prepareCommit + commit. The batches
+     * are expected to be committed one after another, so a later batch's equality-delete hides
+     * the rows written by earlier batches.
+     */
+    @Test
+    public void testNoDuplicateWithMultipleSchemaChangesInOneCheckpoint() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        String warehousePath =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        options.put("type", "hadoop");
+        options.put("warehouse", warehousePath);
+        options.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog("cdc-iceberg-catalog", options, new Configuration());
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        IcebergWriter writer =
+                new IcebergWriter(
+                        options, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId, new HashMap<>());
+        IcebergMetadataApplier applier = new IcebergMetadataApplier(options);
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema baseSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent create = new CreateTableEvent(tableId, baseSchema);
+        applier.applySchemaChange(create);
+        writer.write(create, null);
+
+        // Batch 0: INSERT(id=1,"a") under the original {id, name} schema.
+        BinaryRecordDataGenerator baseGen =
+                new BinaryRecordDataGenerator(
+                        baseSchema.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData insertedA = baseGen.generate(new Object[] {1L, BinaryStringData.fromString("a")});
+        writer.write(DataChangeEvent.insertEvent(tableId, insertedA), null);
+
+        // First schema change: flushes batch 0; later writes belong to batch index 1.
+        AddColumnEvent addExtra1 =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra1", DataTypes.STRING(), null, null))));
+        applier.applySchemaChange(addExtra1);
+        writer.write(addExtra1, null);
+
+        // Batch 1: UPDATE(id=1,"a"->"b") under {id, name, extra1}.
+        Schema schemaWithExtra1 = SchemaUtils.applySchemaChangeEvent(baseSchema, addExtra1);
+        BinaryRecordDataGenerator extra1Gen =
+                new BinaryRecordDataGenerator(
+                        schemaWithExtra1.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData beforeB =
+                extra1Gen.generate(new Object[] {1L, BinaryStringData.fromString("a"), null});
+        RecordData afterB =
+                extra1Gen.generate(new Object[] {1L, BinaryStringData.fromString("b"), null});
+        writer.write(DataChangeEvent.updateEvent(tableId, beforeB, afterB), null);
+
+        // Second schema change: flushes batch 1; later writes belong to batch index 2.
+        AddColumnEvent addExtra2 =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra2", DataTypes.STRING(), null, null))));
+        applier.applySchemaChange(addExtra2);
+        writer.write(addExtra2, null);
+
+        // Batch 2: UPDATE(id=1,"b"->"c") under {id, name, extra1, extra2}.
+        Schema schemaWithExtra2 = SchemaUtils.applySchemaChangeEvent(schemaWithExtra1, addExtra2);
+        BinaryRecordDataGenerator extra2Gen =
+                new BinaryRecordDataGenerator(
+                        schemaWithExtra2.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData beforeC =
+                extra2Gen.generate(new Object[] {1L, BinaryStringData.fromString("b"), null, null});
+        RecordData afterC =
+                extra2Gen.generate(new Object[] {1L, BinaryStringData.fromString("c"), null, null});
+        writer.write(DataChangeEvent.updateEvent(tableId, beforeC, afterC), null);
+
+        // prepareCommit must yield exactly three wrappers carrying batch indices 0, 1 and 2.
+        Collection<WriteResultWrapper> committables = writer.prepareCommit();
+        List<WriteResultWrapper> byBatch =
+                committables.stream()
+                        .sorted(Comparator.comparingInt(WriteResultWrapper::getBatchIndex))
+                        .collect(Collectors.toList());
+        Assertions.assertThat(byBatch).hasSize(3);
+        for (int expectedIndex = 0; expectedIndex < 3; expectedIndex++) {
+            Assertions.assertThat(byBatch.get(expectedIndex).getBatchIndex())
+                    .isEqualTo(expectedIndex);
+        }
+
+        IcebergCommitter committer = new IcebergCommitter(options, new HashMap<>());
+        Collection<Committer.CommitRequest<WriteResultWrapper>> requests =
+                committables.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+        committer.commit(requests);
+
+        // Only the latest value (id=1,"c") may remain; the stale "a" and "b" must be gone.
+        Assertions.assertThat(fetchTableContent(catalog, tableId, null))
+                .containsExactly("1, c, null, null");
+    }
+
+    /**
+     * Checks that a schema-change flush on one table leaves a second table untouched.
+     *
+     * <p>table_a receives INSERT → AddColumn (flush) → UPDATE and so produces two batches.
+     * table_b receives INSERT → UPDATE with no schema change and so produces a single batch.
+     * After committing, each table must hold exactly its final row.
+     */
+    @Test
+    public void testSchemaChangeFlushDoesNotAffectOtherTable() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        String warehousePath =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        options.put("type", "hadoop");
+        options.put("warehouse", warehousePath);
+        options.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog("cdc-iceberg-catalog", options, new Configuration());
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        IcebergWriter writer =
+                new IcebergWriter(
+                        options, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId, new HashMap<>());
+        IcebergMetadataApplier applier = new IcebergMetadataApplier(options);
+
+        TableId tableA = TableId.parse("test.table_a");
+        TableId tableB = TableId.parse("test.table_b");
+
+        Schema schemaA =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        Schema schemaB =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("value", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+
+        // Create both tables in the catalog first, then announce them to the writer.
+        applier.applySchemaChange(new CreateTableEvent(tableA, schemaA));
+        applier.applySchemaChange(new CreateTableEvent(tableB, schemaB));
+        writer.write(new CreateTableEvent(tableA, schemaA), null);
+        writer.write(new CreateTableEvent(tableB, schemaB), null);
+
+        BinaryRecordDataGenerator generatorA =
+                new BinaryRecordDataGenerator(schemaA.getColumnDataTypes().toArray(new DataType[0]));
+        BinaryRecordDataGenerator generatorB =
+                new BinaryRecordDataGenerator(schemaB.getColumnDataTypes().toArray(new DataType[0]));
+
+        // table_a, batch 0: INSERT(id=1,"a").
+        RecordData insertA = generatorA.generate(new Object[] {1L, BinaryStringData.fromString("a")});
+        writer.write(DataChangeEvent.insertEvent(tableA, insertA), null);
+
+        // Schema change on table_a only: flushes table_a's pending writes.
+        AddColumnEvent addExtraToA =
+                new AddColumnEvent(
+                        tableA,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra", DataTypes.STRING(), null, null))));
+        applier.applySchemaChange(addExtraToA);
+        writer.write(addExtraToA, null);
+
+        // table_a, batch 1: UPDATE(id=1,"a"->"b") under the widened schema.
+        Schema widenedSchemaA = SchemaUtils.applySchemaChangeEvent(schemaA, addExtraToA);
+        BinaryRecordDataGenerator widenedGeneratorA =
+                new BinaryRecordDataGenerator(
+                        widenedSchemaA.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData beforeA =
+                widenedGeneratorA.generate(new Object[] {1L, BinaryStringData.fromString("a"), null});
+        RecordData afterA =
+                widenedGeneratorA.generate(new Object[] {1L, BinaryStringData.fromString("b"), null});
+        writer.write(DataChangeEvent.updateEvent(tableA, beforeA, afterA), null);
+
+        // table_b: INSERT(id=2,"x") then UPDATE(id=2,"x"->"y"). There is no flush in between,
+        // so both events go to the same writer and must be reconciled inside it.
+        RecordData rowX = generatorB.generate(new Object[] {2L, BinaryStringData.fromString("x")});
+        RecordData rowY = generatorB.generate(new Object[] {2L, BinaryStringData.fromString("y")});
+        writer.write(DataChangeEvent.insertEvent(tableB, rowX), null);
+        writer.write(DataChangeEvent.updateEvent(tableB, rowX, rowY), null);
+
+        Collection<WriteResultWrapper> committables = writer.prepareCommit();
+
+        // Expect two wrappers for table_a and exactly one for table_b.
+        Map<TableId, Long> wrappersPerTable =
+                committables.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        WriteResultWrapper::getTableId, Collectors.counting()));
+        Assertions.assertThat(wrappersPerTable.get(tableA)).isEqualTo(2);
+        Assertions.assertThat(wrappersPerTable.get(tableB)).isEqualTo(1);
+
+        IcebergCommitter committer = new IcebergCommitter(options, new HashMap<>());
+        Collection<Committer.CommitRequest<WriteResultWrapper>> requests =
+                committables.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+        committer.commit(requests);
+
+        // table_a: the stale "a" must be hidden by batch 1's equality-delete, leaving "b".
+        Assertions.assertThat(fetchTableContent(catalog, tableA, null))
+                .containsExactly("1, b, null");
+
+        // table_b: only the final value "y" may remain.
+        Assertions.assertThat(fetchTableContent(catalog, tableB, null)).containsExactly("2, y");
+    }
+
+    /**
+     * Checks that the batch index advances on every subtask that sees a schema change. This
+     * includes a subtask that has not yet opened a writer for the table (parallelism > 1
+     * scenario).
+     */
+    @Test
+    public void testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("type", "hadoop");
+        options.put(
+                "warehouse",
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString());
+        options.put("cache-enabled", "false");
+        IcebergMetadataApplier applier = new IcebergMetadataApplier(options);
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+
+        // Two writers stand in for subtasks 0 and 1 of the same sink, sharing catalog and table.
+        IcebergWriter subtask0 =
+                new IcebergWriter(
+                        options, 0, 1, ZoneId.systemDefault(), 0, jobId, operatorId, new HashMap<>());
+        IcebergWriter subtask1 =
+                new IcebergWriter(
+                        options, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId, new HashMap<>());
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent create = new CreateTableEvent(tableId, initialSchema);
+        applier.applySchemaChange(create);
+        subtask0.write(create, null);
+        subtask1.write(create, null);
+
+        BinaryRecordDataGenerator initialGen =
+                new BinaryRecordDataGenerator(
+                        initialSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Before the schema change only subtask 0 receives a row; subtask 1 has no writer yet.
+        RecordData rowA = initialGen.generate(new Object[] {1L, BinaryStringData.fromString("a")});
+        subtask0.write(DataChangeEvent.insertEvent(tableId, rowA), null);
+
+        // The schema change is broadcast. Subtask 0 flushes batch 0. Subtask 1 has nothing to
+        // flush, but both must move on to batch index 1.
+        AddColumnEvent addExtra =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra", DataTypes.STRING(), null, null))));
+        applier.applySchemaChange(addExtra);
+        subtask0.write(addExtra, null);
+        subtask1.write(addExtra, null);
+
+        Schema widenedSchema = SchemaUtils.applySchemaChangeEvent(initialSchema, addExtra);
+        BinaryRecordDataGenerator widenedGen =
+                new BinaryRecordDataGenerator(
+                        widenedSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // After the schema change subtask 1 writes its first row.
+        RecordData rowB =
+                widenedGen.generate(new Object[] {2L, BinaryStringData.fromString("b"), null});
+        subtask1.write(DataChangeEvent.insertEvent(tableId, rowB), null);
+
+        Collection<WriteResultWrapper> fromSubtask0 = subtask0.prepareCommit();
+        Collection<WriteResultWrapper> fromSubtask1 = subtask1.prepareCommit();
+
+        // Subtask 0: one wrapper from the pre-schema-change flush, at batch index 0.
+        Assertions.assertThat(fromSubtask0).hasSize(1);
+        Assertions.assertThat(fromSubtask0.iterator().next().getBatchIndex()).isEqualTo(0);
+
+        // Subtask 1: one wrapper at batch index 1, not 0. Its counter advanced at the schema
+        // change even though it had no writer at that moment.
+        Assertions.assertThat(fromSubtask1).hasSize(1);
+        Assertions.assertThat(fromSubtask1.iterator().next().getBatchIndex()).isEqualTo(1);
+    }
+
+    /**
+     * Checks that no duplicate row survives in this setup: two parallel subtasks share a table,
+     * one has no data before the schema-change flush, and the other later issues an UPDATE that
+     * produces an equality-delete for a row written in the earlier batch.
+     */
+    @Test
+    public void testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        String warehousePath =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        options.put("type", "hadoop");
+        options.put("warehouse", warehousePath);
+        options.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog("cdc-iceberg-catalog", options, new Configuration());
+        IcebergMetadataApplier applier = new IcebergMetadataApplier(options);
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+
+        IcebergWriter subtask0 =
+                new IcebergWriter(
+                        options, 0, 1, ZoneId.systemDefault(), 0, jobId, operatorId, new HashMap<>());
+        IcebergWriter subtask1 =
+                new IcebergWriter(
+                        options, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId, new HashMap<>());
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent create = new CreateTableEvent(tableId, initialSchema);
+        applier.applySchemaChange(create);
+        subtask0.write(create, null);
+        subtask1.write(create, null);
+
+        BinaryRecordDataGenerator initialGen =
+                new BinaryRecordDataGenerator(
+                        initialSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Subtask 1 inserts the "old" row before the schema change; subtask 0 writes nothing.
+        RecordData oldRow =
+                initialGen.generate(new Object[] {1L, BinaryStringData.fromString("old")});
+        subtask1.write(DataChangeEvent.insertEvent(tableId, oldRow), null);
+
+        // The schema change is broadcast. Subtask 0 has no writer but must still advance to batch
+        // index 1. Subtask 1 flushes "old" as batch 0 and then advances to 1.
+        AddColumnEvent addExtra =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra", DataTypes.STRING(), null, null))));
+        applier.applySchemaChange(addExtra);
+        subtask0.write(addExtra, null);
+        subtask1.write(addExtra, null);
+
+        // Subtask 0 then receives the UPDATE of that row under the widened schema.
+        Schema widenedSchema = SchemaUtils.applySchemaChangeEvent(initialSchema, addExtra);
+        BinaryRecordDataGenerator widenedGen =
+                new BinaryRecordDataGenerator(
+                        widenedSchema.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData beforeUpdate =
+                widenedGen.generate(new Object[] {1L, BinaryStringData.fromString("old"), null});
+        RecordData afterUpdate =
+                widenedGen.generate(new Object[] {1L, BinaryStringData.fromString("new"), null});
+        subtask0.write(DataChangeEvent.updateEvent(tableId, beforeUpdate, afterUpdate), null);
+
+        // Gather the committables of both subtasks and commit them together.
+        List<WriteResultWrapper> committables = new ArrayList<>(subtask0.prepareCommit());
+        committables.addAll(subtask1.prepareCommit());
+
+        IcebergCommitter committer = new IcebergCommitter(options, new HashMap<>());
+        committer.commit(
+                committables.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
+
+        // Only "new" may remain. The equality-delete from batch 1 must hide "old" from batch 0;
+        // a misaligned batch index would let both rows through.
+        List<String> rows = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(rows).containsExactly("1, new, null");
+    }
+
+    /**
+     * Checks that wrappers from two subtasks that carry the same batch index are committed as a
+     * single Iceberg snapshot rather than one snapshot per wrapper.
+     */
+    @Test
+    public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("type", "hadoop");
+        options.put(
+                "warehouse",
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString());
+        options.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog("cdc-iceberg-catalog", options, new Configuration());
+        IcebergMetadataApplier applier = new IcebergMetadataApplier(options);
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+
+        IcebergWriter subtask0 =
+                new IcebergWriter(
+                        options, 0, 1, ZoneId.systemDefault(), 0, jobId, operatorId, new HashMap<>());
+        IcebergWriter subtask1 =
+                new IcebergWriter(
+                        options, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId, new HashMap<>());
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent create = new CreateTableEvent(tableId, tableSchema);
+        applier.applySchemaChange(create);
+        subtask0.write(create, null);
+        subtask1.write(create, null);
+
+        BinaryRecordDataGenerator rowGen =
+                new BinaryRecordDataGenerator(
+                        tableSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // No schema change happens, so both subtasks stay at batch index 0.
+        RecordData rowOne = rowGen.generate(new Object[] {1L, BinaryStringData.fromString("a")});
+        subtask0.write(DataChangeEvent.insertEvent(tableId, rowOne), null);
+        RecordData rowTwo = rowGen.generate(new Object[] {2L, BinaryStringData.fromString("b")});
+        subtask1.write(DataChangeEvent.insertEvent(tableId, rowTwo), null);
+
+        List<WriteResultWrapper> committables = new ArrayList<>(subtask0.prepareCommit());
+        committables.addAll(subtask1.prepareCommit());
+
+        // One wrapper per subtask, both carrying the same batch index.
+        Assertions.assertThat(committables).hasSize(2);
+        long distinctIndices =
+                committables.stream().mapToInt(WriteResultWrapper::getBatchIndex).distinct().count();
+        Assertions.assertThat(distinctIndices).isEqualTo(1);
+
+        Table icebergTable =
+                catalog.loadTable(
+                        TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
+        long snapshotCountBefore = countSnapshots(icebergTable);
+
+        IcebergCommitter committer = new IcebergCommitter(options, new HashMap<>());
+        committer.commit(
+                committables.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
+
+        icebergTable.refresh();
+        long snapshotCountAfter = countSnapshots(icebergTable);
+
+        // Two wrappers sharing one batch index must add exactly one snapshot, not two.
+        Assertions.assertThat(snapshotCountAfter - snapshotCountBefore).isEqualTo(1);
+
+        Assertions.assertThat(fetchTableContent(catalog, tableId, null))
+                .containsExactlyInAnyOrder("1, a", "2, b");
+    }
+
+    /**
+     * Checks the most involved parallel case, which uses all three batch indices across two
+     * subtasks:
+     *
+     * <ul>
+     *   <li>Subtask 0 writes only before the first schema change.
+     *   <li>Subtask 1 writes only between the first and second schema change.
+     *   <li>Both subtasks update their rows after the second schema change.
+     * </ul>
+     *
+     * <p>After commit, only the updated values may remain.
+     */
+    @Test
+    public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges()
+            throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("type", "hadoop");
+        options.put(
+                "warehouse",
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString());
+        options.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog("cdc-iceberg-catalog", options, new Configuration());
+        IcebergMetadataApplier applier = new IcebergMetadataApplier(options);
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+
+        IcebergWriter subtask0 =
+                new IcebergWriter(
+                        options, 0, 1, ZoneId.systemDefault(), 0, jobId, operatorId, new HashMap<>());
+        IcebergWriter subtask1 =
+                new IcebergWriter(
+                        options, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId, new HashMap<>());
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema schemaV0 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent create = new CreateTableEvent(tableId, schemaV0);
+        applier.applySchemaChange(create);
+        subtask0.write(create, null);
+        subtask1.write(create, null);
+
+        BinaryRecordDataGenerator genV0 =
+                new BinaryRecordDataGenerator(schemaV0.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Batch 0: only subtask 0 has a row before the first schema change.
+        RecordData rowA = genV0.generate(new Object[] {1L, BinaryStringData.fromString("a")});
+        subtask0.write(DataChangeEvent.insertEvent(tableId, rowA), null);
+
+        // First schema change, broadcast. Subtask 0 flushes batch 0. Subtask 1 has no writer
+        // but must still advance to batch index 1.
+        AddColumnEvent addExtra1 =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra1", DataTypes.STRING(), null, null))));
+        applier.applySchemaChange(addExtra1);
+        subtask0.write(addExtra1, null);
+        subtask1.write(addExtra1, null);
+
+        Schema schemaV1 = SchemaUtils.applySchemaChangeEvent(schemaV0, addExtra1);
+        BinaryRecordDataGenerator genV1 =
+                new BinaryRecordDataGenerator(schemaV1.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Batch 1: only subtask 1 has a row between the two schema changes.
+        RecordData rowB = genV1.generate(new Object[] {2L, BinaryStringData.fromString("b"), null});
+        subtask1.write(DataChangeEvent.insertEvent(tableId, rowB), null);
+
+        // Second schema change, broadcast. Subtask 0 has no writer but must still advance to
+        // batch index 2. Subtask 1 flushes batch 1.
+        AddColumnEvent addExtra2 =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra2", DataTypes.STRING(), null, null))));
+        applier.applySchemaChange(addExtra2);
+        subtask0.write(addExtra2, null);
+        subtask1.write(addExtra2, null);
+
+        Schema schemaV2 = SchemaUtils.applySchemaChangeEvent(schemaV1, addExtra2);
+        BinaryRecordDataGenerator genV2 =
+                new BinaryRecordDataGenerator(schemaV2.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Batch 2: subtask 0 updates id=1 "a"->"c"; subtask 1 updates id=2 "b"->"d".
+        RecordData beforeC =
+                genV2.generate(new Object[] {1L, BinaryStringData.fromString("a"), null, null});
+        RecordData afterC =
+                genV2.generate(new Object[] {1L, BinaryStringData.fromString("c"), null, null});
+        subtask0.write(DataChangeEvent.updateEvent(tableId, beforeC, afterC), null);
+        RecordData beforeD =
+                genV2.generate(new Object[] {2L, BinaryStringData.fromString("b"), null, null});
+        RecordData afterD =
+                genV2.generate(new Object[] {2L, BinaryStringData.fromString("d"), null, null});
+        subtask1.write(DataChangeEvent.updateEvent(tableId, beforeD, afterD), null);
+
+        List<WriteResultWrapper> committables = new ArrayList<>(subtask0.prepareCommit());
+        committables.addAll(subtask1.prepareCommit());
+
+        // Three distinct batch indices are expected: {0: subtask 0}, {1: subtask 1},
+        // {2: both subtasks}.
+        Assertions.assertThat(
+                        committables.stream()
+                                .mapToInt(WriteResultWrapper::getBatchIndex)
+                                .distinct()
+                                .count())
+                .isEqualTo(3);
+
+        IcebergCommitter committer = new IcebergCommitter(options, new HashMap<>());
+        committer.commit(
+                committables.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
+
+        // Only the updated values may remain. The batch-2 equality-deletes must hide the inserts
+        // written in batches 0 and 1.
+        List<String> rows = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(rows)
+                .containsExactlyInAnyOrder("1, c, null, null", "2, d, null, null");
+    }
+
+    /** Returns how many snapshots {@code table.snapshots()} currently yields for the table. */
+    private static long countSnapshots(Table table) {
+        long snapshotCount = 0L;
+        for (Snapshot unusedSnapshot : table.snapshots()) {
+            snapshotCount += 1;
+        }
+        return snapshotCount;
+    }
+
/** Mock CommitRequestImpl. */
public static class MockCommitRequestImpl<CommT> extends
CommitRequestImpl<CommT> {
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
index 8b4d7768d..df5654a82 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
@@ -213,10 +213,25 @@ public class MySqlToIcebergE2eITCase extends
PipelineTestEnvironment {
stat.execute("UPDATE products SET description='Fay' WHERE
id=106;");
stat.execute("UPDATE products SET weight='5.125' WHERE id=107;");
+ // Same row updated twice before the schema-change flush below.
+ stat.execute("UPDATE products SET description='Bob v1' WHERE
id=102;");
+ stat.execute("UPDATE products SET description='Bob v2' WHERE
id=102;");
+
// modify table schema
stat.execute("ALTER TABLE products DROP COLUMN point_c;");
stat.execute("DELETE FROM products WHERE id=101;");
+ // And once more after the flush; the latest value should win, no
duplicate.
+ stat.execute("UPDATE products SET weight='1.125' WHERE id=102;");
+ // Update then delete the same row; it should be gone.
+ stat.execute("UPDATE products SET description='Cecily v2' WHERE
id=103;");
+ stat.execute("DELETE FROM products WHERE id=103;");
+ // Delete then re-insert the same id; the re-inserted row should
survive.
+ stat.execute("DELETE FROM products WHERE id=104;");
+ stat.execute(
+ "INSERT INTO products (id, name, description, weight,
enum_c, json_c) "
+ + "VALUES (104, 'Four', 'Reborn', 9.875, 'white',
null);");
+
stat.execute(
"INSERT INTO products VALUES
(default,'Eleven','Kryo',5.18, null, null);"); // 111
stat.execute(
@@ -229,9 +244,8 @@ public class MySqlToIcebergE2eITCase extends
PipelineTestEnvironment {
List<String> recordsInSnapshotPhase =
new ArrayList<>(
Arrays.asList(
- "102, Two, Bob, 1.703, white, {\"key2\":
\"value2\"}, null, null, null, null, null, null, null, null, null, null",
- "103, Three, Cecily, 4.105, red, {\"key3\":
\"value3\"}, null, null, null, null, null, null, null, null, null, null",
- "104, Four, Derrida, 1.857, white, {\"key4\":
\"value4\"}, null, null, null, null, null, null, null, null, null, null",
+ "102, Two, Bob v2, 1.125, white,
{\"key2\":\"value2\"}, null, null, null, null, null, null, null, null, null,
null",
+ "104, Four, Reborn, 9.875, white, null, null,
null, null, null, null, null, null, null, null, null",
"105, Five, Evelyn, 5.211, red, {\"K\": \"V\",
\"k\": \"v\"}, null, null, null, null, null, null, null, null, null, null",
"106, Six, Fay, 9.813, null, null, null, null,
null, null, null, null, null, null, null, null",
"107, Seven, Grace, 5.125, null, null, null,
null, null, null, null, null, null, null, null, null",