This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 691c3dd437 [INLONG-8366][Sort] Lost data in Iceberg when restoring
checkpoint (#8367)
691c3dd437 is described below
commit 691c3dd4378559c955c5a1002995038ce859f83e
Author: thexia <[email protected]>
AuthorDate: Wed Jul 5 09:51:22 2023 +0800
[INLONG-8366][Sort] Lost data in Iceberg when restoring checkpoint (#8367)
Co-authored-by: thexiay <[email protected]>
---
.../sink/multiple/IcebergSingleFileCommiter.java | 152 ++++++-
.../sort/iceberg/sink/HadoopCatalogResource.java | 93 ++++
.../sort/iceberg/sink/TestRollbackAndRecover.java | 494 +++++++++++++++++++++
.../sort/iceberg/sink/util/SimpleDataUtil.java | 447 +++++++++++++++++++
.../sort/iceberg/sink/util/TestTableLoader.java | 58 +++
.../inlong/sort/iceberg/sink/util/TestTables.java | 332 ++++++++++++++
licenses/inlong-sort-connectors/LICENSE | 4 +-
7 files changed, 1563 insertions(+), 17 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
index 45bd441442..c8dee94171 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -65,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
+import java.util.stream.Collectors;
public class IcebergSingleFileCommiter extends
IcebergProcessFunction<WriteResult, Void>
implements
@@ -73,6 +75,8 @@ public class IcebergSingleFileCommiter extends
IcebergProcessFunction<WriteResul
private static final long serialVersionUID = 1L;
private static final long INITIAL_CHECKPOINT_ID = -1L;
+
+ private static final long INVALID_SNAPSHOT_ID = -1L;
private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
private static final Logger LOG =
LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
@@ -170,25 +174,80 @@ public class IcebergSingleFileCommiter extends
IcebergProcessFunction<WriteResul
this.checkpointsState =
context.getOperatorStateStore().getListState(stateDescriptor);
this.jobIdState =
context.getOperatorStateStore().getListState(jobIdDescriptor);
// New table doesn't have state, so it doesn't need to do restore
operation.
- if (context.isRestored() && jobIdState.get().iterator().hasNext()) {
+ if (context.isRestored()) {
+ if (!jobIdState.get().iterator().hasNext()) {
+ LOG.error("JobId is null, Skip restore process");
+ return;
+ }
String restoredFlinkJobId = jobIdState.get().iterator().next();
+
this.dataFilesPerCheckpoint.putAll(checkpointsState.get().iterator().next());
+            // every data file will be added into state, so there must be data
and a NullPointerException will not happen
+ Long restoredCheckpointId =
dataFilesPerCheckpoint.keySet().stream().max(Long::compareTo).get();
Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
"Flink job id parsed from checkpoint snapshot shouldn't be
null or empty");
- // Since flink's checkpoint id will start from the
max-committed-checkpoint-id + 1 in the new flink job even
- // if it's restored from a snapshot created by another different
flink job, so it's safe to assign the max
- // committed checkpoint id from restored flink job to the current
flink job.
- this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table,
restoredFlinkJobId);
-
- NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
- .newTreeMap(checkpointsState.get().iterator().next())
- .tailMap(maxCommittedCheckpointId, false);
- if (!uncommittedDataFiles.isEmpty()) {
- // Committed all uncommitted data files from the old flink job
to iceberg table.
- long maxUncommittedCheckpointId =
uncommittedDataFiles.lastKey();
- commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId,
maxUncommittedCheckpointId);
+ // ------------------------------
+ // ↓ ↑
+ // a --> a+1 --> a+2 --> ... --> a+n
+ // max checkpoint id = m
+ // a >= m: supplementary commit snapshot between checkpoint (`m`,
`a`]
+ // a < m: rollback to snapshot associated with checkpoint `a`
+ rollbackAndRecover(restoredFlinkJobId, restoredCheckpointId);
+ }
+ }
+
+ private void rollback(long snapshotId) {
+ table.manageSnapshots().rollbackTo(snapshotId).commit();
+ }
+
+ private void recover(String restoredFlinkJobId, NavigableMap<Long, byte[]>
uncommittedManifests) throws Exception {
+ if (!uncommittedManifests.isEmpty()) {
+ // Committed all uncommitted data files from the old flink job to
iceberg table.
+ long maxUncommittedCheckpointId = uncommittedManifests.lastKey();
+ commitUpToCheckpoint(uncommittedManifests, restoredFlinkJobId,
maxUncommittedCheckpointId);
+ }
+ }
+
+ private void rollbackAndRecover(String restoredFlinkJobId, Long
restoredCheckpointId) throws Exception {
+ // Since flink's checkpoint id will start from the
max-committed-checkpoint-id + 1 in the new flink job even
+ // if it's restored from a snapshot created by another different flink
job, so it's safe to assign the max
+ // committed checkpoint id from restored flink job to the current
flink job.
+ this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table,
restoredFlinkJobId);
+ // Find snapshot associated with restoredCheckpointId
+ long snapshotId = getSnapshotIdAssociatedWithChkId(table,
restoredFlinkJobId, restoredCheckpointId);
+
+ // Once maxCommitted CheckpointId is greater than
restoredCheckpointId, it means more data added, it need
+ // rollback
+ if (restoredCheckpointId < maxCommittedCheckpointId) {
+ if (snapshotId != INVALID_SNAPSHOT_ID) {
+ LOG.info("Rollback committed snapshot to {}", snapshotId);
+ rollback(snapshotId); // TODO: what if rollback throw Exception
+ } else {
+ long minUncommittedCheckpointId =
dataFilesPerCheckpoint.keySet().stream().min(Long::compareTo).get();
+ if (maxCommittedCheckpointId >= minUncommittedCheckpointId) {
+ LOG.warn("It maybe has some repeat data between chk[{},
{}]", minUncommittedCheckpointId,
+ maxCommittedCheckpointId);
+ }
+
+            // should recover all manifests that have not been deleted. Not
deleted means it may not be committed.
+ long uncommittedChkId = findEarliestUnCommittedManifest(
+
dataFilesPerCheckpoint.headMap(maxCommittedCheckpointId, true), table.io());
+ LOG.info("Snapshot has been expired. Recover all uncommitted
snapshot between chk[{}, {}]. "
+ + "maxCommittedCheckpointId is {},
minUncommittedCheckpointId is {}.",
+ uncommittedChkId, restoredCheckpointId,
+ maxCommittedCheckpointId, minUncommittedCheckpointId);
+ if (uncommittedChkId != INITIAL_CHECKPOINT_ID) {
+ recover(restoredFlinkJobId,
dataFilesPerCheckpoint.tailMap(uncommittedChkId, false));
+ } else {
+ recover(restoredFlinkJobId, dataFilesPerCheckpoint);
+ }
}
+ } else {
+ LOG.info("Recover uncommitted snapshot between chk({}, {}]. ",
maxCommittedCheckpointId,
+ restoredCheckpointId);
+ recover(restoredFlinkJobId,
dataFilesPerCheckpoint.tailMap(maxCommittedCheckpointId, false));
}
+ dataFilesPerCheckpoint.clear();
}
@Override
@@ -257,6 +316,8 @@ public class IcebergSingleFileCommiter extends
IcebergProcessFunction<WriteResul
}
continuousEmptyCheckpoints = 0;
}
+ // remove already committed snapshot manifest info
+ pendingMap.keySet().forEach(deltaManifestsMap::remove);
pendingMap.clear();
// Delete the committed manifests.
@@ -344,8 +405,9 @@ public class IcebergSingleFileCommiter extends
IcebergProcessFunction<WriteResul
private void commitOperation(SnapshotUpdate<?> operation, int
numDataFiles, int numDeleteFiles, String description,
String newFlinkJobId, long checkpointId) {
- LOG.info("Committing {} with {} data files and {} delete files to
table {}", description, numDataFiles,
- numDeleteFiles, table);
+ LOG.info(
+ "Committing {} with {} data files and {} delete files to table
{} with max committed checkpoint id {}.",
+ description, numDataFiles, numDeleteFiles, table,
checkpointId);
operation.set(MAX_COMMITTED_CHECKPOINT_ID,
Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
@@ -403,7 +465,7 @@ public class IcebergSingleFileCommiter extends
IcebergProcessFunction<WriteResul
String.format("iceberg(%s)-files-committer-state",
tableId.toString()), sortedMapTypeInfo);
}
- static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+ public static long getMaxCommittedCheckpointId(Table table, String
flinkJobId) {
Snapshot snapshot = table.currentSnapshot();
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
@@ -423,4 +485,62 @@ public class IcebergSingleFileCommiter extends
IcebergProcessFunction<WriteResul
return lastCommittedCheckpointId;
}
+
+ static long getSnapshotIdAssociatedWithChkId(Table table, String
flinkJobId, Long checkpointId) {
+ Snapshot snapshot = table.currentSnapshot();
+ long associatedSnapshotId = INVALID_SNAPSHOT_ID;
+
+ while (snapshot != null) {
+ Map<String, String> summary = snapshot.summary();
+ String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
+ if (flinkJobId.equals(snapshotFlinkJobId)) {
+ String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+ if (value != null &&
checkpointId.equals(Long.parseLong(value))) {
+ associatedSnapshotId = snapshot.snapshotId();
+ break;
+ }
+ }
+ Long parentSnapshotId = snapshot.parentId();
+ snapshot = parentSnapshotId != null ?
table.snapshot(parentSnapshotId) : null;
+ }
+ return associatedSnapshotId;
+ }
+
+ /**
+ * Find last uncommitted manifest files in a list of manifest files.
+ *
+ * Assume one manifest commit, all the previous manifests have been
submitted (compared with the size according to the checkpoint)
+ * Assume flink manifest file has not been deleted, this manifest file
+ *
+     * @param deltaManifestsMap: all manifest files that may have not been
committed
+ * @param io: file access tool
+ * @return
+ * @throws IOException
+ */
+ static long findEarliestUnCommittedManifest(NavigableMap<Long, byte[]>
deltaManifestsMap, FileIO io)
+ throws IOException {
+ List<Long> uncommittedChkList = deltaManifestsMap.keySet()
+ .stream()
+ .sorted(Comparator.reverseOrder())
+ .collect(Collectors.toList());
+ long uncommittedChkId = INITIAL_CHECKPOINT_ID;
+ for (long chkId : uncommittedChkList) {
+ byte[] e = deltaManifestsMap.get(chkId);
+ if (Arrays.equals(EMPTY_MANIFEST_DATA, e)) {
+ // Skip the empty flink manifest.
+ continue;
+ }
+
+ DeltaManifests deltaManifests = SimpleVersionedSerialization
+
.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e);
+ if (deltaManifests.manifests()
+ .stream()
+ .anyMatch(manifest ->
!io.newInputFile(manifest.path()).exists())) {
+                // manifest file not existing means `chkId` is committed
+ uncommittedChkId = chkId;
+ break;
+ }
+ }
+ return uncommittedChkId;
+ }
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/HadoopCatalogResource.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/HadoopCatalogResource.java
new file mode 100644
index 0000000000..9503a3c472
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/HadoopCatalogResource.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+public class HadoopCatalogResource extends ExternalResource {
+
+ protected final TemporaryFolder temporaryFolder;
+ protected final String database;
+ protected final String tableName;
+
+ protected Catalog catalog;
+ protected CatalogLoader catalogLoader;
+ protected String warehouse;
+ protected TableLoader tableLoader;
+
+ public HadoopCatalogResource(TemporaryFolder temporaryFolder, String
database, String tableName) {
+ this.temporaryFolder = temporaryFolder;
+ this.database = database;
+ this.tableName = tableName;
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ File warehouseFile = temporaryFolder.newFolder();
+ Assert.assertTrue(warehouseFile.delete());
+ // before variables
+ this.warehouse = "file:" + warehouseFile;
+ this.catalogLoader =
+ CatalogLoader.hadoop(
+ "hadoop",
+ new Configuration(),
+ ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION,
warehouse));
+ this.catalog = catalogLoader.loadCatalog();
+ this.tableLoader =
+ TableLoader.fromCatalog(catalogLoader,
TableIdentifier.of(database, tableName));
+ }
+
+ @Override
+ protected void after() {
+ try {
+ catalog.dropTable(TableIdentifier.of(database, tableName));
+ ((HadoopCatalog) catalog).close();
+ tableLoader.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to close catalog resource");
+ }
+ }
+
+ public TableLoader tableLoader() {
+ return tableLoader;
+ }
+
+ public Catalog catalog() {
+ return catalog;
+ }
+
+ public CatalogLoader catalogLoader() {
+ return catalogLoader;
+ }
+
+ public String warehouse() {
+ return warehouse;
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/TestRollbackAndRecover.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/TestRollbackAndRecover.java
new file mode 100644
index 0000000000..20b5a0e5db
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/TestRollbackAndRecover.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergProcessOperator;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergSingleFileCommiter;
+import org.apache.inlong.sort.iceberg.sink.util.SimpleDataUtil;
+import org.apache.inlong.sort.iceberg.sink.util.TestTableLoader;
+import org.apache.inlong.sort.iceberg.sink.util.TestTables;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static
org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+
+public class TestRollbackAndRecover {
+
+ public static final String DATABASE = "default";
+ public static final String TABLE = "t";
+ private static final org.apache.hadoop.conf.Configuration CONF = new
org.apache.hadoop.conf.Configuration();
+
+ private Table table;
+ private TableLoader tableLoader;
+ private final FileFormat format = FileFormat.fromString("avro");
+ private File flinkManifestFolder;
+ private File metadataDir;
+ private File tableDir;
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule
+ public final HadoopCatalogResource catalogResource = new
HadoopCatalogResource(TEMPORARY_FOLDER, DATABASE, TABLE);
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Before
+ public void before() throws IOException {
+ flinkManifestFolder = temp.newFolder();
+
+ this.tableDir = temp.newFolder();
+ this.metadataDir = new File(tableDir, "metadata");
+ Assert.assertTrue(tableDir.delete());
+
+ // Construct the iceberg table.
+ table = TestTables.create(tableDir, "test", SimpleDataUtil.SCHEMA,
PartitionSpec.unpartitioned(), 2);
+
+ table
+ .updateProperties()
+ .set(DEFAULT_FILE_FORMAT, format.name())
+ .set(FLINK_MANIFEST_LOCATION,
flinkManifestFolder.getAbsolutePath())
+ .set("flink.max-continuous-empty-commits", "1")
+ .commit();
+ }
+
+ @After
+ public void after() {
+ TestTables.clearTables();
+ }
+
+ // case1: Commit normally, and then submit multiple times. At this time,
reset to a previous chk, \
+ // and the snapshot corresponding to the chk has not expired
+ @Test
+ public void testRollbackToSnapshotWithRestoreCheckpointId() throws
Exception {
+ long timestamp = 0;
+ long checkpoint = 10;
+ OperatorSubtaskState initCheckpoint;
+ JobID jobID = new JobID();
+ ImmutableList firstResult;
+
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> op =
createStreamSink(jobID)) {
+ FileAppenderFactory<RowData> appenderFactory =
createDeletableAppenderFactory();
+
+ op.setup();
+ op.open();
+
+ RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+ RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+ RowData delete3 = SimpleDataUtil.createDelete(3, "ccc");
+ firstResult = ImmutableList.of(insert1, insert2);
+ DataFile dataFile1 = writeDataFile("data-file-1",
ImmutableList.of(insert1, insert2));
+ DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory,
"delete-file-1", ImmutableList.of(delete3));
+ op.processElement(
+
WriteResult.builder().addDataFiles(dataFile1).addDeleteFiles(deleteFile1).build(),
+ ++timestamp);
+
+            // The 1st snapshotState.
+ initCheckpoint = op.snapshot(checkpoint, ++timestamp);
+ op.notifyOfCompletedCheckpoint(checkpoint);
+
+ RowData insert4 = SimpleDataUtil.createInsert(4, "ddd");
+ RowData delete2 = SimpleDataUtil.createDelete(2, "bbb");
+ DataFile dataFile2 = writeDataFile("data-file-2",
ImmutableList.of(insert4));
+ DeleteFile deleteFile2 = writeEqDeleteFile(appenderFactory,
"delete-file-2", ImmutableList.of(delete2));
+ op.processElement(
+
WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile2).build(),
+ ++timestamp);
+
+ // The 2nd snapshotState.
+ op.snapshot(++checkpoint, ++timestamp);
+ op.notifyOfCompletedCheckpoint(checkpoint);
+ assertMaxCommittedCheckpointId(jobID, checkpoint);
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1,
insert4));
+ }
+
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> op =
createStreamSink(jobID)) {
+ // init state from begin checkpoint
+ op.setup();
+ op.initializeState(initCheckpoint);
+ op.open();
+
+ // test if rollback success
+ // check snapshot information
+ assertMaxCommittedCheckpointId(jobID, checkpoint - 1);
+ table.refresh();
+ SimpleDataUtil.assertTableRows(table, firstResult);
+ }
+ }
+
+ // case2: Commit normally, and then submit multiple times. At this time,
reset to a previous chk, but the snapshot
+ // corresponding to the chk has been expired
+ @Test
+ public void testFallbackToRecoverWithUnCompletedNotification() throws
Exception {
+ long timestamp = 0;
+ long checkpoint = 10;
+ OperatorSubtaskState initCheckpoint;
+ long initTimestamp;
+ JobID jobID = new JobID();
+ ImmutableList result;
+
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> op =
createStreamSink(jobID)) {
+ FileAppenderFactory<RowData> appenderFactory =
createDeletableAppenderFactory();
+
+ op.setup();
+ op.open();
+
+ RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+ RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+ RowData delete3 = SimpleDataUtil.createDelete(3, "ccc");
+ DataFile dataFile1 = writeDataFile("data-file-1",
ImmutableList.of(insert1, insert2));
+ DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory,
"delete-file-1", ImmutableList.of(delete3));
+ op.processElement(
+
WriteResult.builder().addDataFiles(dataFile1).addDeleteFiles(deleteFile1).build(),
+ ++timestamp);
+
+            // The 1st snapshotState.
+ initCheckpoint = op.snapshot(checkpoint, ++timestamp);
+ op.notifyOfCompletedCheckpoint(checkpoint);
+ initTimestamp = System.currentTimeMillis();
+
+ RowData insert4 = SimpleDataUtil.createInsert(4, "ddd");
+ RowData delete2 = SimpleDataUtil.createDelete(2, "bbb");
+ DataFile dataFile2 = writeDataFile("data-file-2",
ImmutableList.of(insert4));
+ DeleteFile deleteFile2 = writeEqDeleteFile(appenderFactory,
"delete-file-2", ImmutableList.of(delete2));
+ op.processElement(
+
WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile2).build(),
+ ++timestamp);
+
+ // The 2nd snapshotState.
+ op.snapshot(++checkpoint, ++timestamp);
+ op.notifyOfCompletedCheckpoint(checkpoint);
+ assertMaxCommittedCheckpointId(jobID, checkpoint);
+ result = ImmutableList.of(insert1, insert4);
+ SimpleDataUtil.assertTableRows(table, result);
+
+ }
+
+ table.expireSnapshots().expireOlderThan(initTimestamp).commit();
+
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> op =
createStreamSink(jobID)) {
+ // init state from begin checkpoint
+ op.setup();
+ op.initializeState(initCheckpoint);
+ op.open();
+
+ // check snapshot information
+ assertMaxCommittedCheckpointId(jobID, checkpoint);
+ SimpleDataUtil.assertTableRows(table, result); // same as before
+ }
+ }
+
+ // case3: Commit normally, and then commit multiple times. At this time,
reset to a previous chk.
+ // There are multiple uncommitted manifests in this chk. These manifests
are really not committed
+ // successfully
+ @Test
+ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws
Exception {
+ long timestamp = 0;
+ long checkpoint = 10;
+ long restoreCheckpoint = 0;
+ OperatorSubtaskState restoreCheckpointState;
+ long initTimestamp = 0;
+ JobID jobID = new JobID();
+ List result;
+
+        // first commit 3 snapshots
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> op =
createStreamSink(jobID)) {
+ FileAppenderFactory<RowData> appenderFactory =
createDeletableAppenderFactory();
+
+ op.setup();
+ op.open();
+
+            // write 1st data
+ RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+ RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+ RowData delete3 = SimpleDataUtil.createDelete(3, "ccc");
+ DataFile dataFile1 = writeDataFile("data-file-1",
ImmutableList.of(insert1, insert2));
+ DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory,
"delete-file-1", ImmutableList.of(delete3));
+ op.processElement(
+
WriteResult.builder().addDataFiles(dataFile1).addDeleteFiles(deleteFile1).build(),
+ ++timestamp);
+
+            // The 1st snapshotState.
+ restoreCheckpoint = checkpoint;
+ restoreCheckpointState = op.snapshot(checkpoint, ++timestamp);
+ op.notifyOfCompletedCheckpoint(checkpoint);
+
+ // write 2nd data
+ RowData insert4 = SimpleDataUtil.createInsert(4, "ddd");
+ RowData delete2 = SimpleDataUtil.createDelete(2, "bbb");
+ DataFile dataFile2 = writeDataFile("data-file-2",
ImmutableList.of(insert4));
+ DeleteFile deleteFile2 = writeEqDeleteFile(appenderFactory,
"delete-file-2", ImmutableList.of(delete2));
+ op.processElement(
+
WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile2).build(),
+ ++timestamp);
+
+ // The 2nd snapshotState.
+ op.snapshot(++checkpoint, ++timestamp);
+ op.notifyOfCompletedCheckpoint(checkpoint);
+ assertMaxCommittedCheckpointId(jobID, checkpoint);
+ initTimestamp = System.currentTimeMillis();
+
+            // write 3rd data
+ RowData insert5 = SimpleDataUtil.createInsert(5, "eee");
+ RowData insert6 = SimpleDataUtil.createInsert(6, "fff");
+ DataFile dataFile3 = writeDataFile("data-file-3",
ImmutableList.of(insert5, insert6));
+
op.processElement(WriteResult.builder().addDataFiles(dataFile3).build(),
++timestamp);
+
+            // The 3rd snapshotState.
+ op.snapshot(++checkpoint, ++timestamp);
+ op.notifyOfCompletedCheckpoint(checkpoint);
+ assertMaxCommittedCheckpointId(jobID, checkpoint);
+ result = ImmutableList.of(insert1, insert4, insert5, insert6);
+ }
+
+ // only retain 3th snapshot
+ table.expireSnapshots().expireOlderThan(initTimestamp).commit();
+ checkpoint = restoreCheckpoint;
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> op =
createStreamSink(jobID)) {
+ FileAppenderFactory<RowData> appenderFactory =
createDeletableAppenderFactory();
+ // init state from begin checkpoint
+ op.setup();
+ op.initializeState(restoreCheckpointState);
+ op.open();
+
+ // write data
+ RowData insert4 = SimpleDataUtil.createInsert(4, "ddd");
+ RowData delete2 = SimpleDataUtil.createDelete(2, "bbb");
+ RowData insert5 = SimpleDataUtil.createInsert(5, "eee");
+ RowData insert6 = SimpleDataUtil.createInsert(6, "fff");
+ RowData delete1 = SimpleDataUtil.createDelete(1, "aaa"); // new
delete
+ RowData insert7 = SimpleDataUtil.createInsert(7, "ggg"); // new
insert
+ DataFile dataFile4 = writeDataFile("data-file-4",
ImmutableList.of(insert4, insert5, insert6, insert7));
+ DeleteFile deleteFile4 =
+ writeEqDeleteFile(appenderFactory, "delete-file-4",
ImmutableList.of(delete2, delete1));
+
op.processElement(WriteResult.builder().addDataFiles(dataFile4).addDeleteFiles(deleteFile4).build(),
+ initTimestamp);
+ // snapshotState
+ restoreCheckpointState = op.snapshot(++checkpoint, ++timestamp);
+ op.notifyOfCompletedCheckpoint(checkpoint);
+ SimpleDataUtil.assertTableRows(table, result);
+ result = ImmutableList.of(insert4, insert5, insert6, insert7);
+ }
+
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> op =
createStreamSink(jobID)) {
+ op.setup();
+ op.initializeState(restoreCheckpointState);
+ op.open();
+
+ // check snapshot information
+ assertMaxCommittedCheckpointId(jobID, checkpoint);
+ SimpleDataUtil.assertTableRows(table, result);
+ }
+ }
+
+ public IcebergProcessOperator<WriteResult, Void> buildCommitter(
+ boolean overwritemode,
+ ActionsProvider actionsProvider,
+ ReadableConfig tableOptions) {
+ IcebergSingleFileCommiter commiter = new IcebergSingleFileCommiter(
+ TableIdentifier.of(table.name()),
+ tableLoader,
+ overwritemode,
+ actionsProvider,
+ tableOptions);
+ return new IcebergProcessOperator(commiter);
+ }
+
+ private OneInputStreamOperatorTestHarness<WriteResult, Void>
createStreamSink(JobID jobID)
+ throws Exception {
+ TestOperatorFactory factory = TestOperatorFactory.of(table.location(),
TableIdentifier.of(table.name()));
+ return new OneInputStreamOperatorTestHarness<>(factory,
createEnvironment(jobID));
+ }
+
+ private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
+ int[] equalityFieldIds = new int[]{
+ table.schema().findField("id").fieldId(),
table.schema().findField("data").fieldId()
+ };
+ return new FlinkAppenderFactory(
+ table.schema(),
+ FlinkSchemaUtil.convert(table.schema()),
+ table.properties(),
+ table.spec(),
+ equalityFieldIds,
+ table.schema(),
+ null);
+ }
+
+ private DataFile writeDataFile(String filename, List<RowData> rows) throws
IOException {
+ return SimpleDataUtil.writeFile(
+ table,
+ table.schema(),
+ table.spec(),
+ CONF,
+ table.location(),
+ format.addExtension(filename),
+ rows);
+ }
+
+ private DataFile writeDataFile(
+ String filename, List<RowData> rows, PartitionSpec spec,
StructLike partition)
+ throws IOException {
+ return SimpleDataUtil.writeFile(
+ table,
+ table.schema(),
+ spec,
+ CONF,
+ table.location(),
+ format.addExtension(filename),
+ rows,
+ partition);
+ }
+
+ private DeleteFile writeEqDeleteFile(
+ FileAppenderFactory<RowData> appenderFactory, String filename,
List<RowData> deletes)
+ throws IOException {
+ return SimpleDataUtil.writeEqDeleteFile(table, format, filename,
appenderFactory, deletes);
+ }
+
+ private DeleteFile writePosDeleteFile(
+ FileAppenderFactory<RowData> appenderFactory,
+ String filename,
+ List<Pair<CharSequence, Long>> positions)
+ throws IOException {
+ return SimpleDataUtil.writePosDeleteFile(table, format, filename,
appenderFactory, positions);
+ }
+
+ private void assertSnapshotSize(int expectedSnapshotSize) {
+ table.refresh();
+ Assert.assertEquals(expectedSnapshotSize,
Lists.newArrayList(table.snapshots()).size());
+ }
+
+ private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID
operatorID, long expectedId) {
+ table.refresh();
+ long actualId =
IcebergSingleFileCommiter.getMaxCommittedCheckpointId(table, jobID.toString());
+ Assert.assertEquals(expectedId, actualId);
+ }
+
+ private List<Path> assertFlinkManifests(int expectedCount) throws
IOException {
+ List<Path> manifests = Files.list(flinkManifestFolder.toPath())
+ .filter(p -> !p.toString().endsWith(".crc"))
+ .collect(Collectors.toList());
+ Assert.assertEquals(
+ String.format("Expected %s flink manifests, but the list is:
%s", expectedCount, manifests),
+ expectedCount,
+ manifests.size());
+ return manifests;
+ }
+
+ private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
+ table.refresh();
+ long actualId =
IcebergSingleFileCommiter.getMaxCommittedCheckpointId(table, jobID.toString());
+ Assert.assertEquals(expectedId, actualId);
+ }
+
+ private static MockEnvironment createEnvironment(JobID jobID) {
+ return new MockEnvironmentBuilder()
+ .setTaskName("test task")
+ .setManagedMemorySize(32 * 1024)
+ .setInputSplitProvider(new MockInputSplitProvider())
+ .setBufferSize(256)
+ .setTaskConfiguration(new
org.apache.flink.configuration.Configuration())
+ .setExecutionConfig(new ExecutionConfig())
+ .setMaxParallelism(16)
+ .setJobID(jobID)
+ .build();
+ }
+
+ private static class TestOperatorFactory extends
AbstractStreamOperatorFactory<Void>
+ implements
+ OneInputStreamOperatorFactory<WriteResult, Void> {
+
+ private final String tablePath;
+ private final TableIdentifier tableIdentifier;
+
+ private TestOperatorFactory(String tablePath, TableIdentifier
tableIdentifier) {
+ this.tablePath = tablePath;
+ this.tableIdentifier = tableIdentifier;
+ }
+
+ private static TestOperatorFactory of(String tablePath,
TableIdentifier tableIdentifier) {
+ return new TestOperatorFactory(tablePath, tableIdentifier);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T extends StreamOperator<Void>> T createStreamOperator(
+ StreamOperatorParameters<Void> param) {
+ IcebergSingleFileCommiter commiter = new IcebergSingleFileCommiter(
+ tableIdentifier,
+ new TestTableLoader(tablePath),
+ false,
+ null,
+ new Configuration());
+ IcebergProcessOperator<?, Void> operator = new
IcebergProcessOperator(commiter);
+ operator.setup(param.getContainingTask(), param.getStreamConfig(),
param.getOutput());
+ return (T) operator;
+ }
+
+ @Override
+ public Class<? extends StreamOperator>
getStreamOperatorClass(ClassLoader classLoader) {
+ return IcebergProcessOperator.class;
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/SimpleDataUtil.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/SimpleDataUtil.java
new file mode 100644
index 0000000000..48551225bb
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/SimpleDataUtil.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink.util;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.1.x
+ */
+public class SimpleDataUtil {
+
+ private SimpleDataUtil() {
+ }
+
+ public static final Schema SCHEMA =
+ new Schema(
+ Types.NestedField.optional(1, "id",
Types.IntegerType.get()),
+ Types.NestedField.optional(2, "data",
Types.StringType.get()));
+
+ public static final TableSchema FLINK_SCHEMA =
+ TableSchema.builder().field("id", DataTypes.INT()).field("data",
DataTypes.STRING()).build();
+
+ public static final RowType ROW_TYPE = (RowType)
FLINK_SCHEMA.toRowDataType().getLogicalType();
+
+ public static final Record RECORD = GenericRecord.create(SCHEMA);
+
+ public static Table createTable(
+ String path, Map<String, String> properties, boolean partitioned) {
+ PartitionSpec spec;
+ if (partitioned) {
+ spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ } else {
+ spec = PartitionSpec.unpartitioned();
+ }
+ return new HadoopTables().create(SCHEMA, spec, properties, path);
+ }
+
+ public static Record createRecord(Integer id, String data) {
+ Record record = RECORD.copy();
+ record.setField("id", id);
+ record.setField("data", data);
+ return record;
+ }
+
+ public static RowData createRowData(Integer id, String data) {
+ return GenericRowData.of(id, StringData.fromString(data));
+ }
+
+ public static RowData createInsert(Integer id, String data) {
+ return GenericRowData.ofKind(RowKind.INSERT, id,
StringData.fromString(data));
+ }
+
+ public static RowData createDelete(Integer id, String data) {
+ return GenericRowData.ofKind(RowKind.DELETE, id,
StringData.fromString(data));
+ }
+
+ public static RowData createUpdateBefore(Integer id, String data) {
+ return GenericRowData.ofKind(RowKind.UPDATE_BEFORE, id,
StringData.fromString(data));
+ }
+
+ public static RowData createUpdateAfter(Integer id, String data) {
+ return GenericRowData.ofKind(RowKind.UPDATE_AFTER, id,
StringData.fromString(data));
+ }
+
+ public static DataFile writeFile(
+ Table table,
+ Schema schema,
+ PartitionSpec spec,
+ Configuration conf,
+ String location,
+ String filename,
+ List<RowData> rows)
+ throws IOException {
+ return writeFile(table, schema, spec, conf, location, filename, rows,
null);
+ }
+
+ /** Write the list of {@link RowData} to the given path and with the given
partition data */
+ public static DataFile writeFile(
+ Table table,
+ Schema schema,
+ PartitionSpec spec,
+ Configuration conf,
+ String location,
+ String filename,
+ List<RowData> rows,
+ StructLike partition)
+ throws IOException {
+ Path path = new Path(location, filename);
+ FileFormat fileFormat = FileFormat.fromFileName(filename);
+ Preconditions.checkNotNull(fileFormat, "Cannot determine format for
file: %s", filename);
+
+ RowType flinkSchema = FlinkSchemaUtil.convert(schema);
+ FileAppenderFactory<RowData> appenderFactory =
+ new FlinkAppenderFactory(
+ schema, flinkSchema, ImmutableMap.of(), spec, null,
null, null);
+
+ FileAppender<RowData> appender =
appenderFactory.newAppender(fromPath(path, conf), fileFormat);
+ try (FileAppender<RowData> closeableAppender = appender) {
+ closeableAppender.addAll(rows);
+ }
+
+ DataFiles.Builder builder =
+ DataFiles.builder(spec)
+ .withInputFile(HadoopInputFile.fromPath(path, conf))
+ .withMetrics(appender.metrics());
+
+ if (partition != null) {
+ builder = builder.withPartition(partition);
+ }
+
+ return builder.build();
+ }
+
+ public static DeleteFile writeEqDeleteFile(
+ Table table,
+ FileFormat format,
+ String filename,
+ FileAppenderFactory<RowData> appenderFactory,
+ List<RowData> deletes)
+ throws IOException {
+ EncryptedOutputFile outputFile =
+ table
+ .encryption()
+ .encrypt(fromPath(new Path(table.location(),
filename), new Configuration()));
+
+ EqualityDeleteWriter<RowData> eqWriter =
+ appenderFactory.newEqDeleteWriter(outputFile, format, null);
+ try (EqualityDeleteWriter<RowData> writer = eqWriter) {
+ writer.write(deletes);
+ }
+ return eqWriter.toDeleteFile();
+ }
+
+ public static DeleteFile writePosDeleteFile(
+ Table table,
+ FileFormat format,
+ String filename,
+ FileAppenderFactory<RowData> appenderFactory,
+ List<Pair<CharSequence, Long>> positions)
+ throws IOException {
+ EncryptedOutputFile outputFile =
+ table
+ .encryption()
+ .encrypt(fromPath(new Path(table.location(),
filename), new Configuration()));
+
+ PositionDeleteWriter<RowData> posWriter =
+ appenderFactory.newPosDeleteWriter(outputFile, format, null);
+ PositionDelete<RowData> posDelete = PositionDelete.create();
+ try (PositionDeleteWriter<RowData> writer = posWriter) {
+ for (Pair<CharSequence, Long> p : positions) {
+ writer.write(posDelete.set(p.first(), p.second(), null));
+ }
+ }
+ return posWriter.toDeleteFile();
+ }
+
+ private static List<Record> convertToRecords(List<RowData> rows) {
+ List<Record> records = Lists.newArrayList();
+ for (RowData row : rows) {
+ Integer id = row.isNullAt(0) ? null : row.getInt(0);
+ String data = row.isNullAt(1) ? null : row.getString(1).toString();
+ records.add(createRecord(id, data));
+ }
+ return records;
+ }
+
+ public static void assertTableRows(String tablePath, List<RowData>
expected, String branch)
+ throws IOException {
+ assertTableRecords(tablePath, convertToRecords(expected), branch);
+ }
+
+ public static void assertTableRows(Table table, List<RowData> expected)
throws IOException {
+ assertTableRecords(table, convertToRecords(expected),
SnapshotRef.MAIN_BRANCH);
+ }
+
+ public static void assertTableRows(Table table, List<RowData> expected,
String branch)
+ throws IOException {
+ assertTableRecords(table, convertToRecords(expected), branch);
+ }
+
+ /** Get all rows for a table */
+ public static List<Record> tableRecords(Table table) throws IOException {
+ table.refresh();
+ List<Record> records = Lists.newArrayList();
+ try (CloseableIterable<Record> iterable =
IcebergGenerics.read(table).build()) {
+ for (Record record : iterable) {
+ records.add(record);
+ }
+ }
+ return records;
+ }
+
+ public static boolean equalsRecords(List<Record> expected, List<Record>
actual, Schema schema) {
+ if (expected.size() != actual.size()) {
+ return false;
+ }
+ Types.StructType type = schema.asStruct();
+ StructLikeSet expectedSet = StructLikeSet.create(type);
+ expectedSet.addAll(expected);
+ StructLikeSet actualSet = StructLikeSet.create(type);
+ actualSet.addAll(actual);
+ return expectedSet.equals(actualSet);
+ }
+
+ public static void assertRecordsEqual(List<Record> expected, List<Record>
actual, Schema schema) {
+ Assert.assertEquals(expected.size(), actual.size());
+ Types.StructType type = schema.asStruct();
+ StructLikeSet expectedSet = StructLikeSet.create(type);
+ expectedSet.addAll(expected);
+ StructLikeSet actualSet = StructLikeSet.create(type);
+ actualSet.addAll(actual);
+ Assert.assertEquals(expectedSet, actualSet);
+ }
+
+ /**
+ * Assert table contains the expected list of records after waiting up to
{@code maxCheckCount}
+ * with {@code checkInterval}
+ */
+ public static void assertTableRecords(
+ Table table, List<Record> expected, Duration checkInterval, int
maxCheckCount)
+ throws IOException, InterruptedException {
+ for (int i = 0; i < maxCheckCount; ++i) {
+ if (equalsRecords(expected, tableRecords(table), table.schema())) {
+ break;
+ } else {
+ Thread.sleep(checkInterval.toMillis());
+ }
+ }
+ // success or failure, assert on the latest table state
+ assertRecordsEqual(expected, tableRecords(table), table.schema());
+ }
+
+ public static void assertTableRecords(Table table, List<Record> expected)
throws IOException {
+ assertTableRecords(table, expected, SnapshotRef.MAIN_BRANCH);
+ }
+
+ public static void assertTableRecords(Table table, List<Record> expected,
String branch)
+ throws IOException {
+ table.refresh();
+ Snapshot snapshot = latestSnapshot(table, branch);
+
+ if (snapshot == null) {
+ Assert.assertEquals(expected, ImmutableList.of());
+ return;
+ }
+
+ Types.StructType type = table.schema().asStruct();
+ StructLikeSet expectedSet = StructLikeSet.create(type);
+ expectedSet.addAll(expected);
+
+ try (CloseableIterable<Record> iterable =
+
IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
+ StructLikeSet actualSet = StructLikeSet.create(type);
+
+ for (Record record : iterable) {
+ actualSet.add(record);
+ }
+
+ Assert.assertEquals("Should produce the expected record",
expectedSet, actualSet);
+ }
+ }
+
+ // Returns the latest snapshot of the given branch in the table
+ public static Snapshot latestSnapshot(Table table, String branch) {
+ // For the main branch, currentSnapshot() is used to validate that the
API behavior has
+ // not changed since that was the API used for validation prior to
addition of branches.
+ if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+ return table.currentSnapshot();
+ }
+
+ return table.snapshot(branch);
+ }
+
+ public static void assertTableRecords(String tablePath, List<Record>
expected)
+ throws IOException {
+ Preconditions.checkArgument(expected != null, "expected records
shouldn't be null");
+ assertTableRecords(new HadoopTables().load(tablePath), expected,
SnapshotRef.MAIN_BRANCH);
+ }
+
+ public static void assertTableRecords(String tablePath, List<Record>
expected, String branch)
+ throws IOException {
+ Preconditions.checkArgument(expected != null, "expected records
shouldn't be null");
+ assertTableRecords(new HadoopTables().load(tablePath), expected,
branch);
+ }
+
+ public static StructLikeSet expectedRowSet(Table table, Record... records)
{
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ Collections.addAll(set, records);
+ return set;
+ }
+
+ public static StructLikeSet actualRowSet(Table table, String... columns)
throws IOException {
+ return actualRowSet(table, null, columns);
+ }
+
+ public static StructLikeSet actualRowSet(Table table, Long snapshotId,
String... columns)
+ throws IOException {
+ table.refresh();
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ try (CloseableIterable<Record> reader =
+ IcebergGenerics.read(table)
+ .useSnapshot(snapshotId == null ?
table.currentSnapshot().snapshotId() : snapshotId)
+ .select(columns)
+ .build()) {
+ reader.forEach(set::add);
+ }
+ return set;
+ }
+
+ public static List<DataFile> partitionDataFiles(Table table, Map<String,
Object> partitionValues)
+ throws IOException {
+ table.refresh();
+ Types.StructType partitionType = table.spec().partitionType();
+
+ Record partitionRecord =
GenericRecord.create(partitionType).copy(partitionValues);
+ StructLikeWrapper expectedWrapper =
+ StructLikeWrapper.forType(partitionType).set(partitionRecord);
+
+ List<DataFile> dataFiles = Lists.newArrayList();
+ try (CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().planFiles()) {
+ for (FileScanTask scanTask : fileScanTasks) {
+ StructLikeWrapper wrapper =
+
StructLikeWrapper.forType(partitionType).set(scanTask.file().partition());
+
+ if (expectedWrapper.equals(wrapper)) {
+ dataFiles.add(scanTask.file());
+ }
+ }
+ }
+
+ return dataFiles;
+ }
+
+ public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table)
throws IOException {
+ table.refresh();
+
+ Map<Long, List<DataFile>> result = Maps.newHashMap();
+ Snapshot current = table.currentSnapshot();
+ while (current != null) {
+ TableScan tableScan = table.newScan();
+ if (current.parentId() != null) {
+ // Collect the data files that was added only in current
snapshot.
+ tableScan = tableScan.appendsBetween(current.parentId(),
current.snapshotId());
+ } else {
+ // Collect the data files that was added in the oldest
snapshot.
+ tableScan = tableScan.useSnapshot(current.snapshotId());
+ }
+ try (CloseableIterable<FileScanTask> scanTasks =
tableScan.planFiles()) {
+ result.put(
+ current.snapshotId(),
+ ImmutableList.copyOf(Iterables.transform(scanTasks,
FileScanTask::file)));
+ }
+
+ // Continue to traverse the parent snapshot if exists.
+ if (current.parentId() == null) {
+ break;
+ }
+ // Iterate to the parent snapshot.
+ current = table.snapshot(current.parentId());
+ }
+ return result;
+ }
+
+ public static List<DataFile> matchingPartitions(
+ List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String,
Object> partitionValues) {
+ Types.StructType partitionType = partitionSpec.partitionType();
+ Record partitionRecord =
GenericRecord.create(partitionType).copy(partitionValues);
+ StructLikeWrapper expected =
StructLikeWrapper.forType(partitionType).set(partitionRecord);
+ return dataFiles.stream()
+ .filter(
+ df -> {
+ StructLikeWrapper wrapper =
+
StructLikeWrapper.forType(partitionType).set(df.partition());
+ return wrapper.equals(expected);
+ })
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTableLoader.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTableLoader.java
new file mode 100644
index 0000000000..18e123b6b4
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTableLoader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink.util;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+
+import java.io.File;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+public class TestTableLoader implements TableLoader {
+
+ private File dir;
+
+ public static TableLoader of(String dir) {
+ return new TestTableLoader(dir);
+ }
+
+ public TestTableLoader(String dir) {
+ this.dir = new File(dir);
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public Table loadTable() {
+ return TestTables.load(dir, "test");
+ }
+
+ @Override
+ @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
+ public TableLoader clone() {
+ return new TestTableLoader(dir.getAbsolutePath());
+ }
+
+ @Override
+ public void close() {
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTables.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTables.java
new file mode 100644
index 0000000000..96a5cbeae0
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTables.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink.util;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.LocationProviders;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.Transactions;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+import java.io.File;
+import java.util.Map;
+
+import static org.apache.iceberg.TableMetadata.newTableMetadata;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.1.x
+ */
+public class TestTables {
+
+ private TestTables() {
+ }
+
+ private static TestTable upgrade(File temp, String name, int
newFormatVersion) {
+ TestTable table = load(temp, name);
+ TableOperations ops = table.ops();
+ TableMetadata base = ops.current();
+ ops.commit(base,
ops.current().upgradeToFormatVersion(newFormatVersion));
+ return table;
+ }
+
+ public static TestTable create(
+ File temp, String name, Schema schema, PartitionSpec spec, int
formatVersion) {
+ return create(temp, name, schema, spec, SortOrder.unsorted(),
formatVersion);
+ }
+
+ public static TestTable create(
+ File temp,
+ String name,
+ Schema schema,
+ PartitionSpec spec,
+ SortOrder sortOrder,
+ int formatVersion) {
+ TestTableOperations ops = new TestTableOperations(name, temp);
+ if (ops.current() != null) {
+ throw new AlreadyExistsException("Table %s already exists at
location: %s", name, temp);
+ }
+
+ ops.commit(
+ null,
+ newTableMetadata(
+ schema, spec, sortOrder, temp.toString(),
+ ImmutableMap.of("format-version",
String.valueOf(formatVersion))));
+
+ return new TestTable(ops, name);
+ }
+
+ public static TestTable create(
+ File temp,
+ String name,
+ Schema schema,
+ PartitionSpec spec,
+ SortOrder sortOrder) {
+ TestTableOperations ops = new TestTableOperations(name, temp);
+ if (ops.current() != null) {
+ throw new AlreadyExistsException("Table %s already exists at
location: %s", name, temp);
+ }
+
+ ops.commit(
+ null,
+ newTableMetadata(
+ schema, spec, sortOrder, temp.toString(),
ImmutableMap.of()));
+
+ return new TestTable(ops, name);
+ }
+
+ public static Transaction beginCreate(File temp, String name, Schema
schema, PartitionSpec spec) {
+ return beginCreate(temp, name, schema, spec, SortOrder.unsorted());
+ }
+
+ public static Transaction beginCreate(
+ File temp, String name, Schema schema, PartitionSpec spec,
SortOrder sortOrder) {
+ TableOperations ops = new TestTableOperations(name, temp);
+ if (ops.current() != null) {
+ throw new AlreadyExistsException("Table %s already exists at
location: %s", name, temp);
+ }
+
+ TableMetadata metadata =
+ newTableMetadata(schema, spec, sortOrder, temp.toString(),
ImmutableMap.of());
+
+ return Transactions.createTableTransaction(name, ops, metadata);
+ }
+
+ public static Transaction beginReplace(
+ File temp, String name, Schema schema, PartitionSpec spec) {
+ return beginReplace(
+ temp,
+ name,
+ schema,
+ spec,
+ SortOrder.unsorted(),
+ ImmutableMap.of(),
+ new TestTableOperations(name, temp));
+ }
+
+ public static Transaction beginReplace(
+ File temp,
+ String name,
+ Schema schema,
+ PartitionSpec spec,
+ SortOrder sortOrder,
+ Map<String, String> properties) {
+ return beginReplace(
+ temp, name, schema, spec, sortOrder, properties, new
TestTableOperations(name, temp));
+ }
+
+ public static Transaction beginReplace(
+ File temp,
+ String name,
+ Schema schema,
+ PartitionSpec spec,
+ SortOrder sortOrder,
+ Map<String, String> properties,
+ TestTableOperations ops) {
+ TableMetadata current = ops.current();
+ TableMetadata metadata;
+ if (current != null) {
+ metadata = current.buildReplacement(schema, spec, sortOrder,
current.location(), properties);
+ return Transactions.replaceTableTransaction(name, ops, metadata);
+ } else {
+ metadata = newTableMetadata(schema, spec, sortOrder,
temp.toString(), properties);
+ return Transactions.createTableTransaction(name, ops, metadata);
+ }
+ }
+
+ public static TestTable load(File temp, String name) {
+ TestTableOperations ops = new TestTableOperations(name, temp);
+ return new TestTable(ops, name);
+ }
+
+ public static TestTable tableWithCommitSucceedButStateUnknown(File temp,
String name) {
+ TestTableOperations ops = opsWithCommitSucceedButStateUnknown(temp,
name);
+ return new TestTable(ops, name);
+ }
+
+ public static TestTableOperations opsWithCommitSucceedButStateUnknown(File
temp, String name) {
+ return new TestTableOperations(name, temp) {
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata
updatedMetadata) {
+ super.commit(base, updatedMetadata);
+ throw new CommitStateUnknownException(new
RuntimeException("datacenter on fire"));
+ }
+ };
+ }
+
+ public static class TestTable extends BaseTable {
+
+ private final TestTableOperations ops;
+
+ private TestTable(TestTableOperations ops, String name) {
+ super(ops, name);
+ this.ops = ops;
+ }
+
+ TestTableOperations ops() {
+ return ops;
+ }
+ }
+
+ private static final Map<String, TableMetadata> METADATA =
Maps.newHashMap();
+ private static final Map<String, Integer> VERSIONS = Maps.newHashMap();
+
+ public static void clearTables() {
+ synchronized (METADATA) {
+ METADATA.clear();
+ VERSIONS.clear();
+ }
+ }
+
+ static TableMetadata readMetadata(String tableName) {
+ synchronized (METADATA) {
+ return METADATA.get(tableName);
+ }
+ }
+
+ static Integer metadataVersion(String tableName) {
+ synchronized (METADATA) {
+ return VERSIONS.get(tableName);
+ }
+ }
+
+ public static class TestTableOperations implements TableOperations {
+
+ private final String tableName;
+ private final File metadata;
+ private TableMetadata current = null;
+ private long lastSnapshotId = 0;
+ private int failCommits = 0;
+
+ public TestTableOperations(String tableName, File location) {
+ this.tableName = tableName;
+ this.metadata = new File(location, "metadata");
+ metadata.mkdirs();
+ refresh();
+ if (current != null) {
+ for (Snapshot snap : current.snapshots()) {
+ this.lastSnapshotId = Math.max(lastSnapshotId,
snap.snapshotId());
+ }
+ } else {
+ this.lastSnapshotId = 0;
+ }
+ }
+
+ void failCommits(int numFailures) {
+ this.failCommits = numFailures;
+ }
+
+ @Override
+ public TableMetadata current() {
+ return current;
+ }
+
+ @Override
+ public TableMetadata refresh() {
+ synchronized (METADATA) {
+ this.current = METADATA.get(tableName);
+ }
+ return current;
+ }
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata updatedMetadata) {
+ if (base != current) {
+ throw new CommitFailedException("Cannot commit changes based
on stale metadata");
+ }
+ synchronized (METADATA) {
+ refresh();
+ if (base == current) {
+ if (failCommits > 0) {
+ this.failCommits -= 1;
+ throw new CommitFailedException("Injected failure");
+ }
+ Integer version = VERSIONS.get(tableName);
+ // remove changes from the committed metadata
+ this.current =
TableMetadata.buildFrom(updatedMetadata).discardChanges().build();
+ VERSIONS.put(tableName, version == null ? 0 : version + 1);
+ METADATA.put(tableName, current);
+ } else {
+ throw new CommitFailedException(
+ "Commit failed: table was updated at %d",
current.lastUpdatedMillis());
+ }
+ }
+ }
+
+ @Override
+ public FileIO io() {
+ return new LocalFileIO();
+ }
+
+ @Override
+ public LocationProvider locationProvider() {
+ Preconditions.checkNotNull(
+ current, "Current metadata should not be null when
locationProvider is called");
+ return LocationProviders.locationsFor(current.location(),
current.properties());
+ }
+
+ @Override
+ public String metadataFileLocation(String fileName) {
+ return new File(metadata, fileName).getAbsolutePath();
+ }
+
+ @Override
+ public long newSnapshotId() {
+ long nextSnapshotId = lastSnapshotId + 1;
+ this.lastSnapshotId = nextSnapshotId;
+ return nextSnapshotId;
+ }
+ }
+
+ static class LocalFileIO implements FileIO {
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return Files.localInput(path);
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ return Files.localOutput(path);
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ if (!new File(path).delete()) {
+ throw new RuntimeIOException("Failed to delete file: " + path);
+ }
+ }
+ }
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index 37b40c8cbf..1e4dfab3e1 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -553,6 +553,9 @@
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/UnpartitionedDeltaWriter.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/SimpleDataUtil.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTableLoader.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTables.java
Source : iceberg-flink:iceberg-flink-1.13:0.13.2 (Please note that the
software have been modified.)
License : https://github.com/apache/iceberg/LICENSE
@@ -567,7 +570,6 @@
1.3.8
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableSource.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/SqlServerSource.java
Source : flink-cdc-connectors 2.2.1 (Please note that the software have
been modified.)
License :
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE