This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d2461d29a [INLONG-8140][Sort] Support data inference schema change 
type and iceberg ddl change (#8141)
7d2461d29a is described below

commit 7d2461d29aa2ede19cef01f0adfaaae3bfa8d48a
Author: LinChen <[email protected]>
AuthorDate: Mon Jun 5 16:24:44 2023 +0800

    [INLONG-8140][Sort] Support data inference schema change type and iceberg 
ddl change (#8141)
---
 .../apache/inlong/sort/schema/ColumnSchema.java    |  34 +++
 .../apache/inlong/sort/schema}/TableChange.java    |   2 +-
 .../apache/inlong/sort/util/SchemaChangeUtils.java | 102 +++++++-
 .../sort/base/schema/SchemaChangeHandle.java       |  82 +++++++
 .../sort/base/schema/SchemaChangeHelper.java       | 271 +++++++++++++++++++++
 .../inlong/sort/base/sink/MultipleSinkOption.java  |   2 +
 .../sort-connectors/iceberg/pom.xml                |  22 ++
 .../inlong/sort/iceberg/IcebergTableSink.java      |   6 +
 .../iceberg/schema/IcebergSchemaChangeHelper.java  | 143 +++++++++++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  15 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java |  88 ++++---
 .../sink/multiple/IcebergSchemaChangeUtils.java    | 140 +++++++++++
 .../iceberg/sink/multiple/SchemaChangeUtils.java   | 198 ---------------
 ...tils.java => TestIcebergSchemaChangeUtils.java} |  22 +-
 14 files changed, 880 insertions(+), 247 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/ColumnSchema.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/ColumnSchema.java
new file mode 100644
index 0000000000..94ecf33c80
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/ColumnSchema.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.schema;
+
+import lombok.Data;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/**
+ * Schema information contained in a column.
+ * */
+@Data
+public class ColumnSchema {
+
+    private String name;
+    private LogicalType type;
+    private boolean isNullable;
+    private String comment;
+    private TableChange.ColumnPosition position;
+}
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/TableChange.java
similarity index 99%
rename from 
inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
rename to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/TableChange.java
index 35219763bf..24c1777fb8 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/TableChange.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.base.sink;
+package org.apache.inlong.sort.schema;
 
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Preconditions;
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
index 60970d0f8a..c6ffac7da0 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
@@ -23,25 +23,31 @@ import 
org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
 import org.apache.inlong.sort.protocol.ddl.operations.Operation;
 import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
 import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+import org.apache.inlong.sort.schema.ColumnSchema;
+import org.apache.inlong.sort.schema.TableChange;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.StringJoiner;
+import java.util.stream.Collectors;
 
 /**
  * Schema-change Utils
  */
-public final class SchemaChangeUtils {
+public class SchemaChangeUtils {
 
     private final static String DELIMITER = "&";
     private final static String KEY_VALUE_DELIMITER = "=";
 
-    private SchemaChangeUtils() {
+    public SchemaChangeUtils() {
     }
 
     /**
@@ -226,4 +232,96 @@ public final class SchemaChangeUtils {
         }
         return !column.getName().trim().isEmpty();
     }
+
+    /**
+     * Compare two schemas and get the schema changes that happened in them.
+     * TODO: currently only support add column,delete column and column type 
change, rename column and column position change are not supported
+     *
+     * @param oldColumnSchemas
+     * @param newColumnSchemas
+     * @return
+     */
+    public static List<TableChange> diffSchema(Map<String, ColumnSchema> 
oldColumnSchemas,
+            Map<String, ColumnSchema> newColumnSchemas) {
+        List<String> oldFields = oldColumnSchemas.values().stream()
+                .map(ColumnSchema::getName).collect(Collectors.toList());
+        List<String> newFields = newColumnSchemas.values().stream()
+                .map(ColumnSchema::getName).collect(Collectors.toList());
+        Set<String> oldFieldSet = new HashSet<>(oldFields);
+        Set<String> newFieldSet = new HashSet<>(newFields);
+
+        Set<String> intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+        Set<String> colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+        Set<String> colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
+        List<TableChange> tableChanges = new ArrayList<>();
+
+        // step0: judge whether unknown change
+        // 1.just diff two different schema can not distinguish(add + delete) 
vs modify
+        // Example first [a, b, c] -> then delete c [a, b] -> add d [a, b, d], 
currently it is only judged as unknown
+        // change.
+        // In next version,we will judge it is [delete and add] or rename by 
using information extracted from DDL
+        if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {
+            tableChanges.add(new TableChange.UnknownColumnChange(
+                    String.format(" Old ColumnSchema: [%s] and new 
ColumnSchema: [%s], it is unknown column change",
+                            oldColumnSchemas, newColumnSchemas)));
+            return tableChanges;
+        }
+
+        // 2.if some filed positions in new schema are not same with old 
schema, there is no way to deal with it.
+        // This situation only is regarded as unknown column change
+        if (colsToDelete.isEmpty() && colsToAdd.isEmpty() && 
oldFieldSet.equals(newFieldSet)
+                && !oldFields.equals(newFields)) {
+            tableChanges.add(
+                    new TableChange.UnknownColumnChange(
+                            String.format(
+                                    " Old ColumnSchema: [%s] and new 
ColumnSchema: [%s], "
+                                            + " they are same but some filed 
positions are not same."
+                                            + " This situation only is 
regarded as unknown column change at present",
+                                    oldColumnSchemas, newColumnSchemas)));
+            return tableChanges;
+        }
+
+        // step1: judge whether column type change
+        for (String colName : intersectColSet) {
+            ColumnSchema oldCol = oldColumnSchemas.get(colName);
+            ColumnSchema newCol = newColumnSchemas.get(colName);
+            if (!oldCol.getType().equals(newCol.getType())
+                    || !oldCol.getComment().equals(newCol.getComment())) {
+                tableChanges.add(
+                        new TableChange.UpdateColumn(
+                                new String[]{newCol.getName()},
+                                newCol.getType(),
+                                newCol.isNullable(),
+                                newCol.getComment()));
+            }
+        }
+
+        // step2: judge whether delete column
+        for (String colName : oldFields) {
+            if (colsToDelete.contains(colName)) {
+                tableChanges.add(
+                        new TableChange.DeleteColumn(
+                                new String[]{colName}));
+            }
+        }
+
+        // step3: judge whether add column
+        if (!colsToAdd.isEmpty()) {
+            for (int i = 0; i < newFields.size(); i++) {
+                String colName = newFields.get(i);
+                if (colsToAdd.contains(colName)) {
+                    ColumnSchema addCol = newColumnSchemas.get(colName);
+                    tableChanges.add(
+                            new TableChange.AddColumn(
+                                    new String[]{addCol.getName()},
+                                    addCol.getType(),
+                                    addCol.isNullable(),
+                                    addCol.getComment(),
+                                    addCol.getPosition()));
+                }
+            }
+        }
+        return tableChanges;
+    }
 }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHandle.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHandle.java
new file mode 100644
index 0000000000..cb65f3af93
--- /dev/null
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHandle.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.schema;
+
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Schema change helper interface
+ * */
+public interface SchemaChangeHandle {
+
+    /**
+     * execute ddl statement
+     * */
+    void process(byte[] originData, JsonNode data);
+    /**
+     * Handle modify column operations
+     * */
+    void handleAlterOperation(String database, String table, byte[] 
originData, String originSchema,
+            JsonNode data, AlterOperation operation);
+    /**
+     * apply the modify column operation
+     * */
+    void doAlterOperation(String database, String table, byte[] originData, 
String originSchema, JsonNode data,
+            Map<SchemaChangeType, List<AlterColumn>> typeMap);
+    /**
+     * apply the add column operation
+     * */
+    void doAddColumn(SchemaChangeType type, String originSchema);
+    /**
+     * apply the operation of changing the column type
+     * */
+    void doChangeColumnType(SchemaChangeType type, String originSchema);
+    /**
+     * apply the change column name operation
+     * */
+    void doRenameColumn(SchemaChangeType type, String originSchema);
+    /**
+     * apply the delete column operation
+     * */
+    void doDropColumn(SchemaChangeType type, String originSchema);
+    /**
+     * handle the create table operation
+     * */
+    void doCreateTable(byte[] originData, String database, String table, 
SchemaChangeType type,
+            String originSchema, JsonNode data, CreateTableOperation 
operation);
+    /**
+     * handle drop table operation
+     * */
+    void doDropTable(SchemaChangeType type, String originSchema);
+    /**
+     * handle rename table operation
+     * */
+    void doRenameTable(SchemaChangeType type, String originSchema);
+    /**
+     * handle truncate table operation
+     * */
+    void doTruncateTable(SchemaChangeType type, String originSchema);
+}
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
new file mode 100644
index 0000000000..07ef0f1456
--- /dev/null
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.schema;
+
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Schema change helper
+ * */
+public abstract class SchemaChangeHelper implements SchemaChangeHandle {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SchemaChangeHelper.class);
+    private final boolean schemaChange;
+    protected final Map<SchemaChangeType, SchemaChangePolicy> policyMap;
+    protected final JsonDynamicSchemaFormat dynamicSchemaFormat;
+    private final String databasePattern;
+    private final String tablePattern;
+    protected final SchemaUpdateExceptionPolicy exceptionPolicy;
+    private final SinkTableMetricData metricData;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
+
+    public SchemaChangeHelper(JsonDynamicSchemaFormat dynamicSchemaFormat, 
boolean schemaChange,
+            Map<SchemaChangeType, SchemaChangePolicy> policyMap, String 
databasePattern, String tablePattern,
+            SchemaUpdateExceptionPolicy exceptionPolicy,
+            SinkTableMetricData metricData, DirtySinkHelper<Object> 
dirtySinkHelper) {
+        this.dynamicSchemaFormat = 
Preconditions.checkNotNull(dynamicSchemaFormat, "dynamicSchemaFormat is null");
+        this.schemaChange = schemaChange;
+        this.policyMap = policyMap;
+        this.databasePattern = databasePattern;
+        this.tablePattern = tablePattern;
+        this.exceptionPolicy = exceptionPolicy;
+        this.metricData = metricData;
+        this.dirtySinkHelper = dirtySinkHelper;
+    }
+
+    public void process(byte[] originData, JsonNode data) {
+        // 1. Extract the schema change type from the data;
+        if (!schemaChange) {
+            return;
+        }
+        String database;
+        String table;
+        try {
+            database = dynamicSchemaFormat.parse(data, databasePattern);
+            table = dynamicSchemaFormat.parse(data, tablePattern);
+        } catch (Exception e) {
+            if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Parse database, table from origin data 
failed, origin data: %s",
+                                new String(originData)),
+                        e);
+            }
+            LOGGER.warn("Parse database, table from origin data failed, origin 
data: {}", new String(originData), e);
+            if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
+                dirtySinkHelper.invoke(new String(originData), 
DirtyType.JSON_PROCESS_ERROR, e);
+            }
+            if (metricData != null) {
+                metricData.invokeDirty(1, originData.length);
+            }
+            return;
+        }
+        Operation operation;
+        try {
+            JsonNode operationNode = 
Preconditions.checkNotNull(data.get("operation"),
+                    "Operation node is null");
+            operation = Preconditions.checkNotNull(
+                    
dynamicSchemaFormat.objectMapper.convertValue(operationNode, new 
TypeReference<Operation>() {
+                    }), "Operation is null");
+        } catch (Exception e) {
+            if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Extract Operation from origin data 
failed,origin data: %s", data), e);
+            }
+            LOGGER.warn("Extract Operation from origin data failed,origin 
data: {}", data, e);
+            handleDirtyData(data, originData, database, table, 
DirtyType.JSON_PROCESS_ERROR, e);
+            return;
+        }
+        String originSchema = dynamicSchemaFormat.extractDDL(data);
+        SchemaChangeType type = 
SchemaChangeUtils.extractSchemaChangeType(operation);
+        if (type == null) {
+            LOGGER.warn("Unsupported for schema-change: {}", originSchema);
+            return;
+        }
+
+        // 2. Apply schema change;
+        SchemaChangePolicy policy = policyMap.get(type);
+        if (policy != SchemaChangePolicy.ENABLE) {
+            doSchemaChangeBase(type, policy, originSchema);
+        } else {
+            switch (type) {
+                case ALTER:
+                    handleAlterOperation(database, table, originData, 
originSchema, data, (AlterOperation) operation);
+                    break;
+                case CREATE_TABLE:
+                    doCreateTable(originData, database, table, type, 
originSchema, data,
+                            (CreateTableOperation) operation);
+                    break;
+                case DROP_TABLE:
+                    doDropTable(type, originSchema);
+                    break;
+                case RENAME_TABLE:
+                    doRenameTable(type, originSchema);
+                    break;
+                case TRUNCATE_TABLE:
+                    doTruncateTable(type, originSchema);
+                    break;
+                default:
+                    LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+            }
+        }
+    }
+
+    @Override
+    public void handleAlterOperation(String database, String table, byte[] 
originData,
+            String originSchema, JsonNode data, AlterOperation operation) {
+        if (operation.getAlterColumns() == null || 
operation.getAlterColumns().isEmpty()) {
+            if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Alter columns is empty, origin schema: 
%s", originSchema));
+            }
+            LOGGER.warn("Alter columns is empty, origin schema: {}", 
originSchema);
+            return;
+        }
+
+        Map<SchemaChangeType, List<AlterColumn>> typeMap = new 
LinkedHashMap<>();
+        for (AlterColumn alterColumn : operation.getAlterColumns()) {
+            Set<SchemaChangeType> types = null;
+            try {
+                types = SchemaChangeUtils.extractSchemaChangeType(alterColumn);
+                Preconditions.checkState(!types.isEmpty(), "Schema change 
types is empty");
+            } catch (Exception e) {
+                if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                    throw new SchemaChangeHandleException(
+                            String.format("Extract schema change type failed, 
origin schema: %s", originSchema), e);
+                }
+                LOGGER.warn("Extract schema change type failed, origin schema: 
{}", originSchema, e);
+            }
+            if (types == null) {
+                continue;
+            }
+            if (types.size() == 1) {
+                SchemaChangeType type = types.stream().findFirst().get();
+                typeMap.computeIfAbsent(type, k -> new 
ArrayList<>()).add(alterColumn);
+            } else {
+                // Handle change column, it only exists change column type and 
rename column in this scenario for now.
+                for (SchemaChangeType type : types) {
+                    LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+                }
+            }
+        }
+
+        if (!typeMap.isEmpty()) {
+            doAlterOperation(database, table, originData, originSchema, data, 
typeMap);
+        }
+    }
+
+    @Override
+    public void doAddColumn(SchemaChangeType type, String originSchema) {
+        throw new SchemaChangeHandleException(String.format("Unsupported for 
%s: %s", type, originSchema));
+    }
+
+    @Override
+    public void doChangeColumnType(SchemaChangeType type, String originSchema) 
{
+        throw new SchemaChangeHandleException(String.format("Unsupported for 
%s: %s", type, originSchema));
+    }
+
+    @Override
+    public void doRenameColumn(SchemaChangeType type, String originSchema) {
+        throw new SchemaChangeHandleException(String.format("Unsupported for 
%s: %s", type, originSchema));
+    }
+
+    @Override
+    public void doDropColumn(SchemaChangeType type, String originSchema) {
+        throw new SchemaChangeHandleException(String.format("Unsupported for 
%s: %s", type, originSchema));
+    }
+
+    @Override
+    public void doCreateTable(byte[] originData, String database, String 
table, SchemaChangeType type,
+            String originSchema, JsonNode data, CreateTableOperation 
operation) {
+        throw new SchemaChangeHandleException(String.format("Unsupported for 
%s: %s", type, originSchema));
+    }
+
+    @Override
+    public void doDropTable(SchemaChangeType type, String originSchema) {
+        throw new SchemaChangeHandleException(String.format("Unsupported for 
%s: %s", type, originSchema));
+    }
+
+    @Override
+    public void doRenameTable(SchemaChangeType type, String originSchema) {
+        throw new SchemaChangeHandleException(String.format("Unsupported for 
%s: %s", type, originSchema));
+    }
+
+    @Override
+    public void doTruncateTable(SchemaChangeType type, String originSchema) {
+        throw new SchemaChangeHandleException(String.format("Unsupported for 
%s: %s", type, originSchema));
+    }
+
+    protected void handleDirtyData(JsonNode data, byte[] originData, String 
database,
+            String table, DirtyType dirtyType, Throwable e) {
+        if (exceptionPolicy == SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
+            String label = parseValue(data, 
dirtySinkHelper.getDirtyOptions().getLabels());
+            String logTag = parseValue(data, 
dirtySinkHelper.getDirtyOptions().getLogTag());
+            String identifier = parseValue(data, 
dirtySinkHelper.getDirtyOptions().getIdentifier());
+            dirtySinkHelper.invoke(new String(originData), dirtyType, label, 
logTag, identifier, e);
+        }
+        if (metricData != null) {
+            metricData.outputDirtyMetricsWithEstimate(database, table, 1, 
originData.length);
+        }
+    }
+    private String parseValue(JsonNode data, String pattern) {
+        try {
+            return dynamicSchemaFormat.parse(data, pattern);
+        } catch (Exception e) {
+            LOGGER.warn("Parse value from pattern failed,the pattern: {}, 
data: {}", pattern, data);
+        }
+        return pattern;
+    }
+
+    protected void doSchemaChangeBase(SchemaChangeType type, 
SchemaChangePolicy policy, String schema) {
+        if (policy == null) {
+            LOGGER.warn("Unsupported for {}: {}", type, schema);
+            return;
+        }
+        switch (policy) {
+            case LOG:
+                LOGGER.warn("Unsupported for {}: {}", type, schema);
+                break;
+            case ERROR:
+                throw new 
SchemaChangeHandleException(String.format("Unsupported for %s: %s", type, 
schema));
+            default:
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 211acc0cee..d9086c3cab 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.base.sink;
 
+import org.apache.inlong.sort.schema.TableChange;
+
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/pom.xml
index b725b4f020..ef60618492 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/pom.xml
@@ -91,6 +91,12 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-common</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -139,6 +145,22 @@
                                     <pattern>com.amazonaws</pattern>
                                     
<shadedPattern>org.apache.inlong.sort.iceberg.shaded.com.amazonaws</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    
<pattern>org.apache.inlong.sort.configuration</pattern>
+                                    
<shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.configuration</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    
<pattern>org.apache.inlong.sort.protocol</pattern>
+                                    
<shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.protocol</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    
<pattern>org.apache.inlong.sort.schema</pattern>
+                                    
<shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.schema</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    
<pattern>org.apache.inlong.sort.util</pattern>
+                                    
<shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.util</shadedPattern>
+                                </relocation>
                             </relocations>
                         </configuration>
                     </execution>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 1e2e28e40b..22e8d39ba4 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -57,6 +57,8 @@ import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERA
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
+import static org.apache.inlong.sort.base.Constants.SINK_SCHEMA_CHANGE_ENABLE;
+import static 
org.apache.inlong.sort.base.Constants.SINK_SCHEMA_CHANGE_POLICIES;
 import static 
org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_DISTRIBUTION_MODE;
 import static 
org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_PARALLELISM;
 
@@ -121,6 +123,8 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
 
         final ReadableConfig tableOptions = 
Configuration.fromMap(catalogTable.getOptions());
         boolean multipleSink = tableOptions.get(SINK_MULTIPLE_ENABLE);
+        boolean schemaChange = tableOptions.get(SINK_SCHEMA_CHANGE_ENABLE);
+        String schemaChangePolicies = 
tableOptions.getOptional(SINK_SCHEMA_CHANGE_POLICIES).orElse(null);
         if (multipleSink) {
             return (DataStreamSinkProvider) dataStream -> 
FlinkSink.forRowData(dataStream)
                     .overwrite(overwrite)
@@ -142,6 +146,8 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
                     .action(actionsProvider)
                     .tableOptions(tableOptions)
                     
.distributionMode(DistributionMode.fromName(tableOptions.get(WRITE_DISTRIBUTION_MODE)))
+                    .enableSchemaChange(schemaChange)
+                    .schemaChangePolicies(schemaChangePolicies)
                     .append();
         } else {
             return (DataStreamSinkProvider) dataStream -> 
FlinkSink.forRowData(dataStream)
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
new file mode 100644
index 0000000000..b3abc456c3
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.schema;
+
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.schema.SchemaChangeHandleException;
+import org.apache.inlong.sort.base.schema.SchemaChangeHelper;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergSchemaChangeUtils;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+import org.apache.inlong.sort.schema.TableChange;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Iceberg schema change helper
+ * */
+public class IcebergSchemaChangeHelper extends SchemaChangeHelper {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IcebergSchemaChangeHelper.class);
+    private transient Catalog catalog;
+
+    private transient SupportsNamespaces asNamespaceCatalog;
+
+    public IcebergSchemaChangeHelper(JsonDynamicSchemaFormat 
dynamicSchemaFormat, boolean schemaChange,
+            Map<SchemaChangeType, SchemaChangePolicy> policyMap, String 
databasePattern, String tablePattern,
+            SchemaUpdateExceptionPolicy exceptionPolicy,
+            SinkTableMetricData metricData, DirtySinkHelper<Object> 
dirtySinkHelper,
+            Catalog catalog,
+            SupportsNamespaces asNamespaceCatalog) {
+        super(dynamicSchemaFormat, schemaChange, policyMap, databasePattern,
+                tablePattern, exceptionPolicy, metricData, dirtySinkHelper);
+        this.catalog = catalog;
+        this.asNamespaceCatalog = asNamespaceCatalog;
+    }
+    @Override
+    public void doAlterOperation(String database, String table, byte[] 
originData, String originSchema, JsonNode data,
+            Map<SchemaChangeType, List<AlterColumn>> typeMap) {
+        for (Map.Entry<SchemaChangeType, List<AlterColumn>> kv : 
typeMap.entrySet()) {
+            try {
+                switch (kv.getKey()) {
+                    case ADD_COLUMN:
+                        doAddColumn(kv.getValue(), 
TableIdentifier.of(database, table));
+                        break;
+                    case DROP_COLUMN:
+                        doDropColumn(kv.getKey(), originSchema);
+                        break;
+                    case RENAME_COLUMN:
+                        doRenameColumn(kv.getKey(), originSchema);
+                        break;
+                    case CHANGE_COLUMN_TYPE:
+                        doChangeColumnType(kv.getKey(), originSchema);
+                        break;
+                    default:
+                }
+            } catch (Exception e) {
+                if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                    throw new SchemaChangeHandleException(
+                            String.format("Apply alter column failed, origin 
schema: %s", originSchema), e);
+                }
+                LOGGER.warn("Apply alter column failed, origin schema: {}", 
originSchema, e);
+                handleDirtyData(data, originData, database, table, 
DirtyType.HANDLE_ALTER_TABLE_ERROR, e);
+            }
+        }
+    }
+
+    @Override
+    public void doCreateTable(byte[] originData, String database, String 
table, SchemaChangeType type,
+            String originSchema, JsonNode data, CreateTableOperation 
operation) {
+        try {
+            TableIdentifier tableId = TableIdentifier.of(database, table);
+            List<String> pkListStr = 
dynamicSchemaFormat.extractPrimaryKeyNames(data);
+            RowType rowType = dynamicSchemaFormat.extractSchema(data, 
pkListStr);
+            Schema schema = 
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(rowType));
+            IcebergSchemaChangeUtils.createTable(catalog, tableId, 
asNamespaceCatalog, schema);
+            return;
+        } catch (Exception e) {
+            if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Drop column failed, origin schema: %s", 
originSchema), e);
+            }
+            handleDirtyData(data, originData, database, table, 
DirtyType.CREATE_TABLE_ERROR, e);
+            return;
+        }
+    }
+
+    public void doAddColumn(List<AlterColumn> alterColumns, TableIdentifier 
tableId) {
+        List<TableChange> tableChanges = new ArrayList<>();
+        Table table = catalog.loadTable(tableId);
+        Transaction transaction = table.newTransaction();
+
+        alterColumns.forEach(alterColumn -> {
+            Column column = alterColumn.getNewColumn();
+            LogicalType dataType = 
dynamicSchemaFormat.sqlType2FlinkType(column.getJdbcType());
+            TableChange.ColumnPosition position =
+                    column.getPosition().getPositionType() == 
PositionType.FIRST ? TableChange.ColumnPosition.first()
+                            : 
TableChange.ColumnPosition.after(column.getName());
+            TableChange.AddColumn addColumn = new TableChange.AddColumn(new 
String[]{column.getName()},
+                    dataType, column.isNullable(), column.getComment(), 
position);
+            tableChanges.add(addColumn);
+        });
+        
IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), 
tableChanges);
+        transaction.commitTransaction();
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 5cb0993b04..f36de37786 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -182,6 +182,8 @@ public class FlinkSink {
         private DirtyOptions dirtyOptions;
         private @Nullable DirtySink<Object> dirtySink;
         private ReadableConfig tableOptions = new Configuration();
+        private boolean enableSchemaChange;
+        private String schemaChangePolicies;
 
         private Builder() {
         }
@@ -266,6 +268,16 @@ public class FlinkSink {
             return this;
         }
 
+        public Builder enableSchemaChange(boolean schemaChange) {
+            this.enableSchemaChange = schemaChange;
+            return this;
+        }
+
+        public Builder schemaChangePolicies(String schemaChangePolicies) {
+            this.schemaChangePolicies = schemaChangePolicies;
+            return this;
+        }
+
         /**
          * The appendMode properties is used to insert data without equality 
field columns.
          *
@@ -672,7 +684,8 @@ public class FlinkSink {
 
             int parallelism = writeParallelism == null ? 
input.getParallelism() : writeParallelism;
             DynamicSchemaHandleOperator routeOperator = new 
DynamicSchemaHandleOperator(
-                    catalogLoader, multipleSinkOption, dirtyOptions, 
dirtySink, inlongMetric, auditHostAndPorts);
+                    catalogLoader, multipleSinkOption, dirtyOptions, 
dirtySink, inlongMetric, auditHostAndPorts,
+                    enableSchemaChange, schemaChangePolicies);
             SingleOutputStreamOperator<RecordWithSchema> routeStream = input
                     
.transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
                             TypeInformation.of(RecordWithSchema.class),
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index a21e4307d6..44541d132f 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -21,17 +21,22 @@ import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
-import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.schema.SchemaChangeHelper;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
-import org.apache.inlong.sort.base.sink.TableChange;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.iceberg.schema.IcebergSchemaChangeHelper;
+import org.apache.inlong.sort.schema.ColumnSchema;
+import org.apache.inlong.sort.schema.TableChange;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -47,17 +52,15 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,6 +73,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -98,7 +102,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
 
     private transient Catalog catalog;
     private transient SupportsNamespaces asNamespaceCatalog;
-    private transient AbstractDynamicSchemaFormat<JsonNode> 
dynamicSchemaFormat;
+    private transient JsonDynamicSchemaFormat dynamicSchemaFormat;
     private transient ProcessingTimeService processingTimeService;
 
     // record cache, wait schema to consume record
@@ -118,18 +122,25 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
     private @Nullable transient SinkTableMetricData metricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
+    private SchemaChangeHelper schemaChangeHelper;
+    private String schemaChangePolicies;
+    private boolean enableSchemaChange;
 
     public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
             MultipleSinkOption multipleSinkOption,
             DirtyOptions dirtyOptions,
             @Nullable DirtySink<Object> dirtySink,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            boolean enableSchemaChange,
+            String schemaChangePolicies) {
         this.catalogLoader = catalogLoader;
         this.multipleSinkOption = multipleSinkOption;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
         this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+        this.schemaChangePolicies = schemaChangePolicies;
+        this.enableSchemaChange = enableSchemaChange;
     }
 
     @SuppressWarnings("unchecked")
@@ -140,7 +151,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         this.asNamespaceCatalog =
                 catalog instanceof SupportsNamespaces ? (SupportsNamespaces) 
catalog : null;
 
-        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(
+        this.dynamicSchemaFormat = (JsonDynamicSchemaFormat) 
DynamicSchemaFormatFactory.getFormat(
                 multipleSinkOption.getFormat(), 
multipleSinkOption.getFormatOption());
 
         this.processingTimeService = 
getRuntimeContext().getProcessingTimeService();
@@ -165,6 +176,11 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
             metricData = new SinkTableMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
         }
         this.dirtySinkHelper.open(new Configuration());
+        this.schemaChangeHelper = new 
IcebergSchemaChangeHelper(dynamicSchemaFormat,
+                enableSchemaChange, enableSchemaChange ? 
SchemaChangeUtils.deserialize(schemaChangePolicies) : null,
+                multipleSinkOption.getDatabasePattern(), 
multipleSinkOption.getTablePattern(),
+                multipleSinkOption.getSchemaUpdatePolicy(), metricData, 
dirtySinkHelper,
+                catalog, asNamespaceCatalog);
     }
 
     @Override
@@ -208,7 +224,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         }
         boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
         if (isDDL) {
-            execDDL(jsonNode, tableId);
+            execDDL(element.getValue().getBinary(0), jsonNode);
         } else {
             execDML(jsonNode, tableId);
         }
@@ -304,8 +320,8 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         }
     }
 
-    private void execDDL(JsonNode jsonNode, TableIdentifier tableId) {
-        // todo:parse ddl sql
+    private void execDDL(byte[] originData, JsonNode jsonNode) {
+        schemaChangeHelper.process(originData, jsonNode);
     }
 
     private void execDML(JsonNode jsonNode, TableIdentifier tableId) {
@@ -434,33 +450,29 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
 
     // ================================ All coordinator handle method 
==============================================
     private void handleTableCreateEventFromOperator(TableIdentifier tableId, 
Schema schema) {
-        if (!catalog.tableExists(tableId)) {
-            if (asNamespaceCatalog != null && 
!asNamespaceCatalog.namespaceExists(tableId.namespace())) {
-                try {
-                    asNamespaceCatalog.createNamespace(tableId.namespace());
-                    LOG.info("Auto create Database({}) in Catalog({}).", 
tableId.namespace(), catalog.name());
-                } catch (AlreadyExistsException e) {
-                    LOG.warn("Database({}) already exist in Catalog({})!", 
tableId.namespace(), catalog.name());
-                }
-            }
-            ImmutableMap.Builder<String, String> properties = 
ImmutableMap.builder();
-            properties.put("format-version", "2");
-            properties.put("write.upsert.enabled", "true");
-            properties.put("write.metadata.metrics.default", "full");
-            // for hive visible
-            properties.put("engine.hive.enabled", "true");
-            try {
-                catalog.createTable(tableId, schema, 
PartitionSpec.unpartitioned(), properties.build());
-                LOG.info("Auto create Table({}) in Database({}) in 
Catalog({})!",
-                        tableId.name(), tableId.namespace(), catalog.name());
-            } catch (AlreadyExistsException e) {
-                LOG.warn("Table({}) already exist in Database({}) in 
Catalog({})!",
-                        tableId.name(), tableId.namespace(), catalog.name());
-            }
-        }
+        IcebergSchemaChangeUtils.createTable(catalog, tableId, 
asNamespaceCatalog, schema);
         handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema());
     }
 
+    @VisibleForTesting
+    public static Map<String, ColumnSchema> extractColumnSchema(Schema schema) 
{
+        Map<String, ColumnSchema> columnSchemaMap = new LinkedHashMap<>();
+        List<Types.NestedField> nestedFieldList = schema.columns();
+        int n = nestedFieldList.size();
+        for (int i = 0; i < n; i++) {
+            Types.NestedField nestedField = nestedFieldList.get(i);
+            ColumnSchema columnSchema = new ColumnSchema();
+            columnSchema.setName(nestedField.name());
+            columnSchema.setType(FlinkSchemaUtil.convert(nestedField.type()));
+            columnSchema.setNullable(nestedField.isOptional());
+            columnSchema.setComment(nestedField.doc());
+            columnSchema.setPosition(i == 0 ? 
TableChange.ColumnPosition.first()
+                    : TableChange.ColumnPosition.after(nestedFieldList.get(i - 
1).name()));
+            columnSchemaMap.put(nestedField.name(), columnSchema);
+        }
+        return columnSchemaMap;
+    }
+
     private void handldAlterSchemaEventFromOperator(TableIdentifier tableId, 
Schema oldSchema, Schema newSchema) {
         Table table = catalog.loadTable(tableId);
         // The transactionality of changes is guaranteed by comparing the old 
schema with the current schema of the
@@ -469,14 +481,16 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         // for scenarios that cannot be changed, it is always considered that 
there is a problem with the data.
         Transaction transaction = table.newTransaction();
         if (table.schema().sameSchema(oldSchema)) {
-            List<TableChange> tableChanges = 
SchemaChangeUtils.diffSchema(oldSchema, newSchema);
+            Map<String, ColumnSchema> oldColumnSchemas = 
extractColumnSchema(oldSchema);
+            Map<String, ColumnSchema> newColumnSchemas = 
extractColumnSchema(newSchema);
+            List<TableChange> tableChanges = 
IcebergSchemaChangeUtils.diffSchema(oldColumnSchemas, newColumnSchemas);
             for (TableChange tableChange : tableChanges) {
                 if (tableChange instanceof TableChange.UnknownColumnChange) {
                     throw new UnsupportedOperationException(
                             String.format("Unsupported table %s schema change: 
%s.", tableId.toString(), tableChange));
                 }
             }
-            SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), 
tableChanges);
+            
IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), 
tableChanges);
             LOG.info("Schema evolution in table({}) for table change: {}", 
tableId, tableChanges);
         }
         transaction.commitTransaction();
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
new file mode 100644
index 0000000000..31bfd732eb
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.inlong.sort.iceberg.FlinkTypeToType;
+import org.apache.inlong.sort.schema.TableChange;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
+
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class IcebergSchemaChangeUtils extends SchemaChangeUtils {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IcebergSchemaChangeUtils.class);
+
+    private static final Joiner DOT = Joiner.on(".");
+
+    public static void createTable(Catalog catalog, TableIdentifier tableId, 
SupportsNamespaces asNamespaceCatalog,
+            Schema schema) {
+        if (!catalog.tableExists(tableId)) {
+            if (asNamespaceCatalog != null && 
!asNamespaceCatalog.namespaceExists(tableId.namespace())) {
+                try {
+                    asNamespaceCatalog.createNamespace(tableId.namespace());
+                    LOGGER.info("Auto create Database({}) in Catalog({}).", 
tableId.namespace(), catalog.name());
+                } catch (AlreadyExistsException e) {
+                    LOGGER.warn("Database({}) already exist in Catalog({})!", 
tableId.namespace(), catalog.name());
+                }
+            }
+            ImmutableMap.Builder<String, String> properties = 
ImmutableMap.builder();
+            properties.put("format-version", "2");
+            properties.put("write.upsert.enabled", "true");
+            properties.put("write.metadata.metrics.default", "full");
+            // for hive visible
+            properties.put("engine.hive.enabled", "true");
+            try {
+                catalog.createTable(tableId, schema, 
PartitionSpec.unpartitioned(), properties.build());
+                LOGGER.info("Auto create Table({}) in Database({}) in 
Catalog({})!",
+                        tableId.name(), tableId.namespace(), catalog.name());
+            } catch (AlreadyExistsException e) {
+                LOGGER.warn("Table({}) already exist in Database({}) in 
Catalog({})!",
+                        tableId.name(), tableId.namespace(), catalog.name());
+            }
+        }
+    }
+
+    public static void applySchemaChanges(UpdateSchema pendingUpdate, 
List<TableChange> tableChanges) {
+        for (TableChange change : tableChanges) {
+            if (change instanceof TableChange.AddColumn) {
+                applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
+            } else if (change instanceof TableChange.DeleteColumn) {
+                applyDeleteColumn(pendingUpdate, (TableChange.DeleteColumn) 
change);
+            } else if (change instanceof TableChange.UpdateColumn) {
+                applyUpdateColumn(pendingUpdate, (TableChange.UpdateColumn) 
change);
+            } else {
+                throw new UnsupportedOperationException("Cannot apply unknown 
table change: " + change);
+            }
+        }
+        pendingUpdate.commit();
+    }
+
+    public static void applyAddColumn(UpdateSchema pendingUpdate, 
TableChange.AddColumn add) {
+        Preconditions.checkArgument(add.isNullable(),
+                "Incompatible change: cannot add required column: %s", 
leafName(add.fieldNames()));
+        Type type = add.dataType().accept(new 
FlinkTypeToType(RowType.of(add.dataType())));
+        pendingUpdate.addColumn(parentName(add.fieldNames()), 
leafName(add.fieldNames()), type, add.comment());
+
+        if (add.position() instanceof TableChange.After) {
+            TableChange.After after = (TableChange.After) add.position();
+            String referenceField = peerName(add.fieldNames(), after.column());
+            pendingUpdate.moveAfter(DOT.join(add.fieldNames()), 
referenceField);
+
+        } else if (add.position() instanceof TableChange.First) {
+            pendingUpdate.moveFirst(DOT.join(add.fieldNames()));
+
+        } else {
+            Preconditions.checkArgument(add.position() == null,
+                    "Cannot add '%s' at unknown position: %s", 
DOT.join(add.fieldNames()), add.position());
+        }
+    }
+
+    public static void applyDeleteColumn(UpdateSchema pendingUpdate, 
TableChange.DeleteColumn delete) {
+        pendingUpdate.deleteColumn(DOT.join(delete.fieldNames()));
+    }
+
+    public static void applyUpdateColumn(UpdateSchema pendingUpdate, 
TableChange.UpdateColumn update) {
+        Type type = update.dataType().accept(new 
FlinkTypeToType(RowType.of(update.dataType())));
+        pendingUpdate.updateColumn(DOT.join(update.fieldNames()), 
type.asPrimitiveType(), update.comment());
+    }
+
+    public static String leafName(String[] fieldNames) {
+        Preconditions.checkArgument(fieldNames.length > 0, "Invalid field 
name: at least one name is required");
+        return fieldNames[fieldNames.length - 1];
+    }
+
+    public static String peerName(String[] fieldNames, String fieldName) {
+        if (fieldNames.length > 1) {
+            String[] peerNames = Arrays.copyOf(fieldNames, fieldNames.length);
+            peerNames[fieldNames.length - 1] = fieldName;
+            return DOT.join(peerNames);
+        }
+        return fieldName;
+    }
+
+    public static String parentName(String[] fieldNames) {
+        if (fieldNames.length > 1) {
+            return DOT.join(Arrays.copyOfRange(fieldNames, 0, 
fieldNames.length - 1));
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
deleted file mode 100644
index f5f1001a70..0000000000
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.iceberg.sink.multiple;
-
-import org.apache.inlong.sort.base.sink.TableChange;
-import org.apache.inlong.sort.base.sink.TableChange.ColumnPosition;
-import org.apache.inlong.sort.base.sink.TableChange.UnknownColumnChange;
-import org.apache.inlong.sort.iceberg.FlinkTypeToType;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.UpdateSchema;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.Types.NestedField;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class SchemaChangeUtils {
-
-    private static final Joiner DOT = Joiner.on(".");
-
-    /**
-     * Compare two schemas and get the schema changes that happened in them.
-     * TODO: currently only support add column,delete column and column type 
change, rename column and column position change are not supported
-     *
-     * @param oldSchema
-     * @param newSchema
-     * @return
-     */
-    static List<TableChange> diffSchema(Schema oldSchema, Schema newSchema) {
-        List<String> oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-        List<String> newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-        Set<String> oldFieldSet = new HashSet<>(oldFields);
-        Set<String> newFieldSet = new HashSet<>(newFields);
-
-        Set<String> intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
-        Set<String> colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
-        Set<String> colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
-
-        List<TableChange> tableChanges = new ArrayList<>();
-
-        // step0: judge whether unknown change
-        // 1.just diff two different schema can not distinguish(add + delete) 
vs modify
-        // Example first [a, b, c] -> then delete c [a, b] -> add d [a, b, d], 
currently it is only judged as unknown
-        // change.
-        // In next version,we will judge it is [delete and add] or rename by 
using information extracted from DDL
-        if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {
-            tableChanges.add(new UnknownColumnChange(
-                    String.format(" Old schema: [%s] and new schema: [%s], it 
is unknown column change",
-                            oldSchema.toString(), newSchema.toString())));
-            return tableChanges;
-        }
-
-        // 2.if some filed positions in new schema are not same with old 
schema, there is no way to deal with it.
-        // This situation only is regarded as unknown column change
-        if (colsToDelete.isEmpty() && colsToAdd.isEmpty() && 
oldFieldSet.equals(newFieldSet)
-                && !oldFields.equals(newFields)) {
-            tableChanges.add(
-                    new UnknownColumnChange(
-                            String.format(
-                                    " Old schema: [%s] and new schema: [%s], 
they are same but some filed positions are not same."
-                                            +
-                                            " This situation only is regarded 
as unknown column change at present",
-                                    oldSchema.toString(), 
newSchema.toString())));
-            return tableChanges;
-        }
-
-        // step1: judge whether column type change
-        for (String colName : intersectColSet) {
-            NestedField oldField = oldSchema.findField(colName);
-            NestedField newField = newSchema.findField(colName);
-            if (!oldField.type().equals(newField.type()) || 
!oldField.doc().equals(newField.doc())) {
-                tableChanges.add(
-                        new TableChange.UpdateColumn(
-                                new String[]{newField.name()},
-                                FlinkSchemaUtil.convert(newField.type()),
-                                !newField.isRequired(),
-                                newField.doc()));
-            }
-        }
-
-        // step2: judge whether delete column
-        for (String colName : oldFields) {
-            if (colsToDelete.contains(colName)) {
-                tableChanges.add(
-                        new TableChange.DeleteColumn(
-                                new String[]{colName}));
-            }
-        }
-
-        // step3: judge whether add column
-        if (!colsToAdd.isEmpty()) {
-            for (int i = 0; i < newFields.size(); i++) {
-                String colName = newFields.get(i);
-                if (colsToAdd.contains(colName)) {
-                    NestedField addField = newSchema.findField(colName);
-                    tableChanges.add(
-                            new TableChange.AddColumn(
-                                    new String[]{addField.name()},
-                                    FlinkSchemaUtil.convert(addField.type()),
-                                    !addField.isRequired(),
-                                    addField.doc(),
-                                    i == 0 ? ColumnPosition.first() : 
ColumnPosition.after(newFields.get(i - 1))));
-                }
-            }
-        }
-        return tableChanges;
-    }
-
-    public static void applySchemaChanges(UpdateSchema pendingUpdate, 
List<TableChange> tableChanges) {
-        for (TableChange change : tableChanges) {
-            if (change instanceof TableChange.AddColumn) {
-                applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
-            } else if (change instanceof TableChange.DeleteColumn) {
-                applyDeleteColumn(pendingUpdate, (TableChange.DeleteColumn) 
change);
-            } else if (change instanceof TableChange.UpdateColumn) {
-                applyUpdateColumn(pendingUpdate, (TableChange.UpdateColumn) 
change);
-            } else {
-                throw new UnsupportedOperationException("Cannot apply unknown 
table change: " + change);
-            }
-        }
-        pendingUpdate.commit();
-    }
-
-    public static void applyAddColumn(UpdateSchema pendingUpdate, 
TableChange.AddColumn add) {
-        Preconditions.checkArgument(add.isNullable(),
-                "Incompatible change: cannot add required column: %s", 
leafName(add.fieldNames()));
-        Type type = add.dataType().accept(new 
FlinkTypeToType(RowType.of(add.dataType())));
-        pendingUpdate.addColumn(parentName(add.fieldNames()), 
leafName(add.fieldNames()), type, add.comment());
-
-        if (add.position() instanceof TableChange.After) {
-            TableChange.After after = (TableChange.After) add.position();
-            String referenceField = peerName(add.fieldNames(), after.column());
-            pendingUpdate.moveAfter(DOT.join(add.fieldNames()), 
referenceField);
-
-        } else if (add.position() instanceof TableChange.First) {
-            pendingUpdate.moveFirst(DOT.join(add.fieldNames()));
-
-        } else {
-            Preconditions.checkArgument(add.position() == null,
-                    "Cannot add '%s' at unknown position: %s", 
DOT.join(add.fieldNames()), add.position());
-        }
-    }
-
-    public static void applyDeleteColumn(UpdateSchema pendingUpdate, 
TableChange.DeleteColumn delete) {
-        pendingUpdate.deleteColumn(DOT.join(delete.fieldNames()));
-    }
-
-    public static void applyUpdateColumn(UpdateSchema pendingUpdate, 
TableChange.UpdateColumn update) {
-        Type type = update.dataType().accept(new 
FlinkTypeToType(RowType.of(update.dataType())));
-        pendingUpdate.updateColumn(DOT.join(update.fieldNames()), 
type.asPrimitiveType(), update.comment());
-    }
-
-    public static String leafName(String[] fieldNames) {
-        Preconditions.checkArgument(fieldNames.length > 0, "Invalid field 
name: at least one name is required");
-        return fieldNames[fieldNames.length - 1];
-    }
-
-    public static String peerName(String[] fieldNames, String fieldName) {
-        if (fieldNames.length > 1) {
-            String[] peerNames = Arrays.copyOf(fieldNames, fieldNames.length);
-            peerNames[fieldNames.length - 1] = fieldName;
-            return DOT.join(peerNames);
-        }
-        return fieldName;
-    }
-
-    public static String parentName(String[] fieldNames) {
-        if (fieldNames.length > 1) {
-            return DOT.join(Arrays.copyOfRange(fieldNames, 0, 
fieldNames.length - 1));
-        }
-        return null;
-    }
-}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestIcebergSchemaChangeUtils.java
similarity index 85%
rename from 
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
rename to 
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestIcebergSchemaChangeUtils.java
index 466214434c..9552ebeebd 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestIcebergSchemaChangeUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
-import org.apache.inlong.sort.base.sink.TableChange;
+import org.apache.inlong.sort.schema.TableChange;
 
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.Types;
@@ -27,7 +27,9 @@ import org.junit.Test;
 
 import java.util.List;
 
-public class TestSchemaChangeUtils {
+import static 
org.apache.inlong.sort.iceberg.sink.multiple.DynamicSchemaHandleOperator.extractColumnSchema;
+
+public class TestIcebergSchemaChangeUtils {
 
     private Schema baseSchema;
 
@@ -58,7 +60,7 @@ public class TestSchemaChangeUtils {
 
         Assert.assertFalse(baseSchema.sameSchema(addColSchema));
 
-        return SchemaChangeUtils.diffSchema(baseSchema, addColSchema);
+        return 
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema), 
extractColumnSchema(addColSchema));
     }
 
     @Test
@@ -78,7 +80,7 @@ public class TestSchemaChangeUtils {
 
         Assert.assertFalse(baseSchema.sameSchema(delColSchema));
 
-        return SchemaChangeUtils.diffSchema(baseSchema, delColSchema);
+        return 
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema), 
extractColumnSchema(delColSchema));
     }
 
     @Test
@@ -107,7 +109,8 @@ public class TestSchemaChangeUtils {
 
         Assert.assertFalse(baseSchema.sameSchema(updateTypeColSchema));
 
-        return SchemaChangeUtils.diffSchema(baseSchema, updateTypeColSchema);
+        return 
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema),
+                extractColumnSchema(updateTypeColSchema));
     }
 
     public List<TableChange> testCommentTypeColumn() {
@@ -118,7 +121,8 @@ public class TestSchemaChangeUtils {
 
         Assert.assertFalse(baseSchema.sameSchema(updateCommentColSchema));
 
-        return SchemaChangeUtils.diffSchema(baseSchema, 
updateCommentColSchema);
+        return 
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema),
+                extractColumnSchema(updateCommentColSchema));
     }
 
     @Test
@@ -130,7 +134,8 @@ public class TestSchemaChangeUtils {
 
         Assert.assertFalse(baseSchema.sameSchema(renameColumnSchema));
 
-        List<TableChange> tableChanges = 
SchemaChangeUtils.diffSchema(baseSchema, renameColumnSchema);
+        List<TableChange> tableChanges = 
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema),
+                extractColumnSchema(renameColumnSchema));
         Assert.assertEquals("rename column is not supported.", 1, 
tableChanges.size());
         for (TableChange tableChange : tableChanges) {
             Assert.assertTrue("The table changes must be UnknownColumnChange ",
@@ -147,7 +152,8 @@ public class TestSchemaChangeUtils {
 
         Assert.assertFalse(baseSchema.sameSchema(positionChangeSchema));
 
-        List<TableChange> tableChanges = 
SchemaChangeUtils.diffSchema(baseSchema, positionChangeSchema);
+        List<TableChange> tableChanges = 
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema),
+                extractColumnSchema(positionChangeSchema));
         Assert.assertTrue(tableChanges.size() == 1);
         for (TableChange tableChange : tableChanges) {
             Assert.assertTrue("The table changes must be UnknownColumnChange ",

Reply via email to