This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b3ba35912 [INLONG-7959][Sort] Dynamic schema evolution support delete
and update columns when sink to Iceberg (#8120)
b3ba35912 is described below
commit b3ba35912f209ba614d0db56c73cf6008aed68ee
Author: hejiay <[email protected]>
AuthorDate: Wed May 31 12:24:27 2023 +0800
[INLONG-7959][Sort] Dynamic schema evolution support delete and update
columns when sink to Iceberg (#8120)
Co-authored-by: Jiayuan HE 何家源 <[email protected]>
---
.../apache/inlong/sort/base/sink/TableChange.java | 73 +++++++++-
.../sink/multiple/DynamicSchemaHandleOperator.java | 15 +-
.../iceberg/sink/multiple/SchemaChangeUtils.java | 105 +++++++++++---
.../sink/multiple/TestSchemaChangeUtils.java | 157 +++++++++++++++++++++
4 files changed, 316 insertions(+), 34 deletions(-)
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
index 304b44a1d..35219763b 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
@@ -178,9 +178,80 @@ public interface TableChange {
final class DeleteColumn implements ColumnChange {
+ private final String[] fieldNames;
+
+ public DeleteColumn(String[] fieldsNames) {
+ Preconditions.checkArgument(fieldsNames.length > 0, "Invalid filed
name: at least one is required");
+ this.fieldNames = fieldsNames;
+ }
+
@Override
public String[] fieldNames() {
- return new String[0];
+ return fieldNames;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DeleteColumn)) {
+ return false;
+ }
+ DeleteColumn that = (DeleteColumn) o;
+ return Arrays.equals(fieldNames, that.fieldNames);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(fieldNames);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("DELETE COLUMNS `%s`",
fieldNames[fieldNames.length - 1]);
+ }
+ }
+
+ final class UpdateColumn implements ColumnChange {
+
+ private final String[] fieldNames;
+ private final LogicalType dataType;
+ private final boolean isNullable;
+ private final String comment;
+
+ public UpdateColumn(String[] fieldNames, LogicalType dataType, boolean
isNullable, String comment) {
+ Preconditions.checkArgument(fieldNames.length > 0, "Invalid filed
name: at least one is required");
+ this.fieldNames = fieldNames;
+ this.dataType = dataType;
+ this.isNullable = isNullable;
+ this.comment = comment;
+ }
+
+ @Override
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+
+ public LogicalType dataType() {
+ return dataType;
+ }
+
+ public boolean isNullable() {
+ return isNullable;
+ }
+
+ public String comment() {
+ return comment;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("UPDATE COLUMNS `%s` %s %s %s ",
+ fieldNames[fieldNames.length - 1],
+ dataType,
+ isNullable ? "" : "NOT NULL",
+ comment);
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index f5d492eda..a21e4307d 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -30,7 +30,6 @@ import
org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.base.sink.TableChange;
-import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.flink.api.common.state.ListState;
@@ -59,7 +58,6 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.types.Types.NestedField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -473,8 +471,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
if (table.schema().sameSchema(oldSchema)) {
List<TableChange> tableChanges =
SchemaChangeUtils.diffSchema(oldSchema, newSchema);
for (TableChange tableChange : tableChanges) {
- if (!(tableChange instanceof AddColumn)) {
- // todo:currently iceberg can only handle addColumn, so
always return false
+ if (tableChange instanceof TableChange.UnknownColumnChange) {
throw new UnsupportedOperationException(
String.format("Unsupported table %s schema change:
%s.", tableId.toString(), tableChange));
}
@@ -487,14 +484,10 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
}
// =============================== Utils method
=================================================================
- // The way to judge compatibility is whether all the field names in the
old schema exist in the new schema
+ // if newSchema is not the same as oldSchema, return false. It includes
differences in name, type, position, and
+ // quantity
private boolean isCompatible(Schema newSchema, Schema oldSchema) {
- for (NestedField field : oldSchema.columns()) {
- if (newSchema.findField(field.name()) == null) {
- return false;
- }
- }
- return true;
+ return oldSchema.sameSchema(newSchema);
}
private TableIdentifier parseId(JsonNode data) throws IOException {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
index 1eb36b035..f5f1001a7 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
@@ -18,11 +18,11 @@
package org.apache.inlong.sort.iceberg.sink.multiple;
import org.apache.inlong.sort.base.sink.TableChange;
-import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
import org.apache.inlong.sort.base.sink.TableChange.ColumnPosition;
import org.apache.inlong.sort.base.sink.TableChange.UnknownColumnChange;
import org.apache.inlong.sort.iceberg.FlinkTypeToType;
+import com.google.common.collect.Sets;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UpdateSchema;
@@ -34,7 +34,9 @@ import org.apache.iceberg.types.Types.NestedField;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
public class SchemaChangeUtils {
@@ -43,7 +45,7 @@ public class SchemaChangeUtils {
/**
* Compare two schemas and get the schema changes that happened in them.
- * TODO: currently only support add column
+ * TODO: currently only supports add column, delete column and column type
change; rename column and column position change are not supported
*
* @param oldSchema
* @param newSchema
@@ -52,41 +54,91 @@ public class SchemaChangeUtils {
static List<TableChange> diffSchema(Schema oldSchema, Schema newSchema) {
List<String> oldFields =
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
List<String> newFields =
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
- int oi = 0;
- int ni = 0;
+ Set<String> oldFieldSet = new HashSet<>(oldFields);
+ Set<String> newFieldSet = new HashSet<>(newFields);
+
+ Set<String> intersectColSet = Sets.intersection(oldFieldSet,
newFieldSet);
+ Set<String> colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+ Set<String> colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
List<TableChange> tableChanges = new ArrayList<>();
- while (ni < newFields.size()) {
- if (oi < oldFields.size() &&
oldFields.get(oi).equals(newFields.get(ni))) {
- oi++;
- ni++;
- } else {
- NestedField newField = newSchema.findField(newFields.get(ni));
+
+ // step0: judge whether this is an unknown change
+ // 1. Just diffing two different schemas cannot distinguish (add + delete)
vs modify.
+ // Example: first [a, b, c] -> then delete c [a, b] -> add d [a, b, d];
currently this is only judged as an unknown
+ // change.
+ // In the next version, we will judge whether it is [delete and add] or rename by
using information extracted from the DDL
+ if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {
+ tableChanges.add(new UnknownColumnChange(
+ String.format(" Old schema: [%s] and new schema: [%s], it
is unknown column change",
+ oldSchema.toString(), newSchema.toString())));
+ return tableChanges;
+ }
+
+ // 2. If some field positions in the new schema are not the same as the old
schema, there is no way to deal with it.
+ // This situation is only regarded as an unknown column change
+ if (colsToDelete.isEmpty() && colsToAdd.isEmpty() &&
oldFieldSet.equals(newFieldSet)
+ && !oldFields.equals(newFields)) {
+ tableChanges.add(
+ new UnknownColumnChange(
+ String.format(
+ " Old schema: [%s] and new schema: [%s],
they are same but some filed positions are not same."
+ +
+ " This situation only is regarded
as unknown column change at present",
+ oldSchema.toString(),
newSchema.toString())));
+ return tableChanges;
+ }
+
+ // step1: judge whether column type change
+ for (String colName : intersectColSet) {
+ NestedField oldField = oldSchema.findField(colName);
+ NestedField newField = newSchema.findField(colName);
+ if (!oldField.type().equals(newField.type()) ||
!oldField.doc().equals(newField.doc())) {
tableChanges.add(
- new AddColumn(
+ new TableChange.UpdateColumn(
new String[]{newField.name()},
FlinkSchemaUtil.convert(newField.type()),
!newField.isRequired(),
- newField.doc(),
- ni == 0 ? ColumnPosition.first() :
ColumnPosition.after(newFields.get(ni - 1))));
- ni++;
+ newField.doc()));
}
}
- if (oi != oldFields.size()) {
- tableChanges.clear();
- tableChanges.add(
- new UnknownColumnChange(
- String.format("Unsupported schema update.\n"
- + "oldSchema:\n%s\n, newSchema:\n %s",
oldSchema, newSchema)));
+ // step2: judge whether delete column
+ for (String colName : oldFields) {
+ if (colsToDelete.contains(colName)) {
+ tableChanges.add(
+ new TableChange.DeleteColumn(
+ new String[]{colName}));
+ }
}
+ // step3: judge whether add column
+ if (!colsToAdd.isEmpty()) {
+ for (int i = 0; i < newFields.size(); i++) {
+ String colName = newFields.get(i);
+ if (colsToAdd.contains(colName)) {
+ NestedField addField = newSchema.findField(colName);
+ tableChanges.add(
+ new TableChange.AddColumn(
+ new String[]{addField.name()},
+ FlinkSchemaUtil.convert(addField.type()),
+ !addField.isRequired(),
+ addField.doc(),
+ i == 0 ? ColumnPosition.first() :
ColumnPosition.after(newFields.get(i - 1))));
+ }
+ }
+ }
return tableChanges;
}
public static void applySchemaChanges(UpdateSchema pendingUpdate,
List<TableChange> tableChanges) {
for (TableChange change : tableChanges) {
if (change instanceof TableChange.AddColumn) {
- apply(pendingUpdate, (TableChange.AddColumn) change);
+ applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
+ } else if (change instanceof TableChange.DeleteColumn) {
+ applyDeleteColumn(pendingUpdate, (TableChange.DeleteColumn)
change);
+ } else if (change instanceof TableChange.UpdateColumn) {
+ applyUpdateColumn(pendingUpdate, (TableChange.UpdateColumn)
change);
} else {
throw new UnsupportedOperationException("Cannot apply unknown
table change: " + change);
}
@@ -94,7 +146,7 @@ public class SchemaChangeUtils {
pendingUpdate.commit();
}
- public static void apply(UpdateSchema pendingUpdate, TableChange.AddColumn
add) {
+ public static void applyAddColumn(UpdateSchema pendingUpdate,
TableChange.AddColumn add) {
Preconditions.checkArgument(add.isNullable(),
"Incompatible change: cannot add required column: %s",
leafName(add.fieldNames()));
Type type = add.dataType().accept(new
FlinkTypeToType(RowType.of(add.dataType())));
@@ -114,6 +166,15 @@ public class SchemaChangeUtils {
}
}
+ public static void applyDeleteColumn(UpdateSchema pendingUpdate,
TableChange.DeleteColumn delete) {
+ pendingUpdate.deleteColumn(DOT.join(delete.fieldNames()));
+ }
+
+ public static void applyUpdateColumn(UpdateSchema pendingUpdate,
TableChange.UpdateColumn update) {
+ Type type = update.dataType().accept(new
FlinkTypeToType(RowType.of(update.dataType())));
+ pendingUpdate.updateColumn(DOT.join(update.fieldNames()),
type.asPrimitiveType(), update.comment());
+ }
+
public static String leafName(String[] fieldNames) {
Preconditions.checkArgument(fieldNames.length > 0, "Invalid field
name: at least one name is required");
return fieldNames[fieldNames.length - 1];
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
new file mode 100644
index 000000000..466214434
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.inlong.sort.base.sink.TableChange;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestSchemaChangeUtils {
+
+ private Schema baseSchema;
+
+ @Before
+ public void setup() {
+ baseSchema = new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get(),
"primary key"),
+ Types.NestedField.optional(2, "name", Types.StringType.get(),
"name"),
+ Types.NestedField.optional(3, "age", Types.IntegerType.get(),
"age"));
+ }
+
+ @Test
+ public void testAddColumn() {
+ List<TableChange> tableChanges = addColumn();
+ Assert.assertEquals("Table changes size must be 2 when add 2 column",
2, tableChanges.size());
+ for (TableChange tableChange : tableChanges) {
+ Assert.assertTrue("The table change must be AddColumn ",
tableChange instanceof TableChange.AddColumn);
+ }
+ }
+
+ public List<TableChange> addColumn() {
+ Schema addColSchema = new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get(),
"primary key"),
+ Types.NestedField.optional(2, "name", Types.StringType.get(),
"name"),
+ Types.NestedField.optional(3, "age", Types.IntegerType.get(),
"age"),
+ Types.NestedField.optional(4, "city", Types.StringType.get(),
"city"),
+ Types.NestedField.optional(5, "sex", Types.StringType.get(),
"sex"));
+
+ Assert.assertFalse(baseSchema.sameSchema(addColSchema));
+
+ return SchemaChangeUtils.diffSchema(baseSchema, addColSchema);
+ }
+
+ @Test
+ public void testDeleteColumn() {
+ List<TableChange> tableChanges = deleteColumn();
+ Assert.assertEquals("Table changes size must be 1 when del 1 column",
1, tableChanges.size());
+ for (TableChange tableChange : tableChanges) {
+ Assert.assertTrue("The table change must be DeleteColumn ",
+ tableChange instanceof TableChange.DeleteColumn);
+ }
+ }
+
+ public List<TableChange> deleteColumn() {
+ Schema delColSchema = new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get(),
"primary key"),
+ Types.NestedField.optional(2, "name", Types.StringType.get(),
"name"));
+
+ Assert.assertFalse(baseSchema.sameSchema(delColSchema));
+
+ return SchemaChangeUtils.diffSchema(baseSchema, delColSchema);
+ }
+
+ @Test
+ public void testUpdateColumn() {
+ List<TableChange> updateTypeTableChanges = testUpdateTypeColumn();
+ Assert.assertEquals("update 2 col, id: int -> double; age: int ->
long", 2, updateTypeTableChanges.size());
+ for (TableChange tableChange : updateTypeTableChanges) {
+ Assert.assertTrue("The table changes must be UpdateColumn ",
+ tableChange instanceof TableChange.UpdateColumn);
+ }
+
+ List<TableChange> updateCommentTableChanges = testCommentTypeColumn();
+ Assert.assertEquals("update 1 col comment, name comment: name ->
family name:", 1,
+ updateCommentTableChanges.size());
+ for (TableChange tableChange : updateCommentTableChanges) {
+ Assert.assertTrue("The table changes must be UpdateColumn ",
+ tableChange instanceof TableChange.UpdateColumn);
+ }
+ }
+
+ public List<TableChange> testUpdateTypeColumn() {
+ Schema updateTypeColSchema = new Schema(
+ Types.NestedField.required(1, "id", Types.DoubleType.get(),
"primary key"),
+ Types.NestedField.optional(2, "name", Types.StringType.get(),
"name"),
+ Types.NestedField.optional(3, "age", Types.LongType.get(),
"age"));
+
+ Assert.assertFalse(baseSchema.sameSchema(updateTypeColSchema));
+
+ return SchemaChangeUtils.diffSchema(baseSchema, updateTypeColSchema);
+ }
+
+ public List<TableChange> testCommentTypeColumn() {
+ Schema updateCommentColSchema = new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get(),
"primary key"),
+ Types.NestedField.optional(2, "name", Types.StringType.get(),
"family_name"),
+ Types.NestedField.optional(3, "age", Types.IntegerType.get(),
"age"));
+
+ Assert.assertFalse(baseSchema.sameSchema(updateCommentColSchema));
+
+ return SchemaChangeUtils.diffSchema(baseSchema,
updateCommentColSchema);
+ }
+
+ @Test
+ public void testRenameColumn() {
+ Schema renameColumnSchema = new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get(),
"primary key"),
+ Types.NestedField.optional(2, "family_name",
Types.StringType.get(), "name"),
+ Types.NestedField.optional(3, "age", Types.IntegerType.get(),
"age"));
+
+ Assert.assertFalse(baseSchema.sameSchema(renameColumnSchema));
+
+ List<TableChange> tableChanges =
SchemaChangeUtils.diffSchema(baseSchema, renameColumnSchema);
+ Assert.assertEquals("rename column is not supported.", 1,
tableChanges.size());
+ for (TableChange tableChange : tableChanges) {
+ Assert.assertTrue("The table changes must be UnknownColumnChange ",
+ tableChange instanceof TableChange.UnknownColumnChange);
+ }
+ }
+
+ @Test
+ public void testColumnPositionChange() {
+ Schema positionChangeSchema = new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get(),
"primary key"),
+ Types.NestedField.optional(2, "age", Types.StringType.get(),
"age"),
+ Types.NestedField.optional(3, "name", Types.IntegerType.get(),
"name"));
+
+ Assert.assertFalse(baseSchema.sameSchema(positionChangeSchema));
+
+ List<TableChange> tableChanges =
SchemaChangeUtils.diffSchema(baseSchema, positionChangeSchema);
+ Assert.assertTrue(tableChanges.size() == 1);
+ for (TableChange tableChange : tableChanges) {
+ Assert.assertTrue("The table changes must be UnknownColumnChange ",
+ tableChange instanceof TableChange.UnknownColumnChange);
+ }
+ }
+}