This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d419b76002 [INLONG-8734][Sort] Improve iceberg sync ddl (#8735)
d419b76002 is described below

commit d419b76002090578f67d2af034e08707c3e09b18
Author: Xin Gong <[email protected]>
AuthorDate: Wed Aug 16 16:51:20 2023 +0800

    [INLONG-8734][Sort] Improve iceberg sync ddl (#8735)
---
 .../org/apache/inlong/sort/base/Constants.java     |  6 +++
 .../sort/base/schema/SchemaChangeHelper.java       | 19 +++++----
 .../inlong/sort/iceberg/IcebergTableSink.java      |  3 ++
 .../iceberg/schema/IcebergSchemaChangeHelper.java  | 48 ++++++++++++++--------
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  8 +++-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 38 +++++++++++------
 .../sink/multiple/IcebergMultipleStreamWriter.java |  8 +++-
 .../sink/multiple/IcebergSchemaChangeUtils.java    | 32 ++++++++++-----
 .../iceberg/sink/multiple/RecordWithSchema.java    |  7 +++-
 9 files changed, 115 insertions(+), 54 deletions(-)

diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 7a42c892d0..c0fbed00c7 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -413,4 +413,10 @@ public final class Constants {
                     .noDefaultValue()
                     .withDescription("The policies of schema-change, format is 
'key1=value1&key2=value2', "
                             + "the key is the type of schema-change and the 
value is the support policy of schema-change");
+
+    public static final ConfigOption<Boolean> 
SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT =
+            ConfigOptions.key("sink.multiple.auto-create-table-when-snapshot")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether supporting auto create table 
when snapshot, default value is 'false'");
 }
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
index 07ef0f1456..8d7f766c1d 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
@@ -91,16 +91,19 @@ public abstract class SchemaChangeHelper implements 
SchemaChangeHandle {
             LOGGER.warn("Parse database, table from origin data failed, origin 
data: {}", new String(originData), e);
             if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
                 dirtySinkHelper.invoke(new String(originData), 
DirtyType.JSON_PROCESS_ERROR, e);
-            }
-            if (metricData != null) {
-                metricData.invokeDirty(1, originData.length);
+                if (metricData != null) {
+                    metricData.invokeDirty(1, originData.length);
+                }
             }
             return;
         }
         Operation operation;
         try {
-            JsonNode operationNode = 
Preconditions.checkNotNull(data.get("operation"),
-                    "Operation node is null");
+            JsonNode operationNode = data.get("operation");
+            if (operationNode == null) {
+                LOGGER.warn("operation is null. Unsupported for schema-change: 
{}", data);
+                return;
+            }
             operation = Preconditions.checkNotNull(
                     
dynamicSchemaFormat.objectMapper.convertValue(operationNode, new 
TypeReference<Operation>() {
                     }), "Operation is null");
@@ -240,9 +243,9 @@ public abstract class SchemaChangeHelper implements 
SchemaChangeHandle {
             String logTag = parseValue(data, 
dirtySinkHelper.getDirtyOptions().getLogTag());
             String identifier = parseValue(data, 
dirtySinkHelper.getDirtyOptions().getIdentifier());
             dirtySinkHelper.invoke(new String(originData), dirtyType, label, 
logTag, identifier, e);
-        }
-        if (metricData != null) {
-            metricData.outputDirtyMetricsWithEstimate(database, table, 1, 
originData.length);
+            if (metricData != null) {
+                metricData.outputDirtyMetricsWithEstimate(database, table, 1, 
originData.length);
+            }
         }
     }
     private String parseValue(JsonNode data, String pattern) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index f57b2c62e0..38980a5d90 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -50,6 +50,7 @@ import java.util.Map;
 import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static 
org.apache.inlong.sort.base.Constants.SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
@@ -125,6 +126,7 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
         final ReadableConfig tableOptions = 
Configuration.fromMap(catalogTable.getOptions());
         boolean multipleSink = tableOptions.get(SINK_MULTIPLE_ENABLE);
         boolean schemaChange = tableOptions.get(SINK_SCHEMA_CHANGE_ENABLE);
+        boolean autoCreateTableWhenSnapshot = 
tableOptions.get(SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT);
         String schemaChangePolicies = 
tableOptions.getOptional(SINK_SCHEMA_CHANGE_POLICIES).orElse(null);
         LOG.info("iceberg sink running with policy {}", 
tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY));
         if (multipleSink) {
@@ -152,6 +154,7 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
                     
.distributionMode(DistributionMode.fromName(tableOptions.get(WRITE_DISTRIBUTION_MODE)))
                     .enableSchemaChange(schemaChange)
                     .schemaChangePolicies(schemaChangePolicies)
+                    .autoCreateTableWhenSnapshot(autoCreateTableWhenSnapshot)
                     .append();
         } else {
             return (DataStreamSinkProvider) dataStream -> 
FlinkSink.forRowData(dataStream)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
index a44aba50ad..1e6e3012f3 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Iceberg schema change helper
@@ -60,6 +61,7 @@ public class IcebergSchemaChangeHelper extends 
SchemaChangeHelper {
     private transient Catalog catalog;
 
     private transient SupportsNamespaces asNamespaceCatalog;
+    private AtomicBoolean isSuccessDDL = new AtomicBoolean(false);
 
     public IcebergSchemaChangeHelper(JsonDynamicSchemaFormat 
dynamicSchemaFormat, boolean schemaChange,
             Map<SchemaChangeType, SchemaChangePolicy> policyMap, String 
databasePattern, String tablePattern,
@@ -76,24 +78,31 @@ public class IcebergSchemaChangeHelper extends 
SchemaChangeHelper {
     public void doAlterOperation(String database, String table, byte[] 
originData, String originSchema, JsonNode data,
             Map<SchemaChangeType, List<AlterColumn>> typeMap) {
         for (Map.Entry<SchemaChangeType, List<AlterColumn>> kv : 
typeMap.entrySet()) {
+            SchemaChangePolicy policy = policyMap.get(kv.getKey());
             try {
-                switch (kv.getKey()) {
-                    case ADD_COLUMN:
-                        doAddColumn(kv.getValue(), 
TableIdentifier.of(database, table));
-                        break;
-                    case DROP_COLUMN:
-                        doDropColumn(kv.getKey(), originSchema);
-                        break;
-                    case RENAME_COLUMN:
-                        doRenameColumn(kv.getKey(), originSchema);
-                        break;
-                    case CHANGE_COLUMN_TYPE:
-                        doChangeColumnType(kv.getKey(), originSchema);
-                        break;
-                    default:
+                if (policy != SchemaChangePolicy.ENABLE) {
+                    doSchemaChangeBase(kv.getKey(), policy, originSchema);
+                } else {
+                    switch (kv.getKey()) {
+                        case ADD_COLUMN:
+                            doAddColumn(kv.getValue(), 
TableIdentifier.of(database, table));
+                            break;
+                        case DROP_COLUMN:
+                            doDropColumn(kv.getKey(), originSchema);
+                            break;
+                        case RENAME_COLUMN:
+                            doRenameColumn(kv.getKey(), originSchema);
+                            break;
+                        case CHANGE_COLUMN_TYPE:
+                            doChangeColumnType(kv.getKey(), originSchema);
+                            break;
+                        default:
+                    }
+                    isSuccessDDL.set(true);
                 }
             } catch (Exception e) {
-                if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                if (policy == SchemaChangePolicy.ERROR ||
+                        exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
                     throw new SchemaChangeHandleException(
                             String.format("Apply alter column failed, origin 
schema: %s", originSchema), e);
                 }
@@ -112,14 +121,13 @@ public class IcebergSchemaChangeHelper extends 
SchemaChangeHelper {
             RowType rowType = dynamicSchemaFormat.extractSchema(data, 
pkListStr);
             Schema schema = 
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(rowType));
             IcebergSchemaChangeUtils.createTable(catalog, tableId, 
asNamespaceCatalog, schema);
-            return;
+            isSuccessDDL.set(true);
         } catch (Exception e) {
             if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
                 throw new SchemaChangeHandleException(
-                        String.format("Drop column failed, origin schema: %s", 
originSchema), e);
+                        String.format("create table failed, origin schema: 
%s", originSchema), e);
             }
             handleDirtyData(data, originData, database, table, 
DirtyType.CREATE_TABLE_ERROR, e);
-            return;
         }
     }
 
@@ -149,4 +157,8 @@ public class IcebergSchemaChangeHelper extends 
SchemaChangeHelper {
         
IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), 
tableChanges);
         transaction.commitTransaction();
     }
+
+    public AtomicBoolean ddlExecSuccess() {
+        return isSuccessDDL;
+    }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index c023b99530..b1a2c4727a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -189,6 +189,7 @@ public class FlinkSink {
         private boolean enableSchemaChange;
         private String schemaChangePolicies;
         private boolean switchAppendUpsertEnable = false;
+        private boolean autoCreateTableWhenSnapshot;
 
         private Builder() {
         }
@@ -283,6 +284,11 @@ public class FlinkSink {
             return this;
         }
 
+        public Builder autoCreateTableWhenSnapshot(boolean 
autoCreateTableWhenSnapshot) {
+            this.autoCreateTableWhenSnapshot = autoCreateTableWhenSnapshot;
+            return this;
+        }
+
         public Builder schemaChangePolicies(String schemaChangePolicies) {
             this.schemaChangePolicies = schemaChangePolicies;
             return this;
@@ -699,7 +705,7 @@ public class FlinkSink {
             int parallelism = writeParallelism == null ? 
input.getParallelism() : writeParallelism;
             DynamicSchemaHandleOperator routeOperator = new 
DynamicSchemaHandleOperator(
                     catalogLoader, multipleSinkOption, dirtyOptions, 
dirtySink, inlongMetric, auditHostAndPorts,
-                    enableSchemaChange, schemaChangePolicies);
+                    enableSchemaChange, schemaChangePolicies, 
autoCreateTableWhenSnapshot);
             SingleOutputStreamOperator<RecordWithSchema> routeStream = input
                     
.transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
                             TypeInformation.of(RecordWithSchema.class),
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 2591c4fd59..238cf7e6ff 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -27,7 +27,6 @@ import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
-import org.apache.inlong.sort.base.schema.SchemaChangeHelper;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
@@ -65,7 +64,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.ws.rs.NotSupportedException;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -123,10 +121,11 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
     private @Nullable transient SinkTableMetricData metricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
-    private SchemaChangeHelper schemaChangeHelper;
+    private IcebergSchemaChangeHelper schemaChangeHelper;
     private String schemaChangePolicies;
     private boolean enableSchemaChange;
     private final String INCREMENTAL = "incremental";
+    private boolean autoCreateTableWhenSnapshot;
 
     public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
             MultipleSinkOption multipleSinkOption,
@@ -135,7 +134,8 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
             String inlongMetric,
             String auditHostAndPorts,
             boolean enableSchemaChange,
-            String schemaChangePolicies) {
+            String schemaChangePolicies,
+            boolean autoCreateTableWhenSnapshot) {
         this.catalogLoader = catalogLoader;
         this.multipleSinkOption = multipleSinkOption;
         this.inlongMetric = inlongMetric;
@@ -143,6 +143,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
         this.schemaChangePolicies = schemaChangePolicies;
         this.enableSchemaChange = enableSchemaChange;
+        this.autoCreateTableWhenSnapshot = autoCreateTableWhenSnapshot;
     }
 
     @SuppressWarnings("unchecked")
@@ -244,8 +245,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         }
 
         for (RowData rowData : rowDataForDataSchemaList) {
-            DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
-            if (!dirtyOptions.ignoreDirty()) {
+            if (dirtySinkHelper.getDirtySink() == null) {
                 if (metricData != null) {
                     
metricData.outputDirtyMetricsWithEstimate(tableId.namespace().toString(),
                             tableId.name(), rowData);
@@ -314,7 +314,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         if (this.inlongMetric != null) {
             this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
                     new ListStateDescriptor<>(
-                            String.format(INLONG_METRIC_STATE_NAME), 
TypeInformation.of(new TypeHint<MetricState>() {
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint<MetricState>() {
                             })));
         }
         if (context.isRestored()) {
@@ -325,6 +325,12 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
 
     private void execDDL(byte[] originData, JsonNode jsonNode) {
         schemaChangeHelper.process(originData, jsonNode);
+        if (schemaChangeHelper.ddlExecSuccess().get()) {
+            RecordWithSchema record = new RecordWithSchema();
+            record.setDDL(true);
+            output.collect(new StreamRecord<>(record));
+            schemaChangeHelper.ddlExecSuccess().set(false);
+        }
     }
 
     private void execDML(JsonNode jsonNode, TableIdentifier tableId) {
@@ -343,7 +349,9 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         });
         if (schema == null) {
             try {
-                handleTableCreateEventFromOperator(record.getTableId(), 
dataSchema);
+                boolean incremental = 
Optional.ofNullable(jsonNode.get(INCREMENTAL))
+                        .map(JsonNode::asBoolean).orElse(false);
+                handleTableCreateEventFromOperator(record.getTableId(), 
dataSchema, incremental);
             } catch (Exception e) {
                 LOGGER.error("Table create error, tableId: {}, schema: {}", 
record.getTableId(), dataSchema);
                 if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == 
multipleSinkOption
@@ -382,6 +390,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                             try {
                                 return 
dynamicSchemaFormat.extractRowData(jsonNode, FlinkSchemaUtil.convert(schema1));
                             } catch (Exception e) {
+                                LOG.error(String.format("Table %s extract 
RowData failed!", tableId), e);
                                 if 
(SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption
                                         .getSchemaUpdatePolicy()) {
                                     isDirty.set(true);
@@ -390,7 +399,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                                         rowDataForDataSchemaList = 
dynamicSchemaFormat
                                                 .extractRowData(jsonNode, 
FlinkSchemaUtil.convert(dataSchema));
                                     } catch (Throwable ee) {
-                                        LOG.error("extractRowData {} failed!", 
jsonNode, ee);
+                                        LOG.error("extract RowData {} 
failed!", jsonNode, ee);
                                     }
 
                                     for (RowData rowData : 
rowDataForDataSchemaList) {
@@ -430,11 +439,12 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                         .map(JsonNode::asBoolean).orElse(false));
                 output.collect(new StreamRecord<>(recordWithSchema));
             } else {
+                LOG.warn("Table {} schema is different!", tableId);
                 if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == 
multipleSinkOption
                         .getSchemaUpdatePolicy()) {
                     RecordWithSchema recordWithSchema = queue.poll();
                     
handleDirtyDataOfLogWithIgnore(recordWithSchema.getOriginalData(), dataSchema, 
tableId,
-                            new NotSupportedException(
+                            new RuntimeException(
                                     String.format("SchemaUpdatePolicy %s does 
not support schema dynamic update!",
                                             
multipleSinkOption.getSchemaUpdatePolicy())));
                 } else if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == 
multipleSinkOption
@@ -446,7 +456,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                     handldAlterSchemaEventFromOperator(tableId, latestSchema, 
dataSchema);
                     break;
                 } else {
-                    throw new NotSupportedException(
+                    throw new RuntimeException(
                             String.format("SchemaUpdatePolicy %s does not 
support schema dynamic update!",
                                     
multipleSinkOption.getSchemaUpdatePolicy()));
                 }
@@ -455,8 +465,10 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
     }
 
     // ================================ All coordinator handle method 
==============================================
-    private void handleTableCreateEventFromOperator(TableIdentifier tableId, 
Schema schema) {
-        IcebergSchemaChangeUtils.createTable(catalog, tableId, 
asNamespaceCatalog, schema);
+    private void handleTableCreateEventFromOperator(TableIdentifier tableId, 
Schema schema, boolean incremental) {
+        if (this.autoCreateTableWhenSnapshot && !incremental) {
+            IcebergSchemaChangeUtils.createTable(catalog, tableId, 
asNamespaceCatalog, schema);
+        }
         handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema());
     }
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 88a2e27474..b581772b5b 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -195,7 +195,13 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
     @Override
     public void processElement(RecordWithSchema recordWithSchema) throws 
Exception {
         TableIdentifier tableId = recordWithSchema.getTableId();
-
+        if (recordWithSchema.isDDL()) {
+            // just record node metrics for ddl
+            if (sinkMetricData != null) {
+                sinkMetricData.outputMetricsWithEstimate(1);
+            }
+            return;
+        }
         if (isSchemaUpdate(recordWithSchema)) {
             if (multipleTables.get(tableId) == null) {
                 Table table = catalog.loadTable(recordWithSchema.getTableId());
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
index 31bfd732eb..cd0626bd75 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
@@ -74,23 +74,33 @@ public class IcebergSchemaChangeUtils extends 
SchemaChangeUtils {
     }
 
     public static void applySchemaChanges(UpdateSchema pendingUpdate, 
List<TableChange> tableChanges) {
-        for (TableChange change : tableChanges) {
-            if (change instanceof TableChange.AddColumn) {
-                applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
-            } else if (change instanceof TableChange.DeleteColumn) {
-                applyDeleteColumn(pendingUpdate, (TableChange.DeleteColumn) 
change);
-            } else if (change instanceof TableChange.UpdateColumn) {
-                applyUpdateColumn(pendingUpdate, (TableChange.UpdateColumn) 
change);
+        try {
+            for (TableChange change : tableChanges) {
+                if (change instanceof TableChange.AddColumn) {
+                    applyAddColumn(pendingUpdate, (TableChange.AddColumn) 
change);
+                } else if (change instanceof TableChange.DeleteColumn) {
+                    applyDeleteColumn(pendingUpdate, 
(TableChange.DeleteColumn) change);
+                } else if (change instanceof TableChange.UpdateColumn) {
+                    applyUpdateColumn(pendingUpdate, 
(TableChange.UpdateColumn) change);
+                } else {
+                    throw new UnsupportedOperationException("Cannot apply 
unknown table change: " + change);
+                }
+            }
+            pendingUpdate.commit();
+        } catch (Exception e) {
+            String addColumnDuplicationExecException = "Cannot add column, 
name already exists";
+            String deleteColumnDuplicationExecException = "Cannot delete 
missing column";
+            if (e.getMessage().contains(addColumnDuplicationExecException) ||
+                    
e.getMessage().contains(deleteColumnDuplicationExecException)) {
+                // try catch exception for replay ddl binlog
+                LOGGER.warn("ddl exec exception", e);
             } else {
-                throw new UnsupportedOperationException("Cannot apply unknown 
table change: " + change);
+                throw e;
             }
         }
-        pendingUpdate.commit();
     }
 
     public static void applyAddColumn(UpdateSchema pendingUpdate, 
TableChange.AddColumn add) {
-        Preconditions.checkArgument(add.isNullable(),
-                "Incompatible change: cannot add required column: %s", 
leafName(add.fieldNames()));
         Type type = add.dataType().accept(new 
FlinkTypeToType(RowType.of(add.dataType())));
         pendingUpdate.addColumn(parentName(add.fieldNames()), 
leafName(add.fieldNames()), type, add.comment());
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
index c929ab3742..7e44db2594 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.Schema;
@@ -45,6 +46,7 @@ import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 @Data
+@NoArgsConstructor
 public class RecordWithSchema {
 
     public RecordWithSchema(
@@ -61,6 +63,7 @@ public class RecordWithSchema {
     private boolean isDirty;
     private long rowCount;
     private long rowSize;
+    private boolean isDDL = false;
     private boolean incremental;
 
     private transient JsonNode originalData;
@@ -69,9 +72,9 @@ public class RecordWithSchema {
 
     private Schema schema;
 
-    private final TableIdentifier tableId;
+    private TableIdentifier tableId;
 
-    private final List<String> primaryKeys;
+    private List<String> primaryKeys;
 
     public List<RowData> getData() {
         return data;

Reply via email to