This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d419b76002 [INLONG-8734][Sort] Improve iceberg sync ddl (#8735)
d419b76002 is described below
commit d419b76002090578f67d2af034e08707c3e09b18
Author: Xin Gong <[email protected]>
AuthorDate: Wed Aug 16 16:51:20 2023 +0800
[INLONG-8734][Sort] Improve iceberg sync ddl (#8735)
---
.../org/apache/inlong/sort/base/Constants.java | 6 +++
.../sort/base/schema/SchemaChangeHelper.java | 19 +++++----
.../inlong/sort/iceberg/IcebergTableSink.java | 3 ++
.../iceberg/schema/IcebergSchemaChangeHelper.java | 48 ++++++++++++++--------
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 8 +++-
.../sink/multiple/DynamicSchemaHandleOperator.java | 38 +++++++++++------
.../sink/multiple/IcebergMultipleStreamWriter.java | 8 +++-
.../sink/multiple/IcebergSchemaChangeUtils.java | 32 ++++++++++-----
.../iceberg/sink/multiple/RecordWithSchema.java | 7 +++-
9 files changed, 115 insertions(+), 54 deletions(-)
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 7a42c892d0..c0fbed00c7 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -413,4 +413,10 @@ public final class Constants {
.noDefaultValue()
.withDescription("The policies of schema-change, format is
'key1=value1&key2=value2', "
+ "the key is the type of schema-change and the
value is the support policy of schema-change");
+
+ public static final ConfigOption<Boolean>
SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT =
+ ConfigOptions.key("sink.multiple.auto-create-table-when-snapshot")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether supporting auto create table
when snapshot, default value is 'false'");
}
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
index 07ef0f1456..8d7f766c1d 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
@@ -91,16 +91,19 @@ public abstract class SchemaChangeHelper implements
SchemaChangeHandle {
LOGGER.warn("Parse database, table from origin data failed, origin
data: {}", new String(originData), e);
if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
dirtySinkHelper.invoke(new String(originData),
DirtyType.JSON_PROCESS_ERROR, e);
- }
- if (metricData != null) {
- metricData.invokeDirty(1, originData.length);
+ if (metricData != null) {
+ metricData.invokeDirty(1, originData.length);
+ }
}
return;
}
Operation operation;
try {
- JsonNode operationNode =
Preconditions.checkNotNull(data.get("operation"),
- "Operation node is null");
+ JsonNode operationNode = data.get("operation");
+ if (operationNode == null) {
+ LOGGER.warn("operation is null. Unsupported for schema-change:
{}", data);
+ return;
+ }
operation = Preconditions.checkNotNull(
dynamicSchemaFormat.objectMapper.convertValue(operationNode, new
TypeReference<Operation>() {
}), "Operation is null");
@@ -240,9 +243,9 @@ public abstract class SchemaChangeHelper implements
SchemaChangeHandle {
String logTag = parseValue(data,
dirtySinkHelper.getDirtyOptions().getLogTag());
String identifier = parseValue(data,
dirtySinkHelper.getDirtyOptions().getIdentifier());
dirtySinkHelper.invoke(new String(originData), dirtyType, label,
logTag, identifier, e);
- }
- if (metricData != null) {
- metricData.outputDirtyMetricsWithEstimate(database, table, 1,
originData.length);
+ if (metricData != null) {
+ metricData.outputDirtyMetricsWithEstimate(database, table, 1,
originData.length);
+ }
}
}
private String parseValue(JsonNode data, String pattern) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index f57b2c62e0..38980a5d90 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -50,6 +50,7 @@ import java.util.Map;
import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static
org.apache.inlong.sort.base.Constants.SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
@@ -125,6 +126,7 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
final ReadableConfig tableOptions =
Configuration.fromMap(catalogTable.getOptions());
boolean multipleSink = tableOptions.get(SINK_MULTIPLE_ENABLE);
boolean schemaChange = tableOptions.get(SINK_SCHEMA_CHANGE_ENABLE);
+ boolean autoCreateTableWhenSnapshot =
tableOptions.get(SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT);
String schemaChangePolicies =
tableOptions.getOptional(SINK_SCHEMA_CHANGE_POLICIES).orElse(null);
LOG.info("iceberg sink running with policy {}",
tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY));
if (multipleSink) {
@@ -152,6 +154,7 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
.distributionMode(DistributionMode.fromName(tableOptions.get(WRITE_DISTRIBUTION_MODE)))
.enableSchemaChange(schemaChange)
.schemaChangePolicies(schemaChangePolicies)
+ .autoCreateTableWhenSnapshot(autoCreateTableWhenSnapshot)
.append();
} else {
return (DataStreamSinkProvider) dataStream ->
FlinkSink.forRowData(dataStream)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
index a44aba50ad..1e6e3012f3 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Iceberg schema change helper
@@ -60,6 +61,7 @@ public class IcebergSchemaChangeHelper extends
SchemaChangeHelper {
private transient Catalog catalog;
private transient SupportsNamespaces asNamespaceCatalog;
+ private AtomicBoolean isSuccessDDL = new AtomicBoolean(false);
public IcebergSchemaChangeHelper(JsonDynamicSchemaFormat
dynamicSchemaFormat, boolean schemaChange,
Map<SchemaChangeType, SchemaChangePolicy> policyMap, String
databasePattern, String tablePattern,
@@ -76,24 +78,31 @@ public class IcebergSchemaChangeHelper extends
SchemaChangeHelper {
public void doAlterOperation(String database, String table, byte[]
originData, String originSchema, JsonNode data,
Map<SchemaChangeType, List<AlterColumn>> typeMap) {
for (Map.Entry<SchemaChangeType, List<AlterColumn>> kv :
typeMap.entrySet()) {
+ SchemaChangePolicy policy = policyMap.get(kv.getKey());
try {
- switch (kv.getKey()) {
- case ADD_COLUMN:
- doAddColumn(kv.getValue(),
TableIdentifier.of(database, table));
- break;
- case DROP_COLUMN:
- doDropColumn(kv.getKey(), originSchema);
- break;
- case RENAME_COLUMN:
- doRenameColumn(kv.getKey(), originSchema);
- break;
- case CHANGE_COLUMN_TYPE:
- doChangeColumnType(kv.getKey(), originSchema);
- break;
- default:
+ if (policy != SchemaChangePolicy.ENABLE) {
+ doSchemaChangeBase(kv.getKey(), policy, originSchema);
+ } else {
+ switch (kv.getKey()) {
+ case ADD_COLUMN:
+ doAddColumn(kv.getValue(),
TableIdentifier.of(database, table));
+ break;
+ case DROP_COLUMN:
+ doDropColumn(kv.getKey(), originSchema);
+ break;
+ case RENAME_COLUMN:
+ doRenameColumn(kv.getKey(), originSchema);
+ break;
+ case CHANGE_COLUMN_TYPE:
+ doChangeColumnType(kv.getKey(), originSchema);
+ break;
+ default:
+ }
+ isSuccessDDL.set(true);
}
} catch (Exception e) {
- if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+ if (policy == SchemaChangePolicy.ERROR ||
+ exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
throw new SchemaChangeHandleException(
String.format("Apply alter column failed, origin
schema: %s", originSchema), e);
}
@@ -112,14 +121,13 @@ public class IcebergSchemaChangeHelper extends
SchemaChangeHelper {
RowType rowType = dynamicSchemaFormat.extractSchema(data,
pkListStr);
Schema schema =
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(rowType));
IcebergSchemaChangeUtils.createTable(catalog, tableId,
asNamespaceCatalog, schema);
- return;
+ isSuccessDDL.set(true);
} catch (Exception e) {
if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
throw new SchemaChangeHandleException(
- String.format("Drop column failed, origin schema: %s",
originSchema), e);
+ String.format("create table failed, origin schema:
%s", originSchema), e);
}
handleDirtyData(data, originData, database, table,
DirtyType.CREATE_TABLE_ERROR, e);
- return;
}
}
@@ -149,4 +157,8 @@ public class IcebergSchemaChangeHelper extends
SchemaChangeHelper {
IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(),
tableChanges);
transaction.commitTransaction();
}
+
+ public AtomicBoolean ddlExecSuccess() {
+ return isSuccessDDL;
+ }
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index c023b99530..b1a2c4727a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -189,6 +189,7 @@ public class FlinkSink {
private boolean enableSchemaChange;
private String schemaChangePolicies;
private boolean switchAppendUpsertEnable = false;
+ private boolean autoCreateTableWhenSnapshot;
private Builder() {
}
@@ -283,6 +284,11 @@ public class FlinkSink {
return this;
}
+ public Builder autoCreateTableWhenSnapshot(boolean
autoCreateTableWhenSnapshot) {
+ this.autoCreateTableWhenSnapshot = autoCreateTableWhenSnapshot;
+ return this;
+ }
+
public Builder schemaChangePolicies(String schemaChangePolicies) {
this.schemaChangePolicies = schemaChangePolicies;
return this;
@@ -699,7 +705,7 @@ public class FlinkSink {
int parallelism = writeParallelism == null ?
input.getParallelism() : writeParallelism;
DynamicSchemaHandleOperator routeOperator = new
DynamicSchemaHandleOperator(
catalogLoader, multipleSinkOption, dirtyOptions,
dirtySink, inlongMetric, auditHostAndPorts,
- enableSchemaChange, schemaChangePolicies);
+ enableSchemaChange, schemaChangePolicies,
autoCreateTableWhenSnapshot);
SingleOutputStreamOperator<RecordWithSchema> routeStream = input
.transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
TypeInformation.of(RecordWithSchema.class),
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 2591c4fd59..238cf7e6ff 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -27,7 +27,6 @@ import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
-import org.apache.inlong.sort.base.schema.SchemaChangeHelper;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.base.util.MetricStateUtils;
@@ -65,7 +64,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import javax.ws.rs.NotSupportedException;
import java.io.Closeable;
import java.io.IOException;
@@ -123,10 +121,11 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
private @Nullable transient SinkTableMetricData metricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
- private SchemaChangeHelper schemaChangeHelper;
+ private IcebergSchemaChangeHelper schemaChangeHelper;
private String schemaChangePolicies;
private boolean enableSchemaChange;
private final String INCREMENTAL = "incremental";
+ private boolean autoCreateTableWhenSnapshot;
public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
MultipleSinkOption multipleSinkOption,
@@ -135,7 +134,8 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
String inlongMetric,
String auditHostAndPorts,
boolean enableSchemaChange,
- String schemaChangePolicies) {
+ String schemaChangePolicies,
+ boolean autoCreateTableWhenSnapshot) {
this.catalogLoader = catalogLoader;
this.multipleSinkOption = multipleSinkOption;
this.inlongMetric = inlongMetric;
@@ -143,6 +143,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
this.schemaChangePolicies = schemaChangePolicies;
this.enableSchemaChange = enableSchemaChange;
+ this.autoCreateTableWhenSnapshot = autoCreateTableWhenSnapshot;
}
@SuppressWarnings("unchecked")
@@ -244,8 +245,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
}
for (RowData rowData : rowDataForDataSchemaList) {
- DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
- if (!dirtyOptions.ignoreDirty()) {
+ if (dirtySinkHelper.getDirtySink() == null) {
if (metricData != null) {
metricData.outputDirtyMetricsWithEstimate(tableId.namespace().toString(),
tableId.name(), rowData);
@@ -314,7 +314,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
if (this.inlongMetric != null) {
this.metricStateListState =
context.getOperatorStateStore().getUnionListState(
new ListStateDescriptor<>(
- String.format(INLONG_METRIC_STATE_NAME),
TypeInformation.of(new TypeHint<MetricState>() {
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new
TypeHint<MetricState>() {
})));
}
if (context.isRestored()) {
@@ -325,6 +325,12 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
private void execDDL(byte[] originData, JsonNode jsonNode) {
schemaChangeHelper.process(originData, jsonNode);
+ if (schemaChangeHelper.ddlExecSuccess().get()) {
+ RecordWithSchema record = new RecordWithSchema();
+ record.setDDL(true);
+ output.collect(new StreamRecord<>(record));
+ schemaChangeHelper.ddlExecSuccess().set(false);
+ }
}
private void execDML(JsonNode jsonNode, TableIdentifier tableId) {
@@ -343,7 +349,9 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
});
if (schema == null) {
try {
- handleTableCreateEventFromOperator(record.getTableId(),
dataSchema);
+ boolean incremental =
Optional.ofNullable(jsonNode.get(INCREMENTAL))
+ .map(JsonNode::asBoolean).orElse(false);
+ handleTableCreateEventFromOperator(record.getTableId(),
dataSchema, incremental);
} catch (Exception e) {
LOGGER.error("Table create error, tableId: {}, schema: {}",
record.getTableId(), dataSchema);
if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE ==
multipleSinkOption
@@ -382,6 +390,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
try {
return
dynamicSchemaFormat.extractRowData(jsonNode, FlinkSchemaUtil.convert(schema1));
} catch (Exception e) {
+ LOG.error(String.format("Table %s extract
RowData failed!", tableId), e);
if
(SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption
.getSchemaUpdatePolicy()) {
isDirty.set(true);
@@ -390,7 +399,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
rowDataForDataSchemaList =
dynamicSchemaFormat
.extractRowData(jsonNode,
FlinkSchemaUtil.convert(dataSchema));
} catch (Throwable ee) {
- LOG.error("extractRowData {} failed!",
jsonNode, ee);
+ LOG.error("extract RowData {}
failed!", jsonNode, ee);
}
for (RowData rowData :
rowDataForDataSchemaList) {
@@ -430,11 +439,12 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
.map(JsonNode::asBoolean).orElse(false));
output.collect(new StreamRecord<>(recordWithSchema));
} else {
+ LOG.warn("Table {} schema is different!", tableId);
if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE ==
multipleSinkOption
.getSchemaUpdatePolicy()) {
RecordWithSchema recordWithSchema = queue.poll();
handleDirtyDataOfLogWithIgnore(recordWithSchema.getOriginalData(), dataSchema,
tableId,
- new NotSupportedException(
+ new RuntimeException(
String.format("SchemaUpdatePolicy %s does
not support schema dynamic update!",
multipleSinkOption.getSchemaUpdatePolicy())));
} else if (SchemaUpdateExceptionPolicy.STOP_PARTIAL ==
multipleSinkOption
@@ -446,7 +456,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
handldAlterSchemaEventFromOperator(tableId, latestSchema,
dataSchema);
break;
} else {
- throw new NotSupportedException(
+ throw new RuntimeException(
String.format("SchemaUpdatePolicy %s does not
support schema dynamic update!",
multipleSinkOption.getSchemaUpdatePolicy()));
}
@@ -455,8 +465,10 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
}
// ================================ All coordinator handle method
==============================================
- private void handleTableCreateEventFromOperator(TableIdentifier tableId,
Schema schema) {
- IcebergSchemaChangeUtils.createTable(catalog, tableId,
asNamespaceCatalog, schema);
+ private void handleTableCreateEventFromOperator(TableIdentifier tableId,
Schema schema, boolean incremental) {
+ if (this.autoCreateTableWhenSnapshot && !incremental) {
+ IcebergSchemaChangeUtils.createTable(catalog, tableId,
asNamespaceCatalog, schema);
+ }
handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema());
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 88a2e27474..b581772b5b 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -195,7 +195,13 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
@Override
public void processElement(RecordWithSchema recordWithSchema) throws
Exception {
TableIdentifier tableId = recordWithSchema.getTableId();
-
+ if (recordWithSchema.isDDL()) {
+ // just record node metrics for ddl
+ if (sinkMetricData != null) {
+ sinkMetricData.outputMetricsWithEstimate(1);
+ }
+ return;
+ }
if (isSchemaUpdate(recordWithSchema)) {
if (multipleTables.get(tableId) == null) {
Table table = catalog.loadTable(recordWithSchema.getTableId());
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
index 31bfd732eb..cd0626bd75 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
@@ -74,23 +74,33 @@ public class IcebergSchemaChangeUtils extends
SchemaChangeUtils {
}
public static void applySchemaChanges(UpdateSchema pendingUpdate,
List<TableChange> tableChanges) {
- for (TableChange change : tableChanges) {
- if (change instanceof TableChange.AddColumn) {
- applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
- } else if (change instanceof TableChange.DeleteColumn) {
- applyDeleteColumn(pendingUpdate, (TableChange.DeleteColumn)
change);
- } else if (change instanceof TableChange.UpdateColumn) {
- applyUpdateColumn(pendingUpdate, (TableChange.UpdateColumn)
change);
+ try {
+ for (TableChange change : tableChanges) {
+ if (change instanceof TableChange.AddColumn) {
+ applyAddColumn(pendingUpdate, (TableChange.AddColumn)
change);
+ } else if (change instanceof TableChange.DeleteColumn) {
+ applyDeleteColumn(pendingUpdate,
(TableChange.DeleteColumn) change);
+ } else if (change instanceof TableChange.UpdateColumn) {
+ applyUpdateColumn(pendingUpdate,
(TableChange.UpdateColumn) change);
+ } else {
+ throw new UnsupportedOperationException("Cannot apply
unknown table change: " + change);
+ }
+ }
+ pendingUpdate.commit();
+ } catch (Exception e) {
+ String addColumnDuplicationExecException = "Cannot add column,
name already exists";
+ String deleteColumnDuplicationExecException = "Cannot delete
missing column";
+ if (e.getMessage().contains(addColumnDuplicationExecException) ||
+
e.getMessage().contains(deleteColumnDuplicationExecException)) {
+ // try catch exception for replay ddl binlog
+ LOGGER.warn("ddl exec exception", e);
} else {
- throw new UnsupportedOperationException("Cannot apply unknown
table change: " + change);
+ throw e;
}
}
- pendingUpdate.commit();
}
public static void applyAddColumn(UpdateSchema pendingUpdate,
TableChange.AddColumn add) {
- Preconditions.checkArgument(add.isNullable(),
- "Incompatible change: cannot add required column: %s",
leafName(add.fieldNames()));
Type type = add.dataType().accept(new
FlinkTypeToType(RowType.of(add.dataType())));
pendingUpdate.addColumn(parentName(add.fieldNames()),
leafName(add.fieldNames()), type, add.comment());
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
index c929ab3742..7e44db2594 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.iceberg.sink.multiple;
import lombok.Data;
+import lombok.NoArgsConstructor;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
@@ -45,6 +46,7 @@ import java.util.function.BiFunction;
import java.util.stream.Collectors;
@Data
+@NoArgsConstructor
public class RecordWithSchema {
public RecordWithSchema(
@@ -61,6 +63,7 @@ public class RecordWithSchema {
private boolean isDirty;
private long rowCount;
private long rowSize;
+ private boolean isDDL = false;
private boolean incremental;
private transient JsonNode originalData;
@@ -69,9 +72,9 @@ public class RecordWithSchema {
private Schema schema;
- private final TableIdentifier tableId;
+ private TableIdentifier tableId;
- private final List<String> primaryKeys;
+ private List<String> primaryKeys;
public List<RowData> getData() {
return data;