This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7d2461d29a [INLONG-8140][Sort] Support data inference schema change
type and iceberg ddl change (#8141)
7d2461d29a is described below
commit 7d2461d29aa2ede19cef01f0adfaaae3bfa8d48a
Author: LinChen <[email protected]>
AuthorDate: Mon Jun 5 16:24:44 2023 +0800
[INLONG-8140][Sort] Support data inference schema change type and iceberg
ddl change (#8141)
---
.../apache/inlong/sort/schema/ColumnSchema.java | 34 +++
.../apache/inlong/sort/schema}/TableChange.java | 2 +-
.../apache/inlong/sort/util/SchemaChangeUtils.java | 102 +++++++-
.../sort/base/schema/SchemaChangeHandle.java | 82 +++++++
.../sort/base/schema/SchemaChangeHelper.java | 271 +++++++++++++++++++++
.../inlong/sort/base/sink/MultipleSinkOption.java | 2 +
.../sort-connectors/iceberg/pom.xml | 22 ++
.../inlong/sort/iceberg/IcebergTableSink.java | 6 +
.../iceberg/schema/IcebergSchemaChangeHelper.java | 143 +++++++++++
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 15 +-
.../sink/multiple/DynamicSchemaHandleOperator.java | 88 ++++---
.../sink/multiple/IcebergSchemaChangeUtils.java | 140 +++++++++++
.../iceberg/sink/multiple/SchemaChangeUtils.java | 198 ---------------
...tils.java => TestIcebergSchemaChangeUtils.java} | 22 +-
14 files changed, 880 insertions(+), 247 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/ColumnSchema.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/ColumnSchema.java
new file mode 100644
index 0000000000..94ecf33c80
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/ColumnSchema.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.schema;
+
+import lombok.Data;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/**
+ * Schema information contained in a column.
+ * */
+@Data
+public class ColumnSchema {
+
+ private String name;
+ private LogicalType type;
+ private boolean isNullable;
+ private String comment;
+ private TableChange.ColumnPosition position;
+}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/TableChange.java
similarity index 99%
rename from
inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
rename to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/TableChange.java
index 35219763bf..24c1777fb8 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/schema/TableChange.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.base.sink;
+package org.apache.inlong.sort.schema;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
index 60970d0f8a..c6ffac7da0 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
@@ -23,25 +23,31 @@ import
org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
import org.apache.inlong.sort.protocol.ddl.operations.Operation;
import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+import org.apache.inlong.sort.schema.ColumnSchema;
+import org.apache.inlong.sort.schema.TableChange;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringJoiner;
+import java.util.stream.Collectors;
/**
* Schema-change Utils
*/
-public final class SchemaChangeUtils {
+public class SchemaChangeUtils {
private final static String DELIMITER = "&";
private final static String KEY_VALUE_DELIMITER = "=";
- private SchemaChangeUtils() {
+ public SchemaChangeUtils() {
}
/**
@@ -226,4 +232,96 @@ public final class SchemaChangeUtils {
}
return !column.getName().trim().isEmpty();
}
+
+ /**
+ * Compare two schemas and get the schema changes that happened in them.
+     * TODO: currently only supports add column, delete column and column type
change; rename column and column position change are not supported
+ *
+ * @param oldColumnSchemas
+ * @param newColumnSchemas
+ * @return
+ */
+ public static List<TableChange> diffSchema(Map<String, ColumnSchema>
oldColumnSchemas,
+ Map<String, ColumnSchema> newColumnSchemas) {
+ List<String> oldFields = oldColumnSchemas.values().stream()
+ .map(ColumnSchema::getName).collect(Collectors.toList());
+ List<String> newFields = newColumnSchemas.values().stream()
+ .map(ColumnSchema::getName).collect(Collectors.toList());
+ Set<String> oldFieldSet = new HashSet<>(oldFields);
+ Set<String> newFieldSet = new HashSet<>(newFields);
+
+ Set<String> intersectColSet = Sets.intersection(oldFieldSet,
newFieldSet);
+ Set<String> colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+ Set<String> colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
+ List<TableChange> tableChanges = new ArrayList<>();
+
+ // step0: judge whether unknown change
+        // 1. just diffing two different schemas cannot distinguish (add + delete)
vs modify
+ // Example first [a, b, c] -> then delete c [a, b] -> add d [a, b, d],
currently it is only judged as unknown
+ // change.
+        // In the next version, we will judge whether it is [delete and add] or rename by
using information extracted from the DDL
+ if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {
+ tableChanges.add(new TableChange.UnknownColumnChange(
+ String.format(" Old ColumnSchema: [%s] and new
ColumnSchema: [%s], it is unknown column change",
+ oldColumnSchemas, newColumnSchemas)));
+ return tableChanges;
+ }
+
+        // 2. if some field positions in the new schema are not the same as in the old
schema, there is no way to deal with it.
+ // This situation only is regarded as unknown column change
+ if (colsToDelete.isEmpty() && colsToAdd.isEmpty() &&
oldFieldSet.equals(newFieldSet)
+ && !oldFields.equals(newFields)) {
+ tableChanges.add(
+ new TableChange.UnknownColumnChange(
+ String.format(
+ " Old ColumnSchema: [%s] and new
ColumnSchema: [%s], "
+ + " they are same but some filed
positions are not same."
+ + " This situation only is
regarded as unknown column change at present",
+ oldColumnSchemas, newColumnSchemas)));
+ return tableChanges;
+ }
+
+ // step1: judge whether column type change
+ for (String colName : intersectColSet) {
+ ColumnSchema oldCol = oldColumnSchemas.get(colName);
+ ColumnSchema newCol = newColumnSchemas.get(colName);
+ if (!oldCol.getType().equals(newCol.getType())
+ || !oldCol.getComment().equals(newCol.getComment())) {
+ tableChanges.add(
+ new TableChange.UpdateColumn(
+ new String[]{newCol.getName()},
+ newCol.getType(),
+ newCol.isNullable(),
+ newCol.getComment()));
+ }
+ }
+
+ // step2: judge whether delete column
+ for (String colName : oldFields) {
+ if (colsToDelete.contains(colName)) {
+ tableChanges.add(
+ new TableChange.DeleteColumn(
+ new String[]{colName}));
+ }
+ }
+
+ // step3: judge whether add column
+ if (!colsToAdd.isEmpty()) {
+ for (int i = 0; i < newFields.size(); i++) {
+ String colName = newFields.get(i);
+ if (colsToAdd.contains(colName)) {
+ ColumnSchema addCol = newColumnSchemas.get(colName);
+ tableChanges.add(
+ new TableChange.AddColumn(
+ new String[]{addCol.getName()},
+ addCol.getType(),
+ addCol.isNullable(),
+ addCol.getComment(),
+ addCol.getPosition()));
+ }
+ }
+ }
+ return tableChanges;
+ }
}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHandle.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHandle.java
new file mode 100644
index 0000000000..cb65f3af93
--- /dev/null
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHandle.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.schema;
+
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Schema change helper interface
+ * */
+public interface SchemaChangeHandle {
+
+ /**
+ * execute ddl statement
+ * */
+ void process(byte[] originData, JsonNode data);
+ /**
+ * Handle modify column operations
+ * */
+ void handleAlterOperation(String database, String table, byte[]
originData, String originSchema,
+ JsonNode data, AlterOperation operation);
+ /**
+ * apply the modify column operation
+ * */
+ void doAlterOperation(String database, String table, byte[] originData,
String originSchema, JsonNode data,
+ Map<SchemaChangeType, List<AlterColumn>> typeMap);
+ /**
+ * apply the add column operation
+ * */
+ void doAddColumn(SchemaChangeType type, String originSchema);
+ /**
+ * apply the operation of changing the column type
+ * */
+ void doChangeColumnType(SchemaChangeType type, String originSchema);
+ /**
+ * apply the change column name operation
+ * */
+ void doRenameColumn(SchemaChangeType type, String originSchema);
+ /**
+ * apply the delete column operation
+ * */
+ void doDropColumn(SchemaChangeType type, String originSchema);
+ /**
+ * handle the create table operation
+ * */
+ void doCreateTable(byte[] originData, String database, String table,
SchemaChangeType type,
+ String originSchema, JsonNode data, CreateTableOperation
operation);
+ /**
+ * handle drop table operation
+ * */
+ void doDropTable(SchemaChangeType type, String originSchema);
+ /**
+ * handle rename table operation
+ * */
+ void doRenameTable(SchemaChangeType type, String originSchema);
+ /**
+ * handle truncate table operation
+ * */
+ void doTruncateTable(SchemaChangeType type, String originSchema);
+}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
new file mode 100644
index 0000000000..07ef0f1456
--- /dev/null
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.schema;
+
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Schema change helper
+ * */
+public abstract class SchemaChangeHelper implements SchemaChangeHandle {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SchemaChangeHelper.class);
+ private final boolean schemaChange;
+ protected final Map<SchemaChangeType, SchemaChangePolicy> policyMap;
+ protected final JsonDynamicSchemaFormat dynamicSchemaFormat;
+ private final String databasePattern;
+ private final String tablePattern;
+ protected final SchemaUpdateExceptionPolicy exceptionPolicy;
+ private final SinkTableMetricData metricData;
+ private final DirtySinkHelper<Object> dirtySinkHelper;
+
+ public SchemaChangeHelper(JsonDynamicSchemaFormat dynamicSchemaFormat,
boolean schemaChange,
+ Map<SchemaChangeType, SchemaChangePolicy> policyMap, String
databasePattern, String tablePattern,
+ SchemaUpdateExceptionPolicy exceptionPolicy,
+ SinkTableMetricData metricData, DirtySinkHelper<Object>
dirtySinkHelper) {
+ this.dynamicSchemaFormat =
Preconditions.checkNotNull(dynamicSchemaFormat, "dynamicSchemaFormat is null");
+ this.schemaChange = schemaChange;
+ this.policyMap = policyMap;
+ this.databasePattern = databasePattern;
+ this.tablePattern = tablePattern;
+ this.exceptionPolicy = exceptionPolicy;
+ this.metricData = metricData;
+ this.dirtySinkHelper = dirtySinkHelper;
+ }
+
+ public void process(byte[] originData, JsonNode data) {
+ // 1. Extract the schema change type from the data;
+ if (!schemaChange) {
+ return;
+ }
+ String database;
+ String table;
+ try {
+ database = dynamicSchemaFormat.parse(data, databasePattern);
+ table = dynamicSchemaFormat.parse(data, tablePattern);
+ } catch (Exception e) {
+ if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+ throw new SchemaChangeHandleException(
+ String.format("Parse database, table from origin data
failed, origin data: %s",
+ new String(originData)),
+ e);
+ }
+ LOGGER.warn("Parse database, table from origin data failed, origin
data: {}", new String(originData), e);
+ if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
+ dirtySinkHelper.invoke(new String(originData),
DirtyType.JSON_PROCESS_ERROR, e);
+ }
+ if (metricData != null) {
+ metricData.invokeDirty(1, originData.length);
+ }
+ return;
+ }
+ Operation operation;
+ try {
+ JsonNode operationNode =
Preconditions.checkNotNull(data.get("operation"),
+ "Operation node is null");
+ operation = Preconditions.checkNotNull(
+
dynamicSchemaFormat.objectMapper.convertValue(operationNode, new
TypeReference<Operation>() {
+ }), "Operation is null");
+ } catch (Exception e) {
+ if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+ throw new SchemaChangeHandleException(
+ String.format("Extract Operation from origin data
failed,origin data: %s", data), e);
+ }
+ LOGGER.warn("Extract Operation from origin data failed,origin
data: {}", data, e);
+ handleDirtyData(data, originData, database, table,
DirtyType.JSON_PROCESS_ERROR, e);
+ return;
+ }
+ String originSchema = dynamicSchemaFormat.extractDDL(data);
+ SchemaChangeType type =
SchemaChangeUtils.extractSchemaChangeType(operation);
+ if (type == null) {
+ LOGGER.warn("Unsupported for schema-change: {}", originSchema);
+ return;
+ }
+
+ // 2. Apply schema change;
+ SchemaChangePolicy policy = policyMap.get(type);
+ if (policy != SchemaChangePolicy.ENABLE) {
+ doSchemaChangeBase(type, policy, originSchema);
+ } else {
+ switch (type) {
+ case ALTER:
+ handleAlterOperation(database, table, originData,
originSchema, data, (AlterOperation) operation);
+ break;
+ case CREATE_TABLE:
+ doCreateTable(originData, database, table, type,
originSchema, data,
+ (CreateTableOperation) operation);
+ break;
+ case DROP_TABLE:
+ doDropTable(type, originSchema);
+ break;
+ case RENAME_TABLE:
+ doRenameTable(type, originSchema);
+ break;
+ case TRUNCATE_TABLE:
+ doTruncateTable(type, originSchema);
+ break;
+ default:
+ LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+ }
+ }
+ }
+
+ @Override
+ public void handleAlterOperation(String database, String table, byte[]
originData,
+ String originSchema, JsonNode data, AlterOperation operation) {
+ if (operation.getAlterColumns() == null ||
operation.getAlterColumns().isEmpty()) {
+ if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+ throw new SchemaChangeHandleException(
+ String.format("Alter columns is empty, origin schema:
%s", originSchema));
+ }
+ LOGGER.warn("Alter columns is empty, origin schema: {}",
originSchema);
+ return;
+ }
+
+ Map<SchemaChangeType, List<AlterColumn>> typeMap = new
LinkedHashMap<>();
+ for (AlterColumn alterColumn : operation.getAlterColumns()) {
+ Set<SchemaChangeType> types = null;
+ try {
+ types = SchemaChangeUtils.extractSchemaChangeType(alterColumn);
+ Preconditions.checkState(!types.isEmpty(), "Schema change
types is empty");
+ } catch (Exception e) {
+ if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+ throw new SchemaChangeHandleException(
+ String.format("Extract schema change type failed,
origin schema: %s", originSchema), e);
+ }
+ LOGGER.warn("Extract schema change type failed, origin schema:
{}", originSchema, e);
+ }
+ if (types == null) {
+ continue;
+ }
+ if (types.size() == 1) {
+ SchemaChangeType type = types.stream().findFirst().get();
+ typeMap.computeIfAbsent(type, k -> new
ArrayList<>()).add(alterColumn);
+ } else {
+                // Handle change column; only column type change and
column rename exist in this scenario for now.
+ for (SchemaChangeType type : types) {
+ LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+ }
+ }
+ }
+
+ if (!typeMap.isEmpty()) {
+ doAlterOperation(database, table, originData, originSchema, data,
typeMap);
+ }
+ }
+
+ @Override
+ public void doAddColumn(SchemaChangeType type, String originSchema) {
+ throw new SchemaChangeHandleException(String.format("Unsupported for
%s: %s", type, originSchema));
+ }
+
+ @Override
+ public void doChangeColumnType(SchemaChangeType type, String originSchema)
{
+ throw new SchemaChangeHandleException(String.format("Unsupported for
%s: %s", type, originSchema));
+ }
+
+ @Override
+ public void doRenameColumn(SchemaChangeType type, String originSchema) {
+ throw new SchemaChangeHandleException(String.format("Unsupported for
%s: %s", type, originSchema));
+ }
+
+ @Override
+ public void doDropColumn(SchemaChangeType type, String originSchema) {
+ throw new SchemaChangeHandleException(String.format("Unsupported for
%s: %s", type, originSchema));
+ }
+
+ @Override
+ public void doCreateTable(byte[] originData, String database, String
table, SchemaChangeType type,
+ String originSchema, JsonNode data, CreateTableOperation
operation) {
+ throw new SchemaChangeHandleException(String.format("Unsupported for
%s: %s", type, originSchema));
+ }
+
+ @Override
+ public void doDropTable(SchemaChangeType type, String originSchema) {
+ throw new SchemaChangeHandleException(String.format("Unsupported for
%s: %s", type, originSchema));
+ }
+
+ @Override
+ public void doRenameTable(SchemaChangeType type, String originSchema) {
+ throw new SchemaChangeHandleException(String.format("Unsupported for
%s: %s", type, originSchema));
+ }
+
+ @Override
+ public void doTruncateTable(SchemaChangeType type, String originSchema) {
+ throw new SchemaChangeHandleException(String.format("Unsupported for
%s: %s", type, originSchema));
+ }
+
+ protected void handleDirtyData(JsonNode data, byte[] originData, String
database,
+ String table, DirtyType dirtyType, Throwable e) {
+ if (exceptionPolicy == SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
+ String label = parseValue(data,
dirtySinkHelper.getDirtyOptions().getLabels());
+ String logTag = parseValue(data,
dirtySinkHelper.getDirtyOptions().getLogTag());
+ String identifier = parseValue(data,
dirtySinkHelper.getDirtyOptions().getIdentifier());
+ dirtySinkHelper.invoke(new String(originData), dirtyType, label,
logTag, identifier, e);
+ }
+ if (metricData != null) {
+ metricData.outputDirtyMetricsWithEstimate(database, table, 1,
originData.length);
+ }
+ }
+ private String parseValue(JsonNode data, String pattern) {
+ try {
+ return dynamicSchemaFormat.parse(data, pattern);
+ } catch (Exception e) {
+ LOGGER.warn("Parse value from pattern failed,the pattern: {},
data: {}", pattern, data);
+ }
+ return pattern;
+ }
+
+ protected void doSchemaChangeBase(SchemaChangeType type,
SchemaChangePolicy policy, String schema) {
+ if (policy == null) {
+ LOGGER.warn("Unsupported for {}: {}", type, schema);
+ return;
+ }
+ switch (policy) {
+ case LOG:
+ LOGGER.warn("Unsupported for {}: {}", type, schema);
+ break;
+ case ERROR:
+ throw new
SchemaChangeHandleException(String.format("Unsupported for %s: %s", type,
schema));
+ default:
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 211acc0cee..d9086c3cab 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.base.sink;
+import org.apache.inlong.sort.schema.TableChange;
+
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/pom.xml
index b725b4f020..ef60618492 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/pom.xml
@@ -91,6 +91,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-common</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
@@ -139,6 +145,22 @@
<pattern>com.amazonaws</pattern>
<shadedPattern>org.apache.inlong.sort.iceberg.shaded.com.amazonaws</shadedPattern>
</relocation>
+ <relocation>
+
<pattern>org.apache.inlong.sort.configuration</pattern>
+
<shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.configuration</shadedPattern>
+ </relocation>
+ <relocation>
+
<pattern>org.apache.inlong.sort.protocol</pattern>
+
<shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.protocol</shadedPattern>
+ </relocation>
+ <relocation>
+
<pattern>org.apache.inlong.sort.schema</pattern>
+
<shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.schema</shadedPattern>
+ </relocation>
+ <relocation>
+
<pattern>org.apache.inlong.sort.util</pattern>
+
<shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.util</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 1e2e28e40b..22e8d39ba4 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -57,6 +57,8 @@ import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERA
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
+import static org.apache.inlong.sort.base.Constants.SINK_SCHEMA_CHANGE_ENABLE;
+import static
org.apache.inlong.sort.base.Constants.SINK_SCHEMA_CHANGE_POLICIES;
import static
org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_DISTRIBUTION_MODE;
import static
org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_PARALLELISM;
@@ -121,6 +123,8 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
final ReadableConfig tableOptions =
Configuration.fromMap(catalogTable.getOptions());
boolean multipleSink = tableOptions.get(SINK_MULTIPLE_ENABLE);
+ boolean schemaChange = tableOptions.get(SINK_SCHEMA_CHANGE_ENABLE);
+ String schemaChangePolicies =
tableOptions.getOptional(SINK_SCHEMA_CHANGE_POLICIES).orElse(null);
if (multipleSink) {
return (DataStreamSinkProvider) dataStream ->
FlinkSink.forRowData(dataStream)
.overwrite(overwrite)
@@ -142,6 +146,8 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
.action(actionsProvider)
.tableOptions(tableOptions)
.distributionMode(DistributionMode.fromName(tableOptions.get(WRITE_DISTRIBUTION_MODE)))
+ .enableSchemaChange(schemaChange)
+ .schemaChangePolicies(schemaChangePolicies)
.append();
} else {
return (DataStreamSinkProvider) dataStream ->
FlinkSink.forRowData(dataStream)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
new file mode 100644
index 0000000000..b3abc456c3
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/schema/IcebergSchemaChangeHelper.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.schema;
+
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.schema.SchemaChangeHandleException;
+import org.apache.inlong.sort.base.schema.SchemaChangeHelper;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergSchemaChangeUtils;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+import org.apache.inlong.sort.schema.TableChange;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Iceberg schema change helper
+ * */
+public class IcebergSchemaChangeHelper extends SchemaChangeHelper {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IcebergSchemaChangeHelper.class);
+ private transient Catalog catalog;
+
+ private transient SupportsNamespaces asNamespaceCatalog;
+
+ public IcebergSchemaChangeHelper(JsonDynamicSchemaFormat
dynamicSchemaFormat, boolean schemaChange,
+ Map<SchemaChangeType, SchemaChangePolicy> policyMap, String
databasePattern, String tablePattern,
+ SchemaUpdateExceptionPolicy exceptionPolicy,
+ SinkTableMetricData metricData, DirtySinkHelper<Object>
dirtySinkHelper,
+ Catalog catalog,
+ SupportsNamespaces asNamespaceCatalog) {
+ super(dynamicSchemaFormat, schemaChange, policyMap, databasePattern,
+ tablePattern, exceptionPolicy, metricData, dirtySinkHelper);
+ this.catalog = catalog;
+ this.asNamespaceCatalog = asNamespaceCatalog;
+ }
+ @Override
+ public void doAlterOperation(String database, String table, byte[]
originData, String originSchema, JsonNode data,
+ Map<SchemaChangeType, List<AlterColumn>> typeMap) {
+ for (Map.Entry<SchemaChangeType, List<AlterColumn>> kv :
typeMap.entrySet()) {
+ try {
+ switch (kv.getKey()) {
+ case ADD_COLUMN:
+ doAddColumn(kv.getValue(),
TableIdentifier.of(database, table));
+ break;
+ case DROP_COLUMN:
+ doDropColumn(kv.getKey(), originSchema);
+ break;
+ case RENAME_COLUMN:
+ doRenameColumn(kv.getKey(), originSchema);
+ break;
+ case CHANGE_COLUMN_TYPE:
+ doChangeColumnType(kv.getKey(), originSchema);
+ break;
+ default:
+ }
+ } catch (Exception e) {
+ if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+ throw new SchemaChangeHandleException(
+ String.format("Apply alter column failed, origin
schema: %s", originSchema), e);
+ }
+ LOGGER.warn("Apply alter column failed, origin schema: {}",
originSchema, e);
+ handleDirtyData(data, originData, database, table,
DirtyType.HANDLE_ALTER_TABLE_ERROR, e);
+ }
+ }
+ }
+
+ @Override
+ public void doCreateTable(byte[] originData, String database, String
table, SchemaChangeType type,
+ String originSchema, JsonNode data, CreateTableOperation
operation) {
+ try {
+ TableIdentifier tableId = TableIdentifier.of(database, table);
+ List<String> pkListStr =
dynamicSchemaFormat.extractPrimaryKeyNames(data);
+ RowType rowType = dynamicSchemaFormat.extractSchema(data,
pkListStr);
+ Schema schema =
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(rowType));
+ IcebergSchemaChangeUtils.createTable(catalog, tableId,
asNamespaceCatalog, schema);
+ return;
+ } catch (Exception e) {
+ if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+ throw new SchemaChangeHandleException(
+ String.format("Drop column failed, origin schema: %s",
originSchema), e);
+ }
+ handleDirtyData(data, originData, database, table,
DirtyType.CREATE_TABLE_ERROR, e);
+ return;
+ }
+ }
+
+ public void doAddColumn(List<AlterColumn> alterColumns, TableIdentifier
tableId) {
+ List<TableChange> tableChanges = new ArrayList<>();
+ Table table = catalog.loadTable(tableId);
+ Transaction transaction = table.newTransaction();
+
+ alterColumns.forEach(alterColumn -> {
+ Column column = alterColumn.getNewColumn();
+ LogicalType dataType =
dynamicSchemaFormat.sqlType2FlinkType(column.getJdbcType());
+ TableChange.ColumnPosition position =
+ column.getPosition().getPositionType() ==
PositionType.FIRST ? TableChange.ColumnPosition.first()
+ :
TableChange.ColumnPosition.after(column.getName());
+ TableChange.AddColumn addColumn = new TableChange.AddColumn(new
String[]{column.getName()},
+ dataType, column.isNullable(), column.getComment(),
position);
+ tableChanges.add(addColumn);
+ });
+
IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(),
tableChanges);
+ transaction.commitTransaction();
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 5cb0993b04..f36de37786 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -182,6 +182,8 @@ public class FlinkSink {
private DirtyOptions dirtyOptions;
private @Nullable DirtySink<Object> dirtySink;
private ReadableConfig tableOptions = new Configuration();
+ private boolean enableSchemaChange;
+ private String schemaChangePolicies;
private Builder() {
}
@@ -266,6 +268,16 @@ public class FlinkSink {
return this;
}
+ public Builder enableSchemaChange(boolean schemaChange) {
+ this.enableSchemaChange = schemaChange;
+ return this;
+ }
+
+ public Builder schemaChangePolicies(String schemaChangePolicies) {
+ this.schemaChangePolicies = schemaChangePolicies;
+ return this;
+ }
+
/**
* The appendMode properties is used to insert data without equality
field columns.
*
@@ -672,7 +684,8 @@ public class FlinkSink {
int parallelism = writeParallelism == null ?
input.getParallelism() : writeParallelism;
DynamicSchemaHandleOperator routeOperator = new
DynamicSchemaHandleOperator(
- catalogLoader, multipleSinkOption, dirtyOptions,
dirtySink, inlongMetric, auditHostAndPorts);
+ catalogLoader, multipleSinkOption, dirtyOptions,
dirtySink, inlongMetric, auditHostAndPorts,
+ enableSchemaChange, schemaChangePolicies);
SingleOutputStreamOperator<RecordWithSchema> routeStream = input
.transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
TypeInformation.of(RecordWithSchema.class),
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index a21e4307d6..44541d132f 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -21,17 +21,22 @@ import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
-import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.schema.SchemaChangeHelper;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
-import org.apache.inlong.sort.base.sink.TableChange;
import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.iceberg.schema.IcebergSchemaChangeHelper;
+import org.apache.inlong.sort.schema.ColumnSchema;
+import org.apache.inlong.sort.schema.TableChange;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -47,17 +52,15 @@ import
org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +73,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -98,7 +102,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
private transient Catalog catalog;
private transient SupportsNamespaces asNamespaceCatalog;
- private transient AbstractDynamicSchemaFormat<JsonNode>
dynamicSchemaFormat;
+ private transient JsonDynamicSchemaFormat dynamicSchemaFormat;
private transient ProcessingTimeService processingTimeService;
// record cache, wait schema to consume record
@@ -118,18 +122,25 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
private @Nullable transient SinkTableMetricData metricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
+ private SchemaChangeHelper schemaChangeHelper;
+ private String schemaChangePolicies;
+ private boolean enableSchemaChange;
public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
MultipleSinkOption multipleSinkOption,
DirtyOptions dirtyOptions,
@Nullable DirtySink<Object> dirtySink,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ boolean enableSchemaChange,
+ String schemaChangePolicies) {
this.catalogLoader = catalogLoader;
this.multipleSinkOption = multipleSinkOption;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+ this.schemaChangePolicies = schemaChangePolicies;
+ this.enableSchemaChange = enableSchemaChange;
}
@SuppressWarnings("unchecked")
@@ -140,7 +151,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
this.asNamespaceCatalog =
catalog instanceof SupportsNamespaces ? (SupportsNamespaces)
catalog : null;
- this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(
+ this.dynamicSchemaFormat = (JsonDynamicSchemaFormat)
DynamicSchemaFormatFactory.getFormat(
multipleSinkOption.getFormat(),
multipleSinkOption.getFormatOption());
this.processingTimeService =
getRuntimeContext().getProcessingTimeService();
@@ -165,6 +176,11 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
metricData = new SinkTableMetricData(metricOption,
getRuntimeContext().getMetricGroup());
}
this.dirtySinkHelper.open(new Configuration());
+ this.schemaChangeHelper = new
IcebergSchemaChangeHelper(dynamicSchemaFormat,
+ enableSchemaChange, enableSchemaChange ?
SchemaChangeUtils.deserialize(schemaChangePolicies) : null,
+ multipleSinkOption.getDatabasePattern(),
multipleSinkOption.getTablePattern(),
+ multipleSinkOption.getSchemaUpdatePolicy(), metricData,
dirtySinkHelper,
+ catalog, asNamespaceCatalog);
}
@Override
@@ -208,7 +224,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
}
boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
if (isDDL) {
- execDDL(jsonNode, tableId);
+ execDDL(element.getValue().getBinary(0), jsonNode);
} else {
execDML(jsonNode, tableId);
}
@@ -304,8 +320,8 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
}
}
- private void execDDL(JsonNode jsonNode, TableIdentifier tableId) {
- // todo:parse ddl sql
+ private void execDDL(byte[] originData, JsonNode jsonNode) {
+ schemaChangeHelper.process(originData, jsonNode);
}
private void execDML(JsonNode jsonNode, TableIdentifier tableId) {
@@ -434,33 +450,29 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
// ================================ All coordinator handle method
==============================================
private void handleTableCreateEventFromOperator(TableIdentifier tableId,
Schema schema) {
- if (!catalog.tableExists(tableId)) {
- if (asNamespaceCatalog != null &&
!asNamespaceCatalog.namespaceExists(tableId.namespace())) {
- try {
- asNamespaceCatalog.createNamespace(tableId.namespace());
- LOG.info("Auto create Database({}) in Catalog({}).",
tableId.namespace(), catalog.name());
- } catch (AlreadyExistsException e) {
- LOG.warn("Database({}) already exist in Catalog({})!",
tableId.namespace(), catalog.name());
- }
- }
- ImmutableMap.Builder<String, String> properties =
ImmutableMap.builder();
- properties.put("format-version", "2");
- properties.put("write.upsert.enabled", "true");
- properties.put("write.metadata.metrics.default", "full");
- // for hive visible
- properties.put("engine.hive.enabled", "true");
- try {
- catalog.createTable(tableId, schema,
PartitionSpec.unpartitioned(), properties.build());
- LOG.info("Auto create Table({}) in Database({}) in
Catalog({})!",
- tableId.name(), tableId.namespace(), catalog.name());
- } catch (AlreadyExistsException e) {
- LOG.warn("Table({}) already exist in Database({}) in
Catalog({})!",
- tableId.name(), tableId.namespace(), catalog.name());
- }
- }
+ IcebergSchemaChangeUtils.createTable(catalog, tableId,
asNamespaceCatalog, schema);
handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema());
}
+ @VisibleForTesting
+ public static Map<String, ColumnSchema> extractColumnSchema(Schema schema)
{
+ Map<String, ColumnSchema> columnSchemaMap = new LinkedHashMap<>();
+ List<Types.NestedField> nestedFieldList = schema.columns();
+ int n = nestedFieldList.size();
+ for (int i = 0; i < n; i++) {
+ Types.NestedField nestedField = nestedFieldList.get(i);
+ ColumnSchema columnSchema = new ColumnSchema();
+ columnSchema.setName(nestedField.name());
+ columnSchema.setType(FlinkSchemaUtil.convert(nestedField.type()));
+ columnSchema.setNullable(nestedField.isOptional());
+ columnSchema.setComment(nestedField.doc());
+ columnSchema.setPosition(i == 0 ?
TableChange.ColumnPosition.first()
+ : TableChange.ColumnPosition.after(nestedFieldList.get(i -
1).name()));
+ columnSchemaMap.put(nestedField.name(), columnSchema);
+ }
+ return columnSchemaMap;
+ }
+
private void handldAlterSchemaEventFromOperator(TableIdentifier tableId,
Schema oldSchema, Schema newSchema) {
Table table = catalog.loadTable(tableId);
// The transactionality of changes is guaranteed by comparing the old
schema with the current schema of the
@@ -469,14 +481,16 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
// for scenarios that cannot be changed, it is always considered that
there is a problem with the data.
Transaction transaction = table.newTransaction();
if (table.schema().sameSchema(oldSchema)) {
- List<TableChange> tableChanges =
SchemaChangeUtils.diffSchema(oldSchema, newSchema);
+ Map<String, ColumnSchema> oldColumnSchemas =
extractColumnSchema(oldSchema);
+ Map<String, ColumnSchema> newColumnSchemas =
extractColumnSchema(newSchema);
+ List<TableChange> tableChanges =
IcebergSchemaChangeUtils.diffSchema(oldColumnSchemas, newColumnSchemas);
for (TableChange tableChange : tableChanges) {
if (tableChange instanceof TableChange.UnknownColumnChange) {
throw new UnsupportedOperationException(
String.format("Unsupported table %s schema change:
%s.", tableId.toString(), tableChange));
}
}
- SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(),
tableChanges);
+
IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(),
tableChanges);
LOG.info("Schema evolution in table({}) for table change: {}",
tableId, tableChanges);
}
transaction.commitTransaction();
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
new file mode 100644
index 0000000000..31bfd732eb
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSchemaChangeUtils.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.inlong.sort.iceberg.FlinkTypeToType;
+import org.apache.inlong.sort.schema.TableChange;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
+
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class IcebergSchemaChangeUtils extends SchemaChangeUtils {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IcebergSchemaChangeUtils.class);
+
+ private static final Joiner DOT = Joiner.on(".");
+
+ public static void createTable(Catalog catalog, TableIdentifier tableId,
SupportsNamespaces asNamespaceCatalog,
+ Schema schema) {
+ if (!catalog.tableExists(tableId)) {
+ if (asNamespaceCatalog != null &&
!asNamespaceCatalog.namespaceExists(tableId.namespace())) {
+ try {
+ asNamespaceCatalog.createNamespace(tableId.namespace());
+ LOGGER.info("Auto create Database({}) in Catalog({}).",
tableId.namespace(), catalog.name());
+ } catch (AlreadyExistsException e) {
+ LOGGER.warn("Database({}) already exist in Catalog({})!",
tableId.namespace(), catalog.name());
+ }
+ }
+ ImmutableMap.Builder<String, String> properties =
ImmutableMap.builder();
+ properties.put("format-version", "2");
+ properties.put("write.upsert.enabled", "true");
+ properties.put("write.metadata.metrics.default", "full");
+ // for hive visible
+ properties.put("engine.hive.enabled", "true");
+ try {
+ catalog.createTable(tableId, schema,
PartitionSpec.unpartitioned(), properties.build());
+ LOGGER.info("Auto create Table({}) in Database({}) in
Catalog({})!",
+ tableId.name(), tableId.namespace(), catalog.name());
+ } catch (AlreadyExistsException e) {
+ LOGGER.warn("Table({}) already exist in Database({}) in
Catalog({})!",
+ tableId.name(), tableId.namespace(), catalog.name());
+ }
+ }
+ }
+
+ public static void applySchemaChanges(UpdateSchema pendingUpdate,
List<TableChange> tableChanges) {
+ for (TableChange change : tableChanges) {
+ if (change instanceof TableChange.AddColumn) {
+ applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
+ } else if (change instanceof TableChange.DeleteColumn) {
+ applyDeleteColumn(pendingUpdate, (TableChange.DeleteColumn)
change);
+ } else if (change instanceof TableChange.UpdateColumn) {
+ applyUpdateColumn(pendingUpdate, (TableChange.UpdateColumn)
change);
+ } else {
+ throw new UnsupportedOperationException("Cannot apply unknown
table change: " + change);
+ }
+ }
+ pendingUpdate.commit();
+ }
+
+ public static void applyAddColumn(UpdateSchema pendingUpdate,
TableChange.AddColumn add) {
+ Preconditions.checkArgument(add.isNullable(),
+ "Incompatible change: cannot add required column: %s",
leafName(add.fieldNames()));
+ Type type = add.dataType().accept(new
FlinkTypeToType(RowType.of(add.dataType())));
+ pendingUpdate.addColumn(parentName(add.fieldNames()),
leafName(add.fieldNames()), type, add.comment());
+
+ if (add.position() instanceof TableChange.After) {
+ TableChange.After after = (TableChange.After) add.position();
+ String referenceField = peerName(add.fieldNames(), after.column());
+ pendingUpdate.moveAfter(DOT.join(add.fieldNames()),
referenceField);
+
+ } else if (add.position() instanceof TableChange.First) {
+ pendingUpdate.moveFirst(DOT.join(add.fieldNames()));
+
+ } else {
+ Preconditions.checkArgument(add.position() == null,
+ "Cannot add '%s' at unknown position: %s",
DOT.join(add.fieldNames()), add.position());
+ }
+ }
+
+ public static void applyDeleteColumn(UpdateSchema pendingUpdate,
TableChange.DeleteColumn delete) {
+ pendingUpdate.deleteColumn(DOT.join(delete.fieldNames()));
+ }
+
+ public static void applyUpdateColumn(UpdateSchema pendingUpdate,
TableChange.UpdateColumn update) {
+ Type type = update.dataType().accept(new
FlinkTypeToType(RowType.of(update.dataType())));
+ pendingUpdate.updateColumn(DOT.join(update.fieldNames()),
type.asPrimitiveType(), update.comment());
+ }
+
+ public static String leafName(String[] fieldNames) {
+ Preconditions.checkArgument(fieldNames.length > 0, "Invalid field
name: at least one name is required");
+ return fieldNames[fieldNames.length - 1];
+ }
+
+ public static String peerName(String[] fieldNames, String fieldName) {
+ if (fieldNames.length > 1) {
+ String[] peerNames = Arrays.copyOf(fieldNames, fieldNames.length);
+ peerNames[fieldNames.length - 1] = fieldName;
+ return DOT.join(peerNames);
+ }
+ return fieldName;
+ }
+
+ public static String parentName(String[] fieldNames) {
+ if (fieldNames.length > 1) {
+ return DOT.join(Arrays.copyOfRange(fieldNames, 0,
fieldNames.length - 1));
+ }
+ return null;
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
deleted file mode 100644
index f5f1001a70..0000000000
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.iceberg.sink.multiple;
-
-import org.apache.inlong.sort.base.sink.TableChange;
-import org.apache.inlong.sort.base.sink.TableChange.ColumnPosition;
-import org.apache.inlong.sort.base.sink.TableChange.UnknownColumnChange;
-import org.apache.inlong.sort.iceberg.FlinkTypeToType;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.UpdateSchema;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.Types.NestedField;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class SchemaChangeUtils {
-
- private static final Joiner DOT = Joiner.on(".");
-
- /**
- * Compare two schemas and get the schema changes that happened in them.
- * TODO: currently only support add column,delete column and column type
change, rename column and column position change are not supported
- *
- * @param oldSchema
- * @param newSchema
- * @return
- */
- static List<TableChange> diffSchema(Schema oldSchema, Schema newSchema) {
- List<String> oldFields =
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
- List<String> newFields =
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
- Set<String> oldFieldSet = new HashSet<>(oldFields);
- Set<String> newFieldSet = new HashSet<>(newFields);
-
- Set<String> intersectColSet = Sets.intersection(oldFieldSet,
newFieldSet);
- Set<String> colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
- Set<String> colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
-
- List<TableChange> tableChanges = new ArrayList<>();
-
- // step0: judge whether unknown change
- // 1.just diff two different schema can not distinguish(add + delete)
vs modify
- // Example first [a, b, c] -> then delete c [a, b] -> add d [a, b, d],
currently it is only judged as unknown
- // change.
- // In next version,we will judge it is [delete and add] or rename by
using information extracted from DDL
- if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {
- tableChanges.add(new UnknownColumnChange(
- String.format(" Old schema: [%s] and new schema: [%s], it
is unknown column change",
- oldSchema.toString(), newSchema.toString())));
- return tableChanges;
- }
-
- // 2.if some filed positions in new schema are not same with old
schema, there is no way to deal with it.
- // This situation only is regarded as unknown column change
- if (colsToDelete.isEmpty() && colsToAdd.isEmpty() &&
oldFieldSet.equals(newFieldSet)
- && !oldFields.equals(newFields)) {
- tableChanges.add(
- new UnknownColumnChange(
- String.format(
- " Old schema: [%s] and new schema: [%s],
they are same but some filed positions are not same."
- +
- " This situation only is regarded
as unknown column change at present",
- oldSchema.toString(),
newSchema.toString())));
- return tableChanges;
- }
-
- // step1: judge whether column type change
- for (String colName : intersectColSet) {
- NestedField oldField = oldSchema.findField(colName);
- NestedField newField = newSchema.findField(colName);
- if (!oldField.type().equals(newField.type()) ||
!oldField.doc().equals(newField.doc())) {
- tableChanges.add(
- new TableChange.UpdateColumn(
- new String[]{newField.name()},
- FlinkSchemaUtil.convert(newField.type()),
- !newField.isRequired(),
- newField.doc()));
- }
- }
-
- // step2: judge whether delete column
- for (String colName : oldFields) {
- if (colsToDelete.contains(colName)) {
- tableChanges.add(
- new TableChange.DeleteColumn(
- new String[]{colName}));
- }
- }
-
- // step3: judge whether add column
- if (!colsToAdd.isEmpty()) {
- for (int i = 0; i < newFields.size(); i++) {
- String colName = newFields.get(i);
- if (colsToAdd.contains(colName)) {
- NestedField addField = newSchema.findField(colName);
- tableChanges.add(
- new TableChange.AddColumn(
- new String[]{addField.name()},
- FlinkSchemaUtil.convert(addField.type()),
- !addField.isRequired(),
- addField.doc(),
- i == 0 ? ColumnPosition.first() :
ColumnPosition.after(newFields.get(i - 1))));
- }
- }
- }
- return tableChanges;
- }
-
- public static void applySchemaChanges(UpdateSchema pendingUpdate,
List<TableChange> tableChanges) {
- for (TableChange change : tableChanges) {
- if (change instanceof TableChange.AddColumn) {
- applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
- } else if (change instanceof TableChange.DeleteColumn) {
- applyDeleteColumn(pendingUpdate, (TableChange.DeleteColumn)
change);
- } else if (change instanceof TableChange.UpdateColumn) {
- applyUpdateColumn(pendingUpdate, (TableChange.UpdateColumn)
change);
- } else {
- throw new UnsupportedOperationException("Cannot apply unknown
table change: " + change);
- }
- }
- pendingUpdate.commit();
- }
-
- public static void applyAddColumn(UpdateSchema pendingUpdate,
TableChange.AddColumn add) {
- Preconditions.checkArgument(add.isNullable(),
- "Incompatible change: cannot add required column: %s",
leafName(add.fieldNames()));
- Type type = add.dataType().accept(new
FlinkTypeToType(RowType.of(add.dataType())));
- pendingUpdate.addColumn(parentName(add.fieldNames()),
leafName(add.fieldNames()), type, add.comment());
-
- if (add.position() instanceof TableChange.After) {
- TableChange.After after = (TableChange.After) add.position();
- String referenceField = peerName(add.fieldNames(), after.column());
- pendingUpdate.moveAfter(DOT.join(add.fieldNames()),
referenceField);
-
- } else if (add.position() instanceof TableChange.First) {
- pendingUpdate.moveFirst(DOT.join(add.fieldNames()));
-
- } else {
- Preconditions.checkArgument(add.position() == null,
- "Cannot add '%s' at unknown position: %s",
DOT.join(add.fieldNames()), add.position());
- }
- }
-
- public static void applyDeleteColumn(UpdateSchema pendingUpdate,
TableChange.DeleteColumn delete) {
- pendingUpdate.deleteColumn(DOT.join(delete.fieldNames()));
- }
-
- public static void applyUpdateColumn(UpdateSchema pendingUpdate,
TableChange.UpdateColumn update) {
- Type type = update.dataType().accept(new
FlinkTypeToType(RowType.of(update.dataType())));
- pendingUpdate.updateColumn(DOT.join(update.fieldNames()),
type.asPrimitiveType(), update.comment());
- }
-
- public static String leafName(String[] fieldNames) {
- Preconditions.checkArgument(fieldNames.length > 0, "Invalid field
name: at least one name is required");
- return fieldNames[fieldNames.length - 1];
- }
-
- public static String peerName(String[] fieldNames, String fieldName) {
- if (fieldNames.length > 1) {
- String[] peerNames = Arrays.copyOf(fieldNames, fieldNames.length);
- peerNames[fieldNames.length - 1] = fieldName;
- return DOT.join(peerNames);
- }
- return fieldName;
- }
-
- public static String parentName(String[] fieldNames) {
- if (fieldNames.length > 1) {
- return DOT.join(Arrays.copyOfRange(fieldNames, 0,
fieldNames.length - 1));
- }
- return null;
- }
-}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestIcebergSchemaChangeUtils.java
similarity index 85%
rename from
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
rename to
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestIcebergSchemaChangeUtils.java
index 466214434c..9552ebeebd 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestIcebergSchemaChangeUtils.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sort.iceberg.sink.multiple;
-import org.apache.inlong.sort.base.sink.TableChange;
+import org.apache.inlong.sort.schema.TableChange;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
@@ -27,7 +27,9 @@ import org.junit.Test;
import java.util.List;
-public class TestSchemaChangeUtils {
+import static
org.apache.inlong.sort.iceberg.sink.multiple.DynamicSchemaHandleOperator.extractColumnSchema;
+
+public class TestIcebergSchemaChangeUtils {
private Schema baseSchema;
@@ -58,7 +60,7 @@ public class TestSchemaChangeUtils {
Assert.assertFalse(baseSchema.sameSchema(addColSchema));
- return SchemaChangeUtils.diffSchema(baseSchema, addColSchema);
+ return
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema),
extractColumnSchema(addColSchema));
}
@Test
@@ -78,7 +80,7 @@ public class TestSchemaChangeUtils {
Assert.assertFalse(baseSchema.sameSchema(delColSchema));
- return SchemaChangeUtils.diffSchema(baseSchema, delColSchema);
+ return
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema),
extractColumnSchema(delColSchema));
}
@Test
@@ -107,7 +109,8 @@ public class TestSchemaChangeUtils {
Assert.assertFalse(baseSchema.sameSchema(updateTypeColSchema));
- return SchemaChangeUtils.diffSchema(baseSchema, updateTypeColSchema);
+ return
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema),
+ extractColumnSchema(updateTypeColSchema));
}
public List<TableChange> testCommentTypeColumn() {
@@ -118,7 +121,8 @@ public class TestSchemaChangeUtils {
Assert.assertFalse(baseSchema.sameSchema(updateCommentColSchema));
- return SchemaChangeUtils.diffSchema(baseSchema,
updateCommentColSchema);
+ return
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema),
+ extractColumnSchema(updateCommentColSchema));
}
@Test
@@ -130,7 +134,8 @@ public class TestSchemaChangeUtils {
Assert.assertFalse(baseSchema.sameSchema(renameColumnSchema));
- List<TableChange> tableChanges =
SchemaChangeUtils.diffSchema(baseSchema, renameColumnSchema);
+ List<TableChange> tableChanges =
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema),
+ extractColumnSchema(renameColumnSchema));
Assert.assertEquals("rename column is not supported.", 1,
tableChanges.size());
for (TableChange tableChange : tableChanges) {
Assert.assertTrue("The table changes must be UnknownColumnChange ",
@@ -147,7 +152,8 @@ public class TestSchemaChangeUtils {
Assert.assertFalse(baseSchema.sameSchema(positionChangeSchema));
- List<TableChange> tableChanges =
SchemaChangeUtils.diffSchema(baseSchema, positionChangeSchema);
+ List<TableChange> tableChanges =
IcebergSchemaChangeUtils.diffSchema(extractColumnSchema(baseSchema),
+ extractColumnSchema(positionChangeSchema));
Assert.assertTrue(tableChanges.size() == 1);
for (TableChange tableChange : tableChanges) {
Assert.assertTrue("The table changes must be UnknownColumnChange ",