This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 691c3dd437 [INLONG-8366][Sort] Lost data in Iceberg when restoring 
checkpoint (#8367)
691c3dd437 is described below

commit 691c3dd4378559c955c5a1002995038ce859f83e
Author: thexia <[email protected]>
AuthorDate: Wed Jul 5 09:51:22 2023 +0800

    [INLONG-8366][Sort] Lost data in Iceberg when restoring checkpoint (#8367)
    
    Co-authored-by: thexiay <[email protected]>
---
 .../sink/multiple/IcebergSingleFileCommiter.java   | 152 ++++++-
 .../sort/iceberg/sink/HadoopCatalogResource.java   |  93 ++++
 .../sort/iceberg/sink/TestRollbackAndRecover.java  | 494 +++++++++++++++++++++
 .../sort/iceberg/sink/util/SimpleDataUtil.java     | 447 +++++++++++++++++++
 .../sort/iceberg/sink/util/TestTableLoader.java    |  58 +++
 .../inlong/sort/iceberg/sink/util/TestTables.java  | 332 ++++++++++++++
 licenses/inlong-sort-connectors/LICENSE            |   4 +-
 7 files changed, 1563 insertions(+), 17 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
index 45bd441442..c8dee94171 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.actions.RewriteDataFiles;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -65,6 +66,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.SortedMap;
+import java.util.stream.Collectors;
 
 public class IcebergSingleFileCommiter extends 
IcebergProcessFunction<WriteResult, Void>
         implements
@@ -73,6 +75,8 @@ public class IcebergSingleFileCommiter extends 
IcebergProcessFunction<WriteResul
 
     private static final long serialVersionUID = 1L;
     private static final long INITIAL_CHECKPOINT_ID = -1L;
+
+    private static final long INVALID_SNAPSHOT_ID = -1L;
     private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
@@ -170,25 +174,80 @@ public class IcebergSingleFileCommiter extends 
IcebergProcessFunction<WriteResul
         this.checkpointsState = 
context.getOperatorStateStore().getListState(stateDescriptor);
         this.jobIdState = 
context.getOperatorStateStore().getListState(jobIdDescriptor);
         // New table doesn't have state, so it doesn't need to do restore 
operation.
-        if (context.isRestored() && jobIdState.get().iterator().hasNext()) {
+        if (context.isRestored()) {
+            if (!jobIdState.get().iterator().hasNext()) {
+                LOG.error("JobId is null, Skip restore process");
+                return;
+            }
             String restoredFlinkJobId = jobIdState.get().iterator().next();
+            
this.dataFilesPerCheckpoint.putAll(checkpointsState.get().iterator().next());
+            // every data file is added into state, so there must be data 
and a NullPointerException cannot happen here
+            Long restoredCheckpointId = 
dataFilesPerCheckpoint.keySet().stream().max(Long::compareTo).get();
             
Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
                     "Flink job id parsed from checkpoint snapshot shouldn't be 
null or empty");
 
-            // Since flink's checkpoint id will start from the 
max-committed-checkpoint-id + 1 in the new flink job even
-            // if it's restored from a snapshot created by another different 
flink job, so it's safe to assign the max
-            // committed checkpoint id from restored flink job to the current 
flink job.
-            this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, 
restoredFlinkJobId);
-
-            NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
-                    .newTreeMap(checkpointsState.get().iterator().next())
-                    .tailMap(maxCommittedCheckpointId, false);
-            if (!uncommittedDataFiles.isEmpty()) {
-                // Committed all uncommitted data files from the old flink job 
to iceberg table.
-                long maxUncommittedCheckpointId = 
uncommittedDataFiles.lastKey();
-                commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, 
maxUncommittedCheckpointId);
+            // ------------------------------
+            // ↓ ↑
+            // a --> a+1 --> a+2 --> ... --> a+n
+            // max checkpoint id = m
+            // a >= m: supplementary commit snapshot between checkpoint (`m`, 
`a`]
+            // a < m: rollback to snapshot associated with checkpoint `a`
+            rollbackAndRecover(restoredFlinkJobId, restoredCheckpointId);
+        }
+    }
+
+    private void rollback(long snapshotId) {
+        table.manageSnapshots().rollbackTo(snapshotId).commit();
+    }
+
+    private void recover(String restoredFlinkJobId, NavigableMap<Long, byte[]> 
uncommittedManifests) throws Exception {
+        if (!uncommittedManifests.isEmpty()) {
+            // Committed all uncommitted data files from the old flink job to 
iceberg table.
+            long maxUncommittedCheckpointId = uncommittedManifests.lastKey();
+            commitUpToCheckpoint(uncommittedManifests, restoredFlinkJobId, 
maxUncommittedCheckpointId);
+        }
+    }
+
+    private void rollbackAndRecover(String restoredFlinkJobId, Long 
restoredCheckpointId) throws Exception {
+        // Since flink's checkpoint id will start from the 
max-committed-checkpoint-id + 1 in the new flink job even
+        // if it's restored from a snapshot created by another different flink 
job, so it's safe to assign the max
+        // committed checkpoint id from restored flink job to the current 
flink job.
+        this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, 
restoredFlinkJobId);
+        // Find snapshot associated with restoredCheckpointId
+        long snapshotId = getSnapshotIdAssociatedWithChkId(table, 
restoredFlinkJobId, restoredCheckpointId);
+
+        // Once maxCommittedCheckpointId is greater than 
restoredCheckpointId, it means more data was added afterwards, so a
+        // rollback is needed
+        if (restoredCheckpointId < maxCommittedCheckpointId) {
+            if (snapshotId != INVALID_SNAPSHOT_ID) {
+                LOG.info("Rollback committed snapshot to {}", snapshotId);
+                rollback(snapshotId); // TODO: what if rollback throw Exception
+            } else {
+                long minUncommittedCheckpointId = 
dataFilesPerCheckpoint.keySet().stream().min(Long::compareTo).get();
+                if (maxCommittedCheckpointId >= minUncommittedCheckpointId) {
+                    LOG.warn("It maybe has some repeat data between chk[{}, 
{}]", minUncommittedCheckpointId,
+                            maxCommittedCheckpointId);
+                }
+
+                // should recover all manifests that have not been deleted. Not 
deleted means they may not have been committed.
+                long uncommittedChkId = findEarliestUnCommittedManifest(
+                        
dataFilesPerCheckpoint.headMap(maxCommittedCheckpointId, true), table.io());
+                LOG.info("Snapshot has been expired. Recover all uncommitted 
snapshot between chk[{}, {}]. "
+                        + "maxCommittedCheckpointId is {}, 
minUncommittedCheckpointId is {}.",
+                        uncommittedChkId, restoredCheckpointId,
+                        maxCommittedCheckpointId, minUncommittedCheckpointId);
+                if (uncommittedChkId != INITIAL_CHECKPOINT_ID) {
+                    recover(restoredFlinkJobId, 
dataFilesPerCheckpoint.tailMap(uncommittedChkId, false));
+                } else {
+                    recover(restoredFlinkJobId, dataFilesPerCheckpoint);
+                }
             }
+        } else {
+            LOG.info("Recover uncommitted snapshot between chk({}, {}]. ", 
maxCommittedCheckpointId,
+                    restoredCheckpointId);
+            recover(restoredFlinkJobId, 
dataFilesPerCheckpoint.tailMap(maxCommittedCheckpointId, false));
         }
+        dataFilesPerCheckpoint.clear();
     }
 
     @Override
@@ -257,6 +316,8 @@ public class IcebergSingleFileCommiter extends 
IcebergProcessFunction<WriteResul
             }
             continuousEmptyCheckpoints = 0;
         }
+        // remove already committed snapshot manifest info
+        pendingMap.keySet().forEach(deltaManifestsMap::remove);
         pendingMap.clear();
 
         // Delete the committed manifests.
@@ -344,8 +405,9 @@ public class IcebergSingleFileCommiter extends 
IcebergProcessFunction<WriteResul
 
     private void commitOperation(SnapshotUpdate<?> operation, int 
numDataFiles, int numDeleteFiles, String description,
             String newFlinkJobId, long checkpointId) {
-        LOG.info("Committing {} with {} data files and {} delete files to 
table {}", description, numDataFiles,
-                numDeleteFiles, table);
+        LOG.info(
+                "Committing {} with {} data files and {} delete files to table 
{} with max committed checkpoint id {}.",
+                description, numDataFiles, numDeleteFiles, table, 
checkpointId);
         operation.set(MAX_COMMITTED_CHECKPOINT_ID, 
Long.toString(checkpointId));
         operation.set(FLINK_JOB_ID, newFlinkJobId);
 
@@ -403,7 +465,7 @@ public class IcebergSingleFileCommiter extends 
IcebergProcessFunction<WriteResul
                 String.format("iceberg(%s)-files-committer-state", 
tableId.toString()), sortedMapTypeInfo);
     }
 
-    static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+    public static long getMaxCommittedCheckpointId(Table table, String 
flinkJobId) {
         Snapshot snapshot = table.currentSnapshot();
         long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
@@ -423,4 +485,62 @@ public class IcebergSingleFileCommiter extends 
IcebergProcessFunction<WriteResul
 
         return lastCommittedCheckpointId;
     }
+
+    static long getSnapshotIdAssociatedWithChkId(Table table, String 
flinkJobId, Long checkpointId) {
+        Snapshot snapshot = table.currentSnapshot();
+        long associatedSnapshotId = INVALID_SNAPSHOT_ID;
+
+        while (snapshot != null) {
+            Map<String, String> summary = snapshot.summary();
+            String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
+            if (flinkJobId.equals(snapshotFlinkJobId)) {
+                String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+                if (value != null && 
checkpointId.equals(Long.parseLong(value))) {
+                    associatedSnapshotId = snapshot.snapshotId();
+                    break;
+                }
+            }
+            Long parentSnapshotId = snapshot.parentId();
+            snapshot = parentSnapshotId != null ? 
table.snapshot(parentSnapshotId) : null;
+        }
+        return associatedSnapshotId;
+    }
+
+    /**
+     * Find the earliest uncommitted manifest file in a map of manifest files.
+     *
+     * Assumes that once a manifest has been committed, all previous manifests have been 
committed as well (checkpoints complete in order).
+     * Assumes that a flink manifest file which has been deleted was already committed.
+     *
+     * @param deltaManifestsMap all manifest files that may not have been 
committed yet
+     * @param io file access tool
+     * @return the latest checkpoint id whose manifest file no longer exists (i.e. was 
already committed), or INITIAL_CHECKPOINT_ID if all manifest files still exist
+     * @throws IOException if deserializing a manifest fails
+     */
+    static long findEarliestUnCommittedManifest(NavigableMap<Long, byte[]> 
deltaManifestsMap, FileIO io)
+            throws IOException {
+        List<Long> uncommittedChkList = deltaManifestsMap.keySet()
+                .stream()
+                .sorted(Comparator.reverseOrder())
+                .collect(Collectors.toList());
+        long uncommittedChkId = INITIAL_CHECKPOINT_ID;
+        for (long chkId : uncommittedChkList) {
+            byte[] e = deltaManifestsMap.get(chkId);
+            if (Arrays.equals(EMPTY_MANIFEST_DATA, e)) {
+                // Skip the empty flink manifest.
+                continue;
+            }
+
+            DeltaManifests deltaManifests = SimpleVersionedSerialization
+                    
.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e);
+            if (deltaManifests.manifests()
+                    .stream()
+                    .anyMatch(manifest -> 
!io.newInputFile(manifest.path()).exists())) {
+                // manifest file not exist means `chkId` is committed
+                uncommittedChkId = chkId;
+                break;
+            }
+        }
+        return uncommittedChkId;
+    }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/HadoopCatalogResource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/HadoopCatalogResource.java
new file mode 100644
index 0000000000..9503a3c472
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/HadoopCatalogResource.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+public class HadoopCatalogResource extends ExternalResource {
+
+    protected final TemporaryFolder temporaryFolder;
+    protected final String database;
+    protected final String tableName;
+
+    protected Catalog catalog;
+    protected CatalogLoader catalogLoader;
+    protected String warehouse;
+    protected TableLoader tableLoader;
+
+    public HadoopCatalogResource(TemporaryFolder temporaryFolder, String 
database, String tableName) {
+        this.temporaryFolder = temporaryFolder;
+        this.database = database;
+        this.tableName = tableName;
+    }
+
+    @Override
+    protected void before() throws Throwable {
+        File warehouseFile = temporaryFolder.newFolder();
+        Assert.assertTrue(warehouseFile.delete());
+        // before variables
+        this.warehouse = "file:" + warehouseFile;
+        this.catalogLoader =
+                CatalogLoader.hadoop(
+                        "hadoop",
+                        new Configuration(),
+                        ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, 
warehouse));
+        this.catalog = catalogLoader.loadCatalog();
+        this.tableLoader =
+                TableLoader.fromCatalog(catalogLoader, 
TableIdentifier.of(database, tableName));
+    }
+
+    @Override
+    protected void after() {
+        try {
+            catalog.dropTable(TableIdentifier.of(database, tableName));
+            ((HadoopCatalog) catalog).close();
+            tableLoader.close();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to close catalog resource");
+        }
+    }
+
+    public TableLoader tableLoader() {
+        return tableLoader;
+    }
+
+    public Catalog catalog() {
+        return catalog;
+    }
+
+    public CatalogLoader catalogLoader() {
+        return catalogLoader;
+    }
+
+    public String warehouse() {
+        return warehouse;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/TestRollbackAndRecover.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/TestRollbackAndRecover.java
new file mode 100644
index 0000000000..20b5a0e5db
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/TestRollbackAndRecover.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergProcessOperator;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergSingleFileCommiter;
+import org.apache.inlong.sort.iceberg.sink.util.SimpleDataUtil;
+import org.apache.inlong.sort.iceberg.sink.util.TestTableLoader;
+import org.apache.inlong.sort.iceberg.sink.util.TestTables;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static 
org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+
+public class TestRollbackAndRecover {
+
+    public static final String DATABASE = "default";
+    public static final String TABLE = "t";
+    private static final org.apache.hadoop.conf.Configuration CONF = new 
org.apache.hadoop.conf.Configuration();
+
+    private Table table;
+    private TableLoader tableLoader;
+    private final FileFormat format = FileFormat.fromString("avro");
+    private File flinkManifestFolder;
+    private File metadataDir;
+    private File tableDir;
+
+    @ClassRule
+    public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @Rule
+    public final HadoopCatalogResource catalogResource = new 
HadoopCatalogResource(TEMPORARY_FOLDER, DATABASE, TABLE);
+
+    @Rule
+    public TemporaryFolder temp = new TemporaryFolder();
+
+    @Before
+    public void before() throws IOException {
+        flinkManifestFolder = temp.newFolder();
+
+        this.tableDir = temp.newFolder();
+        this.metadataDir = new File(tableDir, "metadata");
+        Assert.assertTrue(tableDir.delete());
+
+        // Construct the iceberg table.
+        table = TestTables.create(tableDir, "test", SimpleDataUtil.SCHEMA, 
PartitionSpec.unpartitioned(), 2);
+
+        table
+                .updateProperties()
+                .set(DEFAULT_FILE_FORMAT, format.name())
+                .set(FLINK_MANIFEST_LOCATION, 
flinkManifestFolder.getAbsolutePath())
+                .set("flink.max-continuous-empty-commits", "1")
+                .commit();
+    }
+
+    @After
+    public void after() {
+        TestTables.clearTables();
+    }
+
+    // case1: Commit normally, and then submit multiple times. At this time, 
reset to a previous chk, \
+    // and the snapshot corresponding to the chk has not expired
+    @Test
+    public void testRollbackToSnapshotWithRestoreCheckpointId() throws 
Exception {
+        long timestamp = 0;
+        long checkpoint = 10;
+        OperatorSubtaskState initCheckpoint;
+        JobID jobID = new JobID();
+        ImmutableList firstResult;
+
+        try (OneInputStreamOperatorTestHarness<WriteResult, Void> op = 
createStreamSink(jobID)) {
+            FileAppenderFactory<RowData> appenderFactory = 
createDeletableAppenderFactory();
+
+            op.setup();
+            op.open();
+
+            RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+            RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+            RowData delete3 = SimpleDataUtil.createDelete(3, "ccc");
+            firstResult = ImmutableList.of(insert1, insert2);
+            DataFile dataFile1 = writeDataFile("data-file-1", 
ImmutableList.of(insert1, insert2));
+            DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, 
"delete-file-1", ImmutableList.of(delete3));
+            op.processElement(
+                    
WriteResult.builder().addDataFiles(dataFile1).addDeleteFiles(deleteFile1).build(),
+                    ++timestamp);
+
+            // The 1st snapshotState.
+            initCheckpoint = op.snapshot(checkpoint, ++timestamp);
+            op.notifyOfCompletedCheckpoint(checkpoint);
+
+            RowData insert4 = SimpleDataUtil.createInsert(4, "ddd");
+            RowData delete2 = SimpleDataUtil.createDelete(2, "bbb");
+            DataFile dataFile2 = writeDataFile("data-file-2", 
ImmutableList.of(insert4));
+            DeleteFile deleteFile2 = writeEqDeleteFile(appenderFactory, 
"delete-file-2", ImmutableList.of(delete2));
+            op.processElement(
+                    
WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile2).build(),
+                    ++timestamp);
+
+            // The 2nd snapshotState.
+            op.snapshot(++checkpoint, ++timestamp);
+            op.notifyOfCompletedCheckpoint(checkpoint);
+            assertMaxCommittedCheckpointId(jobID, checkpoint);
+            SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, 
insert4));
+        }
+
+        try (OneInputStreamOperatorTestHarness<WriteResult, Void> op = 
createStreamSink(jobID)) {
+            // init state from begin checkpoint
+            op.setup();
+            op.initializeState(initCheckpoint);
+            op.open();
+
+            // test if rollback success
+            // check snapshot information
+            assertMaxCommittedCheckpointId(jobID, checkpoint - 1);
+            table.refresh();
+            SimpleDataUtil.assertTableRows(table, firstResult);
+        }
+    }
+
+    // case2: Commit normally, and then submit multiple times. At this time, 
reset to a previous chk, but the snapshot
+    // corresponding to the chk has been expired
+    @Test
+    public void testFallbackToRecoverWithUnCompletedNotification() throws 
Exception {
+        long timestamp = 0;
+        long checkpoint = 10;
+        OperatorSubtaskState initCheckpoint;
+        long initTimestamp;
+        JobID jobID = new JobID();
+        ImmutableList result;
+
+        try (OneInputStreamOperatorTestHarness<WriteResult, Void> op = 
createStreamSink(jobID)) {
+            FileAppenderFactory<RowData> appenderFactory = 
createDeletableAppenderFactory();
+
+            op.setup();
+            op.open();
+
+            RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+            RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+            RowData delete3 = SimpleDataUtil.createDelete(3, "ccc");
+            DataFile dataFile1 = writeDataFile("data-file-1", 
ImmutableList.of(insert1, insert2));
+            DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, 
"delete-file-1", ImmutableList.of(delete3));
+            op.processElement(
+                    
WriteResult.builder().addDataFiles(dataFile1).addDeleteFiles(deleteFile1).build(),
+                    ++timestamp);
+
+            // The 1st snapshotState.
+            initCheckpoint = op.snapshot(checkpoint, ++timestamp);
+            op.notifyOfCompletedCheckpoint(checkpoint);
+            initTimestamp = System.currentTimeMillis();
+
+            RowData insert4 = SimpleDataUtil.createInsert(4, "ddd");
+            RowData delete2 = SimpleDataUtil.createDelete(2, "bbb");
+            DataFile dataFile2 = writeDataFile("data-file-2", 
ImmutableList.of(insert4));
+            DeleteFile deleteFile2 = writeEqDeleteFile(appenderFactory, 
"delete-file-2", ImmutableList.of(delete2));
+            op.processElement(
+                    
WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile2).build(),
+                    ++timestamp);
+
+            // The 2nd snapshotState.
+            op.snapshot(++checkpoint, ++timestamp);
+            op.notifyOfCompletedCheckpoint(checkpoint);
+            assertMaxCommittedCheckpointId(jobID, checkpoint);
+            result = ImmutableList.of(insert1, insert4);
+            SimpleDataUtil.assertTableRows(table, result);
+
+        }
+
+        table.expireSnapshots().expireOlderThan(initTimestamp).commit();
+
+        try (OneInputStreamOperatorTestHarness<WriteResult, Void> op = 
createStreamSink(jobID)) {
+            // init state from begin checkpoint
+            op.setup();
+            op.initializeState(initCheckpoint);
+            op.open();
+
+            // check snapshot information
+            assertMaxCommittedCheckpointId(jobID, checkpoint);
+            SimpleDataUtil.assertTableRows(table, result); // same as before
+        }
+    }
+
+    // case3: Commit normally, and then commit multiple times. At this time, 
reset to a previous chk.
+    // There are multiple uncommitted manifests in this chk. These manifests 
are really not committed
+    // successfully
+    @Test
+    public void testRecoveryFromSnapshotWithoutCompletedNotification() throws 
Exception {
+        long timestamp = 0;
+        long checkpoint = 10;
+        long restoreCheckpoint = 0;
+        OperatorSubtaskState restoreCheckpointState;
+        long initTimestamp = 0;
+        JobID jobID = new JobID();
+        List result;
+
+        // first commit 3 snapshots
+        try (OneInputStreamOperatorTestHarness<WriteResult, Void> op = 
createStreamSink(jobID)) {
+            FileAppenderFactory<RowData> appenderFactory = 
createDeletableAppenderFactory();
+
+            op.setup();
+            op.open();
+
+            // write 1st data
+            RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+            RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+            RowData delete3 = SimpleDataUtil.createDelete(3, "ccc");
+            DataFile dataFile1 = writeDataFile("data-file-1", 
ImmutableList.of(insert1, insert2));
+            DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, 
"delete-file-1", ImmutableList.of(delete3));
+            op.processElement(
+                    
WriteResult.builder().addDataFiles(dataFile1).addDeleteFiles(deleteFile1).build(),
+                    ++timestamp);
+
+            // The 1st snapshotState.
+            restoreCheckpoint = checkpoint;
+            restoreCheckpointState = op.snapshot(checkpoint, ++timestamp);
+            op.notifyOfCompletedCheckpoint(checkpoint);
+
+            // write 2nd data
+            RowData insert4 = SimpleDataUtil.createInsert(4, "ddd");
+            RowData delete2 = SimpleDataUtil.createDelete(2, "bbb");
+            DataFile dataFile2 = writeDataFile("data-file-2", 
ImmutableList.of(insert4));
+            DeleteFile deleteFile2 = writeEqDeleteFile(appenderFactory, 
"delete-file-2", ImmutableList.of(delete2));
+            op.processElement(
+                    
WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile2).build(),
+                    ++timestamp);
+
+            // The 2nd snapshotState.
+            op.snapshot(++checkpoint, ++timestamp);
+            op.notifyOfCompletedCheckpoint(checkpoint);
+            assertMaxCommittedCheckpointId(jobID, checkpoint);
+            initTimestamp = System.currentTimeMillis();
+
+            // write 3rd data
+            RowData insert5 = SimpleDataUtil.createInsert(5, "eee");
+            RowData insert6 = SimpleDataUtil.createInsert(6, "fff");
+            DataFile dataFile3 = writeDataFile("data-file-3", 
ImmutableList.of(insert5, insert6));
+            
op.processElement(WriteResult.builder().addDataFiles(dataFile3).build(), 
++timestamp);
+
+            // The 3rd snapshotState.
+            op.snapshot(++checkpoint, ++timestamp);
+            op.notifyOfCompletedCheckpoint(checkpoint);
+            assertMaxCommittedCheckpointId(jobID, checkpoint);
+            result = ImmutableList.of(insert1, insert4, insert5, insert6);
+        }
+
+        // only retain the 3rd snapshot
+        table.expireSnapshots().expireOlderThan(initTimestamp).commit();
+        checkpoint = restoreCheckpoint;
+        try (OneInputStreamOperatorTestHarness<WriteResult, Void> op = 
createStreamSink(jobID)) {
+            FileAppenderFactory<RowData> appenderFactory = 
createDeletableAppenderFactory();
+            // init state from begin checkpoint
+            op.setup();
+            op.initializeState(restoreCheckpointState);
+            op.open();
+
+            // write data
+            RowData insert4 = SimpleDataUtil.createInsert(4, "ddd");
+            RowData delete2 = SimpleDataUtil.createDelete(2, "bbb");
+            RowData insert5 = SimpleDataUtil.createInsert(5, "eee");
+            RowData insert6 = SimpleDataUtil.createInsert(6, "fff");
+            RowData delete1 = SimpleDataUtil.createDelete(1, "aaa"); // new 
delete
+            RowData insert7 = SimpleDataUtil.createInsert(7, "ggg"); // new 
insert
+            DataFile dataFile4 = writeDataFile("data-file-4", 
ImmutableList.of(insert4, insert5, insert6, insert7));
+            DeleteFile deleteFile4 =
+                    writeEqDeleteFile(appenderFactory, "delete-file-4", 
ImmutableList.of(delete2, delete1));
+            
op.processElement(WriteResult.builder().addDataFiles(dataFile4).addDeleteFiles(deleteFile4).build(),
+                    initTimestamp);
+            // snapshotState
+            restoreCheckpointState = op.snapshot(++checkpoint, ++timestamp);
+            op.notifyOfCompletedCheckpoint(checkpoint);
+            SimpleDataUtil.assertTableRows(table, result);
+            result = ImmutableList.of(insert4, insert5, insert6, insert7);
+        }
+
+        try (OneInputStreamOperatorTestHarness<WriteResult, Void> op = 
createStreamSink(jobID)) {
+            op.setup();
+            op.initializeState(restoreCheckpointState);
+            op.open();
+
+            // check snapshot information
+            assertMaxCommittedCheckpointId(jobID, checkpoint);
+            SimpleDataUtil.assertTableRows(table, result);
+        }
+    }
+
+    public IcebergProcessOperator<WriteResult, Void> buildCommitter(
+            boolean overwritemode,
+            ActionsProvider actionsProvider,
+            ReadableConfig tableOptions) {
+        IcebergSingleFileCommiter commiter = new IcebergSingleFileCommiter(
+                TableIdentifier.of(table.name()),
+                tableLoader,
+                overwritemode,
+                actionsProvider,
+                tableOptions);
+        return new IcebergProcessOperator(commiter);
+    }
+
+    private OneInputStreamOperatorTestHarness<WriteResult, Void> 
createStreamSink(JobID jobID)
+            throws Exception {
+        TestOperatorFactory factory = TestOperatorFactory.of(table.location(), 
TableIdentifier.of(table.name()));
+        return new OneInputStreamOperatorTestHarness<>(factory, 
createEnvironment(jobID));
+    }
+
+    private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
+        int[] equalityFieldIds = new int[]{
+                table.schema().findField("id").fieldId(), 
table.schema().findField("data").fieldId()
+        };
+        return new FlinkAppenderFactory(
+                table.schema(),
+                FlinkSchemaUtil.convert(table.schema()),
+                table.properties(),
+                table.spec(),
+                equalityFieldIds,
+                table.schema(),
+                null);
+    }
+
+    private DataFile writeDataFile(String filename, List<RowData> rows) throws 
IOException {
+        return SimpleDataUtil.writeFile(
+                table,
+                table.schema(),
+                table.spec(),
+                CONF,
+                table.location(),
+                format.addExtension(filename),
+                rows);
+    }
+
+    private DataFile writeDataFile(
+            String filename, List<RowData> rows, PartitionSpec spec, 
StructLike partition)
+            throws IOException {
+        return SimpleDataUtil.writeFile(
+                table,
+                table.schema(),
+                spec,
+                CONF,
+                table.location(),
+                format.addExtension(filename),
+                rows,
+                partition);
+    }
+
+    private DeleteFile writeEqDeleteFile(
+            FileAppenderFactory<RowData> appenderFactory, String filename, 
List<RowData> deletes)
+            throws IOException {
+        return SimpleDataUtil.writeEqDeleteFile(table, format, filename, 
appenderFactory, deletes);
+    }
+
+    private DeleteFile writePosDeleteFile(
+            FileAppenderFactory<RowData> appenderFactory,
+            String filename,
+            List<Pair<CharSequence, Long>> positions)
+            throws IOException {
+        return SimpleDataUtil.writePosDeleteFile(table, format, filename, 
appenderFactory, positions);
+    }
+
+    private void assertSnapshotSize(int expectedSnapshotSize) {
+        table.refresh();
+        Assert.assertEquals(expectedSnapshotSize, 
Lists.newArrayList(table.snapshots()).size());
+    }
+
+    private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID 
operatorID, long expectedId) {
+        table.refresh();
+        long actualId = 
IcebergSingleFileCommiter.getMaxCommittedCheckpointId(table, jobID.toString());
+        Assert.assertEquals(expectedId, actualId);
+    }
+
+    private List<Path> assertFlinkManifests(int expectedCount) throws 
IOException {
+        List<Path> manifests = Files.list(flinkManifestFolder.toPath())
+                .filter(p -> !p.toString().endsWith(".crc"))
+                .collect(Collectors.toList());
+        Assert.assertEquals(
+                String.format("Expected %s flink manifests, but the list is: 
%s", expectedCount, manifests),
+                expectedCount,
+                manifests.size());
+        return manifests;
+    }
+
+    private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
+        table.refresh();
+        long actualId = 
IcebergSingleFileCommiter.getMaxCommittedCheckpointId(table, jobID.toString());
+        Assert.assertEquals(expectedId, actualId);
+    }
+
+    private static MockEnvironment createEnvironment(JobID jobID) {
+        return new MockEnvironmentBuilder()
+                .setTaskName("test task")
+                .setManagedMemorySize(32 * 1024)
+                .setInputSplitProvider(new MockInputSplitProvider())
+                .setBufferSize(256)
+                .setTaskConfiguration(new 
org.apache.flink.configuration.Configuration())
+                .setExecutionConfig(new ExecutionConfig())
+                .setMaxParallelism(16)
+                .setJobID(jobID)
+                .build();
+    }
+
+    private static class TestOperatorFactory extends 
AbstractStreamOperatorFactory<Void>
+            implements
+                OneInputStreamOperatorFactory<WriteResult, Void> {
+
+        private final String tablePath;
+        private final TableIdentifier tableIdentifier;
+
+        private TestOperatorFactory(String tablePath, TableIdentifier 
tableIdentifier) {
+            this.tablePath = tablePath;
+            this.tableIdentifier = tableIdentifier;
+        }
+
+        private static TestOperatorFactory of(String tablePath, 
TableIdentifier tableIdentifier) {
+            return new TestOperatorFactory(tablePath, tableIdentifier);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Void>> T createStreamOperator(
+                StreamOperatorParameters<Void> param) {
+            IcebergSingleFileCommiter commiter = new IcebergSingleFileCommiter(
+                    tableIdentifier,
+                    new TestTableLoader(tablePath),
+                    false,
+                    null,
+                    new Configuration());
+            IcebergProcessOperator<?, Void> operator = new 
IcebergProcessOperator(commiter);
+            operator.setup(param.getContainingTask(), param.getStreamConfig(), 
param.getOutput());
+            return (T) operator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return IcebergProcessOperator.class;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/SimpleDataUtil.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/SimpleDataUtil.java
new file mode 100644
index 0000000000..48551225bb
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/SimpleDataUtil.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink.util;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
+
+/**
+ *  Copy from iceberg-flink:iceberg-flink-1.15:1.1.x
+ */
+public class SimpleDataUtil {
+
+    private SimpleDataUtil() {
+    }
+
+    public static final Schema SCHEMA =
+            new Schema(
+                    Types.NestedField.optional(1, "id", 
Types.IntegerType.get()),
+                    Types.NestedField.optional(2, "data", 
Types.StringType.get()));
+
+    public static final TableSchema FLINK_SCHEMA =
+            TableSchema.builder().field("id", DataTypes.INT()).field("data", 
DataTypes.STRING()).build();
+
+    public static final RowType ROW_TYPE = (RowType) 
FLINK_SCHEMA.toRowDataType().getLogicalType();
+
+    public static final Record RECORD = GenericRecord.create(SCHEMA);
+
+    public static Table createTable(
+            String path, Map<String, String> properties, boolean partitioned) {
+        PartitionSpec spec;
+        if (partitioned) {
+            spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+        } else {
+            spec = PartitionSpec.unpartitioned();
+        }
+        return new HadoopTables().create(SCHEMA, spec, properties, path);
+    }
+
+    public static Record createRecord(Integer id, String data) {
+        Record record = RECORD.copy();
+        record.setField("id", id);
+        record.setField("data", data);
+        return record;
+    }
+
+    public static RowData createRowData(Integer id, String data) {
+        return GenericRowData.of(id, StringData.fromString(data));
+    }
+
+    public static RowData createInsert(Integer id, String data) {
+        return GenericRowData.ofKind(RowKind.INSERT, id, 
StringData.fromString(data));
+    }
+
+    public static RowData createDelete(Integer id, String data) {
+        return GenericRowData.ofKind(RowKind.DELETE, id, 
StringData.fromString(data));
+    }
+
+    public static RowData createUpdateBefore(Integer id, String data) {
+        return GenericRowData.ofKind(RowKind.UPDATE_BEFORE, id, 
StringData.fromString(data));
+    }
+
+    public static RowData createUpdateAfter(Integer id, String data) {
+        return GenericRowData.ofKind(RowKind.UPDATE_AFTER, id, 
StringData.fromString(data));
+    }
+
+    public static DataFile writeFile(
+            Table table,
+            Schema schema,
+            PartitionSpec spec,
+            Configuration conf,
+            String location,
+            String filename,
+            List<RowData> rows)
+            throws IOException {
+        return writeFile(table, schema, spec, conf, location, filename, rows, 
null);
+    }
+
+    /** Write the list of {@link RowData} to the given path and with the given 
partition data */
+    public static DataFile writeFile(
+            Table table,
+            Schema schema,
+            PartitionSpec spec,
+            Configuration conf,
+            String location,
+            String filename,
+            List<RowData> rows,
+            StructLike partition)
+            throws IOException {
+        Path path = new Path(location, filename);
+        FileFormat fileFormat = FileFormat.fromFileName(filename);
+        Preconditions.checkNotNull(fileFormat, "Cannot determine format for 
file: %s", filename);
+
+        RowType flinkSchema = FlinkSchemaUtil.convert(schema);
+        FileAppenderFactory<RowData> appenderFactory =
+                new FlinkAppenderFactory(
+                        schema, flinkSchema, ImmutableMap.of(), spec, null, 
null, null);
+
+        FileAppender<RowData> appender = 
appenderFactory.newAppender(fromPath(path, conf), fileFormat);
+        try (FileAppender<RowData> closeableAppender = appender) {
+            closeableAppender.addAll(rows);
+        }
+
+        DataFiles.Builder builder =
+                DataFiles.builder(spec)
+                        .withInputFile(HadoopInputFile.fromPath(path, conf))
+                        .withMetrics(appender.metrics());
+
+        if (partition != null) {
+            builder = builder.withPartition(partition);
+        }
+
+        return builder.build();
+    }
+
+    public static DeleteFile writeEqDeleteFile(
+            Table table,
+            FileFormat format,
+            String filename,
+            FileAppenderFactory<RowData> appenderFactory,
+            List<RowData> deletes)
+            throws IOException {
+        EncryptedOutputFile outputFile =
+                table
+                        .encryption()
+                        .encrypt(fromPath(new Path(table.location(), 
filename), new Configuration()));
+
+        EqualityDeleteWriter<RowData> eqWriter =
+                appenderFactory.newEqDeleteWriter(outputFile, format, null);
+        try (EqualityDeleteWriter<RowData> writer = eqWriter) {
+            writer.write(deletes);
+        }
+        return eqWriter.toDeleteFile();
+    }
+
+    public static DeleteFile writePosDeleteFile(
+            Table table,
+            FileFormat format,
+            String filename,
+            FileAppenderFactory<RowData> appenderFactory,
+            List<Pair<CharSequence, Long>> positions)
+            throws IOException {
+        EncryptedOutputFile outputFile =
+                table
+                        .encryption()
+                        .encrypt(fromPath(new Path(table.location(), 
filename), new Configuration()));
+
+        PositionDeleteWriter<RowData> posWriter =
+                appenderFactory.newPosDeleteWriter(outputFile, format, null);
+        PositionDelete<RowData> posDelete = PositionDelete.create();
+        try (PositionDeleteWriter<RowData> writer = posWriter) {
+            for (Pair<CharSequence, Long> p : positions) {
+                writer.write(posDelete.set(p.first(), p.second(), null));
+            }
+        }
+        return posWriter.toDeleteFile();
+    }
+
+    private static List<Record> convertToRecords(List<RowData> rows) {
+        List<Record> records = Lists.newArrayList();
+        for (RowData row : rows) {
+            Integer id = row.isNullAt(0) ? null : row.getInt(0);
+            String data = row.isNullAt(1) ? null : row.getString(1).toString();
+            records.add(createRecord(id, data));
+        }
+        return records;
+    }
+
+    public static void assertTableRows(String tablePath, List<RowData> 
expected, String branch)
+            throws IOException {
+        assertTableRecords(tablePath, convertToRecords(expected), branch);
+    }
+
+    public static void assertTableRows(Table table, List<RowData> expected) 
throws IOException {
+        assertTableRecords(table, convertToRecords(expected), 
SnapshotRef.MAIN_BRANCH);
+    }
+
+    public static void assertTableRows(Table table, List<RowData> expected, 
String branch)
+            throws IOException {
+        assertTableRecords(table, convertToRecords(expected), branch);
+    }
+
+    /** Get all rows for a table */
+    public static List<Record> tableRecords(Table table) throws IOException {
+        table.refresh();
+        List<Record> records = Lists.newArrayList();
+        try (CloseableIterable<Record> iterable = 
IcebergGenerics.read(table).build()) {
+            for (Record record : iterable) {
+                records.add(record);
+            }
+        }
+        return records;
+    }
+
+    public static boolean equalsRecords(List<Record> expected, List<Record> 
actual, Schema schema) {
+        if (expected.size() != actual.size()) {
+            return false;
+        }
+        Types.StructType type = schema.asStruct();
+        StructLikeSet expectedSet = StructLikeSet.create(type);
+        expectedSet.addAll(expected);
+        StructLikeSet actualSet = StructLikeSet.create(type);
+        actualSet.addAll(actual);
+        return expectedSet.equals(actualSet);
+    }
+
+    public static void assertRecordsEqual(List<Record> expected, List<Record> 
actual, Schema schema) {
+        Assert.assertEquals(expected.size(), actual.size());
+        Types.StructType type = schema.asStruct();
+        StructLikeSet expectedSet = StructLikeSet.create(type);
+        expectedSet.addAll(expected);
+        StructLikeSet actualSet = StructLikeSet.create(type);
+        actualSet.addAll(actual);
+        Assert.assertEquals(expectedSet, actualSet);
+    }
+
+    /**
+     * Assert table contains the expected list of records after waiting up to 
{@code maxCheckCount}
+     * with {@code checkInterval}
+     */
+    public static void assertTableRecords(
+            Table table, List<Record> expected, Duration checkInterval, int 
maxCheckCount)
+            throws IOException, InterruptedException {
+        for (int i = 0; i < maxCheckCount; ++i) {
+            if (equalsRecords(expected, tableRecords(table), table.schema())) {
+                break;
+            } else {
+                Thread.sleep(checkInterval.toMillis());
+            }
+        }
+        // success or failure, assert on the latest table state
+        assertRecordsEqual(expected, tableRecords(table), table.schema());
+    }
+
+    public static void assertTableRecords(Table table, List<Record> expected) 
throws IOException {
+        assertTableRecords(table, expected, SnapshotRef.MAIN_BRANCH);
+    }
+
+    public static void assertTableRecords(Table table, List<Record> expected, 
String branch)
+            throws IOException {
+        table.refresh();
+        Snapshot snapshot = latestSnapshot(table, branch);
+
+        if (snapshot == null) {
+            Assert.assertEquals(expected, ImmutableList.of());
+            return;
+        }
+
+        Types.StructType type = table.schema().asStruct();
+        StructLikeSet expectedSet = StructLikeSet.create(type);
+        expectedSet.addAll(expected);
+
+        try (CloseableIterable<Record> iterable =
+                
IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
+            StructLikeSet actualSet = StructLikeSet.create(type);
+
+            for (Record record : iterable) {
+                actualSet.add(record);
+            }
+
+            Assert.assertEquals("Should produce the expected record", 
expectedSet, actualSet);
+        }
+    }
+
+    // Returns the latest snapshot of the given branch in the table
+    public static Snapshot latestSnapshot(Table table, String branch) {
+        // For the main branch, currentSnapshot() is used to validate that the 
API behavior has
+        // not changed since that was the API used for validation prior to 
addition of branches.
+        if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+            return table.currentSnapshot();
+        }
+
+        return table.snapshot(branch);
+    }
+
+    public static void assertTableRecords(String tablePath, List<Record> 
expected)
+            throws IOException {
+        Preconditions.checkArgument(expected != null, "expected records 
shouldn't be null");
+        assertTableRecords(new HadoopTables().load(tablePath), expected, 
SnapshotRef.MAIN_BRANCH);
+    }
+
+    public static void assertTableRecords(String tablePath, List<Record> 
expected, String branch)
+            throws IOException {
+        Preconditions.checkArgument(expected != null, "expected records 
shouldn't be null");
+        assertTableRecords(new HadoopTables().load(tablePath), expected, 
branch);
+    }
+
+    public static StructLikeSet expectedRowSet(Table table, Record... records) 
{
+        StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+        Collections.addAll(set, records);
+        return set;
+    }
+
+    public static StructLikeSet actualRowSet(Table table, String... columns) 
throws IOException {
+        return actualRowSet(table, null, columns);
+    }
+
+    public static StructLikeSet actualRowSet(Table table, Long snapshotId, 
String... columns)
+            throws IOException {
+        table.refresh();
+        StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+        try (CloseableIterable<Record> reader =
+                IcebergGenerics.read(table)
+                        .useSnapshot(snapshotId == null ? 
table.currentSnapshot().snapshotId() : snapshotId)
+                        .select(columns)
+                        .build()) {
+            reader.forEach(set::add);
+        }
+        return set;
+    }
+
+    public static List<DataFile> partitionDataFiles(Table table, Map<String, 
Object> partitionValues)
+            throws IOException {
+        table.refresh();
+        Types.StructType partitionType = table.spec().partitionType();
+
+        Record partitionRecord = 
GenericRecord.create(partitionType).copy(partitionValues);
+        StructLikeWrapper expectedWrapper =
+                StructLikeWrapper.forType(partitionType).set(partitionRecord);
+
+        List<DataFile> dataFiles = Lists.newArrayList();
+        try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
+            for (FileScanTask scanTask : fileScanTasks) {
+                StructLikeWrapper wrapper =
+                        
StructLikeWrapper.forType(partitionType).set(scanTask.file().partition());
+
+                if (expectedWrapper.equals(wrapper)) {
+                    dataFiles.add(scanTask.file());
+                }
+            }
+        }
+
+        return dataFiles;
+    }
+
+    public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) 
throws IOException {
+        table.refresh();
+
+        Map<Long, List<DataFile>> result = Maps.newHashMap();
+        Snapshot current = table.currentSnapshot();
+        while (current != null) {
+            TableScan tableScan = table.newScan();
+            if (current.parentId() != null) {
+                // Collect the data files that were added only in the current
snapshot.
+                tableScan = tableScan.appendsBetween(current.parentId(), 
current.snapshotId());
+            } else {
+                // Collect the data files that were added in the oldest
snapshot.
+                tableScan = tableScan.useSnapshot(current.snapshotId());
+            }
+            try (CloseableIterable<FileScanTask> scanTasks = 
tableScan.planFiles()) {
+                result.put(
+                        current.snapshotId(),
+                        ImmutableList.copyOf(Iterables.transform(scanTasks, 
FileScanTask::file)));
+            }
+
+            // Continue to traverse the parent snapshot if exists.
+            if (current.parentId() == null) {
+                break;
+            }
+            // Iterate to the parent snapshot.
+            current = table.snapshot(current.parentId());
+        }
+        return result;
+    }
+
+    public static List<DataFile> matchingPartitions(
+            List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, 
Object> partitionValues) {
+        Types.StructType partitionType = partitionSpec.partitionType();
+        Record partitionRecord = 
GenericRecord.create(partitionType).copy(partitionValues);
+        StructLikeWrapper expected = 
StructLikeWrapper.forType(partitionType).set(partitionRecord);
+        return dataFiles.stream()
+                .filter(
+                        df -> {
+                            StructLikeWrapper wrapper =
+                                    
StructLikeWrapper.forType(partitionType).set(df.partition());
+                            return wrapper.equals(expected);
+                        })
+                .collect(Collectors.toList());
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTableLoader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTableLoader.java
new file mode 100644
index 0000000000..18e123b6b4
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTableLoader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink.util;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+
+import java.io.File;
+
+/**
+ *  Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+public class TestTableLoader implements TableLoader {
+
+    private File dir;
+
+    public static TableLoader of(String dir) {
+        return new TestTableLoader(dir);
+    }
+
+    public TestTableLoader(String dir) {
+        this.dir = new File(dir);
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public Table loadTable() {
+        return TestTables.load(dir, "test");
+    }
+
+    @Override
+    @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
+    public TableLoader clone() {
+        return new TestTableLoader(dir.getAbsolutePath());
+    }
+
+    @Override
+    public void close() {
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTables.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTables.java
new file mode 100644
index 0000000000..96a5cbeae0
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTables.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink.util;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.LocationProviders;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.Transactions;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+import java.io.File;
+import java.util.Map;
+
+import static org.apache.iceberg.TableMetadata.newTableMetadata;
+
/**
 *  Copy from iceberg-flink:iceberg-flink-1.15:1.1.x
 *
 * <p>In-memory test implementation of Iceberg table operations: the latest
 * {@link TableMetadata} per table name is held in static maps, while metadata files are
 * written under the supplied directory via {@link LocalFileIO}.
 *
 * <p>NOTE(review): this header names the copy source as iceberg-flink-1.15:1.1.x, but the
 * LICENSE entry added in the same commit lists this file under
 * iceberg-flink:iceberg-flink-1.13:0.13.2 — confirm which attribution is correct.
 */
public class TestTables {

    // Utility holder: all state is static, so instances are never needed.
    private TestTables() {
    }

    // Upgrades the table's format version via a direct metadata commit and returns the table.
    private static TestTable upgrade(File temp, String name, int newFormatVersion) {
        TestTable table = load(temp, name);
        TableOperations ops = table.ops();
        TableMetadata base = ops.current();
        ops.commit(base, ops.current().upgradeToFormatVersion(newFormatVersion));
        return table;
    }

    // Creates an unsorted table with an explicit format version.
    public static TestTable create(
            File temp, String name, Schema schema, PartitionSpec spec, int formatVersion) {
        return create(temp, name, schema, spec, SortOrder.unsorted(), formatVersion);
    }

    /**
     * Creates a new table with the given sort order and format version.
     *
     * @throws AlreadyExistsException if metadata for {@code name} is already registered
     */
    public static TestTable create(
            File temp,
            String name,
            Schema schema,
            PartitionSpec spec,
            SortOrder sortOrder,
            int formatVersion) {
        TestTableOperations ops = new TestTableOperations(name, temp);
        if (ops.current() != null) {
            throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
        }

        // base == null signals an initial commit (table creation).
        ops.commit(
                null,
                newTableMetadata(
                        schema, spec, sortOrder, temp.toString(),
                        ImmutableMap.of("format-version", String.valueOf(formatVersion))));

        return new TestTable(ops, name);
    }

    /**
     * Creates a new table with the default format version (no "format-version" property set).
     *
     * @throws AlreadyExistsException if metadata for {@code name} is already registered
     */
    public static TestTable create(
            File temp,
            String name,
            Schema schema,
            PartitionSpec spec,
            SortOrder sortOrder) {
        TestTableOperations ops = new TestTableOperations(name, temp);
        if (ops.current() != null) {
            throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
        }

        ops.commit(
                null,
                newTableMetadata(
                        schema, spec, sortOrder, temp.toString(), ImmutableMap.of()));

        return new TestTable(ops, name);
    }

    // Starts a create-table transaction with an unsorted sort order.
    public static Transaction beginCreate(File temp, String name, Schema schema, PartitionSpec spec) {
        return beginCreate(temp, name, schema, spec, SortOrder.unsorted());
    }

    /**
     * Starts a create-table transaction; nothing becomes visible until the transaction commits.
     *
     * @throws AlreadyExistsException if metadata for {@code name} is already registered
     */
    public static Transaction beginCreate(
            File temp, String name, Schema schema, PartitionSpec spec, SortOrder sortOrder) {
        TableOperations ops = new TestTableOperations(name, temp);
        if (ops.current() != null) {
            throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
        }

        TableMetadata metadata =
                newTableMetadata(schema, spec, sortOrder, temp.toString(), ImmutableMap.of());

        return Transactions.createTableTransaction(name, ops, metadata);
    }

    // Starts a replace (or create-if-absent) transaction with defaults: unsorted, no properties.
    public static Transaction beginReplace(
            File temp, String name, Schema schema, PartitionSpec spec) {
        return beginReplace(
                temp,
                name,
                schema,
                spec,
                SortOrder.unsorted(),
                ImmutableMap.of(),
                new TestTableOperations(name, temp));
    }

    // Starts a replace (or create-if-absent) transaction using fresh table operations.
    public static Transaction beginReplace(
            File temp,
            String name,
            Schema schema,
            PartitionSpec spec,
            SortOrder sortOrder,
            Map<String, String> properties) {
        return beginReplace(
                temp, name, schema, spec, sortOrder, properties, new TestTableOperations(name, temp));
    }

    /**
     * Starts a replace transaction when the table already exists, otherwise a create
     * transaction. A replacement keeps the existing table location.
     */
    public static Transaction beginReplace(
            File temp,
            String name,
            Schema schema,
            PartitionSpec spec,
            SortOrder sortOrder,
            Map<String, String> properties,
            TestTableOperations ops) {
        TableMetadata current = ops.current();
        TableMetadata metadata;
        if (current != null) {
            metadata = current.buildReplacement(schema, spec, sortOrder, current.location(), properties);
            return Transactions.replaceTableTransaction(name, ops, metadata);
        } else {
            metadata = newTableMetadata(schema, spec, sortOrder, temp.toString(), properties);
            return Transactions.createTableTransaction(name, ops, metadata);
        }
    }

    // Loads a handle for the named table; its current() is null if it was never created.
    public static TestTable load(File temp, String name) {
        TestTableOperations ops = new TestTableOperations(name, temp);
        return new TestTable(ops, name);
    }

    // Table whose commits apply successfully but then report an unknown commit state.
    public static TestTable tableWithCommitSucceedButStateUnknown(File temp, String name) {
        TestTableOperations ops = opsWithCommitSucceedButStateUnknown(temp, name);
        return new TestTable(ops, name);
    }

    // Operations whose commit() persists the update and then throws, simulating a commit
    // whose outcome the client cannot observe (lost response).
    public static TestTableOperations opsWithCommitSucceedButStateUnknown(File temp, String name) {
        return new TestTableOperations(name, temp) {

            @Override
            public void commit(TableMetadata base, TableMetadata updatedMetadata) {
                // The commit is applied first, so the metadata change is durable before the failure.
                super.commit(base, updatedMetadata);
                throw new CommitStateUnknownException(new RuntimeException("datacenter on fire"));
            }
        };
    }

    /** Table wrapper exposing its {@link TestTableOperations} for test inspection. */
    public static class TestTable extends BaseTable {

        private final TestTableOperations ops;

        private TestTable(TestTableOperations ops, String name) {
            super(ops, name);
            this.ops = ops;
        }

        TestTableOperations ops() {
            return ops;
        }
    }

    // Shared "catalog" state: latest metadata and commit counter per table name.
    // All accesses below are guarded by synchronizing on METADATA.
    private static final Map<String, TableMetadata> METADATA = Maps.newHashMap();
    private static final Map<String, Integer> VERSIONS = Maps.newHashMap();

    // Drops all tables; call between tests to isolate the static state.
    public static void clearTables() {
        synchronized (METADATA) {
            METADATA.clear();
            VERSIONS.clear();
        }
    }

    // Latest committed metadata for the table, or null if it does not exist.
    static TableMetadata readMetadata(String tableName) {
        synchronized (METADATA) {
            return METADATA.get(tableName);
        }
    }

    // Zero-based commit counter for the table, or null if it does not exist.
    static Integer metadataVersion(String tableName) {
        synchronized (METADATA) {
            return VERSIONS.get(tableName);
        }
    }

    /**
     * {@link TableOperations} backed by the static METADATA/VERSIONS maps, with support for
     * injecting a number of commit failures via {@link #failCommits(int)}.
     */
    public static class TestTableOperations implements TableOperations {

        private final String tableName;
        // Directory where metadata files are placed ("<location>/metadata").
        private final File metadata;
        // Last refreshed metadata; null when the table does not exist.
        private TableMetadata current = null;
        // Per-instance snapshot-id counter; incremented without synchronization.
        private long lastSnapshotId = 0;
        // Number of upcoming commits that should fail with CommitFailedException.
        private int failCommits = 0;

        public TestTableOperations(String tableName, File location) {
            this.tableName = tableName;
            this.metadata = new File(location, "metadata");
            metadata.mkdirs();
            refresh();
            if (current != null) {
                // Resume snapshot ids after the highest existing snapshot.
                for (Snapshot snap : current.snapshots()) {
                    this.lastSnapshotId = Math.max(lastSnapshotId, snap.snapshotId());
                }
            } else {
                this.lastSnapshotId = 0;
            }
        }

        // Makes the next numFailures commits throw CommitFailedException ("Injected failure").
        void failCommits(int numFailures) {
            this.failCommits = numFailures;
        }

        @Override
        public TableMetadata current() {
            return current;
        }

        @Override
        public TableMetadata refresh() {
            synchronized (METADATA) {
                this.current = METADATA.get(tableName);
            }
            return current;
        }

        /**
         * Optimistic-concurrency commit: fails fast if {@code base} is stale, then re-checks
         * under the lock after a refresh in case another committer won the race.
         */
        @Override
        public void commit(TableMetadata base, TableMetadata updatedMetadata) {
            if (base != current) {
                throw new CommitFailedException("Cannot commit changes based on stale metadata");
            }
            synchronized (METADATA) {
                refresh();
                if (base == current) {
                    if (failCommits > 0) {
                        this.failCommits -= 1;
                        throw new CommitFailedException("Injected failure");
                    }
                    Integer version = VERSIONS.get(tableName);
                    // remove changes from the committed metadata
                    this.current = TableMetadata.buildFrom(updatedMetadata).discardChanges().build();
                    VERSIONS.put(tableName, version == null ? 0 : version + 1);
                    METADATA.put(tableName, current);
                } else {
                    throw new CommitFailedException(
                            "Commit failed: table was updated at %d", current.lastUpdatedMillis());
                }
            }
        }

        @Override
        public FileIO io() {
            return new LocalFileIO();
        }

        @Override
        public LocationProvider locationProvider() {
            Preconditions.checkNotNull(
                    current, "Current metadata should not be null when locationProvider is called");
            return LocationProviders.locationsFor(current.location(), current.properties());
        }

        @Override
        public String metadataFileLocation(String fileName) {
            return new File(metadata, fileName).getAbsolutePath();
        }

        @Override
        public long newSnapshotId() {
            long nextSnapshotId = lastSnapshotId + 1;
            this.lastSnapshotId = nextSnapshotId;
            return nextSnapshotId;
        }
    }

    /** {@link FileIO} that delegates directly to the local filesystem. */
    static class LocalFileIO implements FileIO {

        @Override
        public InputFile newInputFile(String path) {
            return Files.localInput(path);
        }

        @Override
        public OutputFile newOutputFile(String path) {
            return Files.localOutput(path);
        }

        @Override
        public void deleteFile(String path) {
            if (!new File(path).delete()) {
                throw new RuntimeIOException("Failed to delete file: " + path);
            }
        }
    }
}
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index 37b40c8cbf..1e4dfab3e1 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -553,6 +553,9 @@
        
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
        
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
        
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/UnpartitionedDeltaWriter.java
+       
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/SimpleDataUtil.java
+       
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTableLoader.java
+       
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/util/TestTables.java
   Source  : iceberg-flink:iceberg-flink-1.13:0.13.2 (Please note that the 
software have been modified.)
   License : https://github.com/apache/iceberg/LICENSE
 
@@ -567,7 +570,6 @@
  1.3.8 
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
        
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
        
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableSource.java
-       
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/SqlServerSource.java
   Source  : flink-cdc-connectors 2.2.1 (Please note that the software have 
been modified.)
   License : 
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 

Reply via email to