This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b3ba35912 [INLONG-7959][Sort] Dynamic schema evolution support delete 
and update columns when sink to Iceberg (#8120)
b3ba35912 is described below

commit b3ba35912f209ba614d0db56c73cf6008aed68ee
Author: hejiay <[email protected]>
AuthorDate: Wed May 31 12:24:27 2023 +0800

    [INLONG-7959][Sort] Dynamic schema evolution support delete and update 
columns when sink to Iceberg (#8120)
    
    Co-authored-by: Jiayuan HE 何家源 <[email protected]>
---
 .../apache/inlong/sort/base/sink/TableChange.java  |  73 +++++++++-
 .../sink/multiple/DynamicSchemaHandleOperator.java |  15 +-
 .../iceberg/sink/multiple/SchemaChangeUtils.java   | 105 +++++++++++---
 .../sink/multiple/TestSchemaChangeUtils.java       | 157 +++++++++++++++++++++
 4 files changed, 316 insertions(+), 34 deletions(-)

diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
index 304b44a1d..35219763b 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java
@@ -178,9 +178,80 @@ public interface TableChange {
 
     final class DeleteColumn implements ColumnChange {
 
+        private final String[] fieldNames;
+
+        public DeleteColumn(String[] fieldsNames) {
+            Preconditions.checkArgument(fieldsNames.length > 0, "Invalid filed 
name: at least one is required");
+            this.fieldNames = fieldsNames;
+        }
+
         @Override
         public String[] fieldNames() {
-            return new String[0];
+            return fieldNames;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof DeleteColumn)) {
+                return false;
+            }
+            DeleteColumn that = (DeleteColumn) o;
+            return Arrays.equals(fieldNames, that.fieldNames);
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(fieldNames);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("DELETE COLUMNS `%s`", 
fieldNames[fieldNames.length - 1]);
+        }
+    }
+
+    final class UpdateColumn implements ColumnChange {
+
+        private final String[] fieldNames;
+        private final LogicalType dataType;
+        private final boolean isNullable;
+        private final String comment;
+
+        public UpdateColumn(String[] fieldNames, LogicalType dataType, boolean 
isNullable, String comment) {
+            Preconditions.checkArgument(fieldNames.length > 0, "Invalid filed 
name: at least one is required");
+            this.fieldNames = fieldNames;
+            this.dataType = dataType;
+            this.isNullable = isNullable;
+            this.comment = comment;
+        }
+
+        @Override
+        public String[] fieldNames() {
+            return fieldNames;
+        }
+
+        public LogicalType dataType() {
+            return dataType;
+        }
+
+        public boolean isNullable() {
+            return isNullable;
+        }
+
+        public String comment() {
+            return comment;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("UPDATE COLUMNS `%s` %s %s %s ",
+                    fieldNames[fieldNames.length - 1],
+                    dataType,
+                    isNullable ? "" : "NOT NULL",
+                    comment);
         }
     }
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index f5d492eda..a21e4307d 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -30,7 +30,6 @@ import 
org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.sink.TableChange;
-import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import org.apache.flink.api.common.state.ListState;
@@ -59,7 +58,6 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.types.Types.NestedField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -473,8 +471,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         if (table.schema().sameSchema(oldSchema)) {
             List<TableChange> tableChanges = 
SchemaChangeUtils.diffSchema(oldSchema, newSchema);
             for (TableChange tableChange : tableChanges) {
-                if (!(tableChange instanceof AddColumn)) {
-                    // todo:currently iceberg can only handle addColumn, so 
always return false
+                if (tableChange instanceof TableChange.UnknownColumnChange) {
                     throw new UnsupportedOperationException(
                             String.format("Unsupported table %s schema change: 
%s.", tableId.toString(), tableChange));
                 }
@@ -487,14 +484,10 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
     }
 
     // =============================== Utils method 
=================================================================
-    // The way to judge compatibility is whether all the field names in the 
old schema exist in the new schema
+    // if newSchema is not the same as oldSchema, return false. This includes 
differences in name, type, position, and
+    // quantity
     private boolean isCompatible(Schema newSchema, Schema oldSchema) {
-        for (NestedField field : oldSchema.columns()) {
-            if (newSchema.findField(field.name()) == null) {
-                return false;
-            }
-        }
-        return true;
+        return oldSchema.sameSchema(newSchema);
     }
 
     private TableIdentifier parseId(JsonNode data) throws IOException {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
index 1eb36b035..f5f1001a7 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
@@ -18,11 +18,11 @@
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
 import org.apache.inlong.sort.base.sink.TableChange;
-import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
 import org.apache.inlong.sort.base.sink.TableChange.ColumnPosition;
 import org.apache.inlong.sort.base.sink.TableChange.UnknownColumnChange;
 import org.apache.inlong.sort.iceberg.FlinkTypeToType;
 
+import com.google.common.collect.Sets;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.UpdateSchema;
@@ -34,7 +34,9 @@ import org.apache.iceberg.types.Types.NestedField;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class SchemaChangeUtils {
@@ -43,7 +45,7 @@ public class SchemaChangeUtils {
 
     /**
      * Compare two schemas and get the schema changes that happened in them.
-     * TODO: currently only support add column
+     * TODO: currently only supports add column, delete column, and column type 
change; rename column and column position change are not supported
      *
      * @param oldSchema
      * @param newSchema
@@ -52,41 +54,91 @@ public class SchemaChangeUtils {
     static List<TableChange> diffSchema(Schema oldSchema, Schema newSchema) {
         List<String> oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
         List<String> newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-        int oi = 0;
-        int ni = 0;
+        Set<String> oldFieldSet = new HashSet<>(oldFields);
+        Set<String> newFieldSet = new HashSet<>(newFields);
+
+        Set<String> intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+        Set<String> colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+        Set<String> colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
         List<TableChange> tableChanges = new ArrayList<>();
-        while (ni < newFields.size()) {
-            if (oi < oldFields.size() && 
oldFields.get(oi).equals(newFields.get(ni))) {
-                oi++;
-                ni++;
-            } else {
-                NestedField newField = newSchema.findField(newFields.get(ni));
+
+        // step0: judge whether this is an unknown change
+        // 1. Simply diffing two schemas cannot distinguish (add + delete) 
from modify.
+        // Example: first [a, b, c] -> then delete c [a, b] -> add d [a, b, d]; 
currently this is only judged as an unknown
+        // change.
+        // In the next version, we will judge whether it is [delete and add] or a rename by 
using information extracted from the DDL
+        if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {
+            tableChanges.add(new UnknownColumnChange(
+                    String.format(" Old schema: [%s] and new schema: [%s], it 
is unknown column change",
+                            oldSchema.toString(), newSchema.toString())));
+            return tableChanges;
+        }
+
+        // 2. If some field positions in the new schema differ from the old 
schema, there is no way to deal with it.
+        // This situation is only regarded as an unknown column change
+        if (colsToDelete.isEmpty() && colsToAdd.isEmpty() && 
oldFieldSet.equals(newFieldSet)
+                && !oldFields.equals(newFields)) {
+            tableChanges.add(
+                    new UnknownColumnChange(
+                            String.format(
+                                    " Old schema: [%s] and new schema: [%s], 
they are same but some filed positions are not same."
+                                            +
+                                            " This situation only is regarded 
as unknown column change at present",
+                                    oldSchema.toString(), 
newSchema.toString())));
+            return tableChanges;
+        }
+
+        // step1: judge whether the column type or comment changed
+        for (String colName : intersectColSet) {
+            NestedField oldField = oldSchema.findField(colName);
+            NestedField newField = newSchema.findField(colName);
+            if (!oldField.type().equals(newField.type()) || 
!oldField.doc().equals(newField.doc())) {
                 tableChanges.add(
-                        new AddColumn(
+                        new TableChange.UpdateColumn(
                                 new String[]{newField.name()},
                                 FlinkSchemaUtil.convert(newField.type()),
                                 !newField.isRequired(),
-                                newField.doc(),
-                                ni == 0 ? ColumnPosition.first() : 
ColumnPosition.after(newFields.get(ni - 1))));
-                ni++;
+                                newField.doc()));
             }
         }
 
-        if (oi != oldFields.size()) {
-            tableChanges.clear();
-            tableChanges.add(
-                    new UnknownColumnChange(
-                            String.format("Unsupported schema update.\n"
-                                    + "oldSchema:\n%s\n, newSchema:\n %s", 
oldSchema, newSchema)));
+        // step2: judge whether delete column
+        for (String colName : oldFields) {
+            if (colsToDelete.contains(colName)) {
+                tableChanges.add(
+                        new TableChange.DeleteColumn(
+                                new String[]{colName}));
+            }
         }
 
+        // step3: judge whether add column
+        if (!colsToAdd.isEmpty()) {
+            for (int i = 0; i < newFields.size(); i++) {
+                String colName = newFields.get(i);
+                if (colsToAdd.contains(colName)) {
+                    NestedField addField = newSchema.findField(colName);
+                    tableChanges.add(
+                            new TableChange.AddColumn(
+                                    new String[]{addField.name()},
+                                    FlinkSchemaUtil.convert(addField.type()),
+                                    !addField.isRequired(),
+                                    addField.doc(),
+                                    i == 0 ? ColumnPosition.first() : 
ColumnPosition.after(newFields.get(i - 1))));
+                }
+            }
+        }
         return tableChanges;
     }
 
     public static void applySchemaChanges(UpdateSchema pendingUpdate, 
List<TableChange> tableChanges) {
         for (TableChange change : tableChanges) {
             if (change instanceof TableChange.AddColumn) {
-                apply(pendingUpdate, (TableChange.AddColumn) change);
+                applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
+            } else if (change instanceof TableChange.DeleteColumn) {
+                applyDeleteColumn(pendingUpdate, (TableChange.DeleteColumn) 
change);
+            } else if (change instanceof TableChange.UpdateColumn) {
+                applyUpdateColumn(pendingUpdate, (TableChange.UpdateColumn) 
change);
             } else {
                 throw new UnsupportedOperationException("Cannot apply unknown 
table change: " + change);
             }
@@ -94,7 +146,7 @@ public class SchemaChangeUtils {
         pendingUpdate.commit();
     }
 
-    public static void apply(UpdateSchema pendingUpdate, TableChange.AddColumn 
add) {
+    public static void applyAddColumn(UpdateSchema pendingUpdate, 
TableChange.AddColumn add) {
         Preconditions.checkArgument(add.isNullable(),
                 "Incompatible change: cannot add required column: %s", 
leafName(add.fieldNames()));
         Type type = add.dataType().accept(new 
FlinkTypeToType(RowType.of(add.dataType())));
@@ -114,6 +166,15 @@ public class SchemaChangeUtils {
         }
     }
 
+    public static void applyDeleteColumn(UpdateSchema pendingUpdate, 
TableChange.DeleteColumn delete) {
+        pendingUpdate.deleteColumn(DOT.join(delete.fieldNames()));
+    }
+
+    public static void applyUpdateColumn(UpdateSchema pendingUpdate, 
TableChange.UpdateColumn update) {
+        Type type = update.dataType().accept(new 
FlinkTypeToType(RowType.of(update.dataType())));
+        pendingUpdate.updateColumn(DOT.join(update.fieldNames()), 
type.asPrimitiveType(), update.comment());
+    }
+
     public static String leafName(String[] fieldNames) {
         Preconditions.checkArgument(fieldNames.length > 0, "Invalid field 
name: at least one name is required");
         return fieldNames[fieldNames.length - 1];
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
new file mode 100644
index 000000000..466214434
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/test/java/org/apache/inlong/sort/iceberg/sink/multiple/TestSchemaChangeUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.inlong.sort.base.sink.TableChange;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestSchemaChangeUtils {
+
+    private Schema baseSchema;
+
+    @Before
+    public void setup() {
+        baseSchema = new Schema(
+                Types.NestedField.required(1, "id", Types.LongType.get(), 
"primary key"),
+                Types.NestedField.optional(2, "name", Types.StringType.get(), 
"name"),
+                Types.NestedField.optional(3, "age", Types.IntegerType.get(), 
"age"));
+    }
+
+    @Test
+    public void testAddColumn() {
+        List<TableChange> tableChanges = addColumn();
+        Assert.assertEquals("Table changes size must be 2 when add  2 column", 
2, tableChanges.size());
+        for (TableChange tableChange : tableChanges) {
+            Assert.assertTrue("The table change must be AddColumn ", 
tableChange instanceof TableChange.AddColumn);
+        }
+    }
+
+    public List<TableChange> addColumn() {
+        Schema addColSchema = new Schema(
+                Types.NestedField.required(1, "id", Types.LongType.get(), 
"primary key"),
+                Types.NestedField.optional(2, "name", Types.StringType.get(), 
"name"),
+                Types.NestedField.optional(3, "age", Types.IntegerType.get(), 
"age"),
+                Types.NestedField.optional(4, "city", Types.StringType.get(), 
"city"),
+                Types.NestedField.optional(5, "sex", Types.StringType.get(), 
"sex"));
+
+        Assert.assertFalse(baseSchema.sameSchema(addColSchema));
+
+        return SchemaChangeUtils.diffSchema(baseSchema, addColSchema);
+    }
+
+    @Test
+    public void testDeleteColumn() {
+        List<TableChange> tableChanges = deleteColumn();
+        Assert.assertEquals("Table changes size must be 1 when del 1 column", 
1, tableChanges.size());
+        for (TableChange tableChange : tableChanges) {
+            Assert.assertTrue("The table change must be DeleteColumn ",
+                    tableChange instanceof TableChange.DeleteColumn);
+        }
+    }
+
+    public List<TableChange> deleteColumn() {
+        Schema delColSchema = new Schema(
+                Types.NestedField.required(1, "id", Types.LongType.get(), 
"primary key"),
+                Types.NestedField.optional(2, "name", Types.StringType.get(), 
"name"));
+
+        Assert.assertFalse(baseSchema.sameSchema(delColSchema));
+
+        return SchemaChangeUtils.diffSchema(baseSchema, delColSchema);
+    }
+
+    @Test
+    public void testUpdateColumn() {
+        List<TableChange> updateTypeTableChanges = testUpdateTypeColumn();
+        Assert.assertEquals("update 2 col, id: int -> double; age: int -> 
long", 2, updateTypeTableChanges.size());
+        for (TableChange tableChange : updateTypeTableChanges) {
+            Assert.assertTrue("The table changes must be UpdateColumn ",
+                    tableChange instanceof TableChange.UpdateColumn);
+        }
+
+        List<TableChange> updateCommentTableChanges = testCommentTypeColumn();
+        Assert.assertEquals("update 1 col comment, name comment: name -> 
family name:", 1,
+                updateCommentTableChanges.size());
+        for (TableChange tableChange : updateCommentTableChanges) {
+            Assert.assertTrue("The table changes must be UpdateColumn ",
+                    tableChange instanceof TableChange.UpdateColumn);
+        }
+    }
+
+    public List<TableChange> testUpdateTypeColumn() {
+        Schema updateTypeColSchema = new Schema(
+                Types.NestedField.required(1, "id", Types.DoubleType.get(), 
"primary key"),
+                Types.NestedField.optional(2, "name", Types.StringType.get(), 
"name"),
+                Types.NestedField.optional(3, "age", Types.LongType.get(), 
"age"));
+
+        Assert.assertFalse(baseSchema.sameSchema(updateTypeColSchema));
+
+        return SchemaChangeUtils.diffSchema(baseSchema, updateTypeColSchema);
+    }
+
+    public List<TableChange> testCommentTypeColumn() {
+        Schema updateCommentColSchema = new Schema(
+                Types.NestedField.required(1, "id", Types.LongType.get(), 
"primary key"),
+                Types.NestedField.optional(2, "name", Types.StringType.get(), 
"family_name"),
+                Types.NestedField.optional(3, "age", Types.IntegerType.get(), 
"age"));
+
+        Assert.assertFalse(baseSchema.sameSchema(updateCommentColSchema));
+
+        return SchemaChangeUtils.diffSchema(baseSchema, 
updateCommentColSchema);
+    }
+
+    @Test
+    public void testRenameColumn() {
+        Schema renameColumnSchema = new Schema(
+                Types.NestedField.required(1, "id", Types.LongType.get(), 
"primary key"),
+                Types.NestedField.optional(2, "family_name", 
Types.StringType.get(), "name"),
+                Types.NestedField.optional(3, "age", Types.IntegerType.get(), 
"age"));
+
+        Assert.assertFalse(baseSchema.sameSchema(renameColumnSchema));
+
+        List<TableChange> tableChanges = 
SchemaChangeUtils.diffSchema(baseSchema, renameColumnSchema);
+        Assert.assertEquals("rename column is not supported.", 1, 
tableChanges.size());
+        for (TableChange tableChange : tableChanges) {
+            Assert.assertTrue("The table changes must be UnknownColumnChange ",
+                    tableChange instanceof TableChange.UnknownColumnChange);
+        }
+    }
+
+    @Test
+    public void testColumnPositionChange() {
+        Schema positionChangeSchema = new Schema(
+                Types.NestedField.required(1, "id", Types.LongType.get(), 
"primary key"),
+                Types.NestedField.optional(2, "age", Types.StringType.get(), 
"age"),
+                Types.NestedField.optional(3, "name", Types.IntegerType.get(), 
"name"));
+
+        Assert.assertFalse(baseSchema.sameSchema(positionChangeSchema));
+
+        List<TableChange> tableChanges = 
SchemaChangeUtils.diffSchema(baseSchema, positionChangeSchema);
+        Assert.assertTrue(tableChanges.size() == 1);
+        for (TableChange tableChange : tableChanges) {
+            Assert.assertTrue("The table changes must be UnknownColumnChange ",
+                    tableChange instanceof TableChange.UnknownColumnChange);
+        }
+    }
+}

Reply via email to