This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f93b7b2db [cdc] Add the latest_schema state at schema evolution 
operator to reduce the latest schema access frequency (#4535)
2f93b7b2db is described below

commit 2f93b7b2dbc3a659f3b25bcff6c23aee4218ebfe
Author: Gang Yang <[email protected]>
AuthorDate: Thu Nov 28 20:02:30 2024 +0800

    [cdc] Add the latest_schema state at schema evolution operator to reduce 
the latest schema access frequency (#4535)
---
 .../org/apache/paimon/types/FieldIdentifier.java   |  53 +++++
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java |  41 +++-
 .../flink/action/cdc/SchemaEvolutionTest.java      | 219 +++++++++++++++++++++
 3 files changed, 312 insertions(+), 1 deletion(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java 
b/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
new file mode 100644
index 0000000000..7e9ced7cf9
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.types;
+
+import java.util.Objects;
+
+/**
+ * Identifies a {@link DataField} by its name, type and description; the field id is not part of
+ * the identity.
+ *
+ * <p>{@link #equals} and {@link #hashCode} cover exactly those three properties, so instances can
+ * be used as hash keys to detect whether a field definition has changed.
+ */
+public class FieldIdentifier {
+
+    private final String name;
+    private final DataType type;
+    private final String description;
+
+    /**
+     * Captures the name, type and description of {@code dataField}.
+     *
+     * @param dataField the field to identify
+     * @throws NullPointerException if {@code dataField} is null
+     */
+    public FieldIdentifier(DataField dataField) {
+        Objects.requireNonNull(dataField, "dataField must not be null");
+        this.name = dataField.name();
+        this.type = dataField.type();
+        this.description = dataField.description();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        FieldIdentifier that = (FieldIdentifier) o;
+        // Objects.equals keeps the comparison null-safe, e.g. for a field without description.
+        return Objects.equals(name, that.name)
+                && Objects.equals(type, that.type)
+                && Objects.equals(description, that.description);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, description);
+    }
+
+    @Override
+    public String toString() {
+        return "FieldIdentifier{name='"
+                + name
+                + "', type="
+                + type
+                + ", description='"
+                + description
+                + "'}";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 4a33eb1b7e..64f00d96b0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -23,11 +23,18 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.FieldIdentifier;
+import org.apache.paimon.types.RowType;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * A {@link ProcessFunction} to handle schema changes. New schema is 
represented by a list of {@link
@@ -43,19 +50,51 @@ public class UpdatedDataFieldsProcessFunction
 
     private final Identifier identifier;

+    /**
+     * Identifiers (name, type, description) of the fields of the table schema read most recently.
+     * It starts empty, so no incoming field is filtered out until the cache is first populated.
+     */
+    private Set<FieldIdentifier> latestFields = new HashSet<>();
+
     public UpdatedDataFieldsProcessFunction(
             SchemaManager schemaManager, Identifier identifier, Catalog.Loader catalogLoader) {
         super(catalogLoader);
         this.schemaManager = schemaManager;
         this.identifier = identifier;
     }
 
     @Override
     public void processElement(
             List<DataField> updatedDataFields, Context context, Collector<Void> collector)
             throws Exception {
-        for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, updatedDataFields)) {
+        // Skip fields whose name, type and description all match a field of the cached latest
+        // schema, so that extractSchemaChanges only sees fields that differ from it.
+        List<DataField> actualUpdatedDataFields =
+                updatedDataFields.stream()
+                        .filter(
+                                dataField ->
+                                        !latestDataFieldContain(new FieldIdentifier(dataField)))
+                        .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(actualUpdatedDataFields)) {
+            // Nothing differs from the cached schema; the schema manager is not consulted at all.
+            return;
+        }
+        for (SchemaChange schemaChange :
+                extractSchemaChanges(schemaManager, actualUpdatedDataFields)) {
             applySchemaChange(schemaManager, schemaChange, identifier);
         }
+        // Rebuild the cache from the stored latest schema instead of adding
+        // actualUpdatedDataFields to it. For changes other than added columns (type or comment
+        // updates), the modified field's previous identifier would otherwise stay cached, and a
+        // later element carrying that previous definition would be filtered out and never applied.
+        updateLatestFields();
+    }
+
+    /**
+     * Returns whether {@code dataField} equals one of the cached latest-schema field identifiers.
+     *
+     * @param dataField identifier of an incoming field
+     * @return {@code true} if an identical field (name, type, description) is cached
+     */
+    private boolean latestDataFieldContain(FieldIdentifier dataField) {
+        // latestFields is a hash set and FieldIdentifier defines equals/hashCode, so a hash lookup
+        // gives the same answer as scanning every cached element, in O(1) instead of O(n).
+        return latestFields.contains(dataField);
+    }
+
+    /**
+     * Replaces the cached field identifiers with those of the table's latest schema, as returned
+     * by {@link SchemaManager#latest()}.
+     *
+     * @throws IllegalStateException if the schema manager has no latest schema
+     */
+    private void updateLatestFields() {
+        RowType latestRowType =
+                schemaManager
+                        .latest()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Latest schema of table "
+                                                        + identifier
+                                                        + " does not exist."))
+                        .logicalRowType();
+        latestFields =
+                latestRowType.getFields().stream()
+                        .map(FieldIdentifier::new)
+                        .collect(Collectors.toSet());
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
new file mode 100644
index 0000000000..9ba1837686
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Used to test schema evolution related logic. */
+public class SchemaEvolutionTest extends TableTestBase {
+
+    private static List<List<DataField>> prepareData() {
+        List<DataField> upField1 =
+                Arrays.asList(
+                        new DataField(0, "col_0", new VarCharType(), "test 
description."),
+                        new DataField(1, "col_1", new IntType(), "test 
description."),
+                        new DataField(2, "col_2", new IntType(), "test 
description."),
+                        new DataField(3, "col_3", new VarCharType(), 
"Someone's desc."),
+                        new DataField(4, "col_4", new VarCharType(), 
"Someone's desc."),
+                        new DataField(5, "col_5", new VarCharType(), 
"Someone's desc."),
+                        new DataField(6, "col_6", new DecimalType(), 
"Someone's desc."),
+                        new DataField(7, "col_7", new VarCharType(), 
"Someone's desc."),
+                        new DataField(8, "col_8", new VarCharType(), 
"Someone's desc."),
+                        new DataField(9, "col_9", new VarCharType(), 
"Someone's desc."),
+                        new DataField(10, "col_10", new VarCharType(), 
"Someone's desc."),
+                        new DataField(11, "col_11", new VarCharType(), 
"Someone's desc."),
+                        new DataField(12, "col_12", new DoubleType(), 
"Someone's desc."),
+                        new DataField(13, "col_13", new VarCharType(), 
"Someone's desc."),
+                        new DataField(14, "col_14", new VarCharType(), 
"Someone's desc."),
+                        new DataField(15, "col_15", new VarCharType(), 
"Someone's desc."),
+                        new DataField(16, "col_16", new VarCharType(), 
"Someone's desc."),
+                        new DataField(17, "col_17", new VarCharType(), 
"Someone's desc."),
+                        new DataField(18, "col_18", new VarCharType(), 
"Someone's desc."),
+                        new DataField(19, "col_19", new VarCharType(), 
"Someone's desc."),
+                        new DataField(20, "col_20", new VarCharType(), 
"Someone's desc."));
+        List<DataField> upField2 =
+                Arrays.asList(
+                        new DataField(0, "col_0", new VarCharType(), "test 
description."),
+                        new DataField(1, "col_1", new BigIntType(), "test 
description."),
+                        new DataField(2, "col_2", new IntType(), "test 
description."),
+                        new DataField(3, "col_3", new VarCharType(), 
"Someone's desc."),
+                        new DataField(4, "col_4", new VarCharType(), 
"Someone's desc."),
+                        new DataField(5, "col_5", new VarCharType(), 
"Someone's desc."),
+                        new DataField(6, "col_6", new DecimalType(), 
"Someone's desc."),
+                        new DataField(7, "col_7", new VarCharType(), 
"Someone's desc."),
+                        new DataField(8, "col_8", new VarCharType(), 
"Someone's desc."),
+                        new DataField(9, "col_9", new VarCharType(), 
"Someone's desc."),
+                        new DataField(10, "col_10", new VarCharType(), 
"Someone's desc."),
+                        new DataField(11, "col_11", new VarCharType(), 
"Someone's desc."),
+                        new DataField(12, "col_12", new DoubleType(), 
"Someone's desc."),
+                        new DataField(13, "col_13", new VarCharType(), 
"Someone's desc."),
+                        new DataField(14, "col_14", new VarCharType(), 
"Someone's desc."),
+                        new DataField(15, "col_15", new VarCharType(), 
"Someone's desc."),
+                        new DataField(16, "col_16", new VarCharType(), 
"Someone's desc."),
+                        new DataField(17, "col_17", new VarCharType(), 
"Someone's desc."),
+                        new DataField(18, "col_18", new VarCharType(), 
"Someone's desc."),
+                        new DataField(19, "col_19", new VarCharType(), 
"Someone's desc."),
+                        new DataField(20, "col_20", new VarCharType(), 
"Someone's desc."));
+        List<DataField> upField3 =
+                Arrays.asList(
+                        new DataField(0, "col_0", new VarCharType(), "test 
description."),
+                        new DataField(1, "col_1", new BigIntType(), "test 
description."),
+                        new DataField(2, "col_2", new IntType(), "test 
description 2."),
+                        new DataField(3, "col_3", new VarCharType(), 
"Someone's desc."),
+                        new DataField(4, "col_4", new VarCharType(), 
"Someone's desc."),
+                        new DataField(5, "col_5", new VarCharType(), 
"Someone's desc."),
+                        new DataField(6, "col_6", new DecimalType(), 
"Someone's desc."),
+                        new DataField(7, "col_7", new VarCharType(), 
"Someone's desc."),
+                        new DataField(8, "col_8", new VarCharType(), 
"Someone's desc."),
+                        new DataField(9, "col_9", new VarCharType(), 
"Someone's desc."),
+                        new DataField(10, "col_10", new VarCharType(), 
"Someone's desc."),
+                        new DataField(11, "col_11", new VarCharType(), 
"Someone's desc."),
+                        new DataField(12, "col_12", new DoubleType(), 
"Someone's desc."),
+                        new DataField(13, "col_13", new VarCharType(), 
"Someone's desc."),
+                        new DataField(14, "col_14", new VarCharType(), 
"Someone's desc."),
+                        new DataField(15, "col_15", new VarCharType(), 
"Someone's desc."),
+                        new DataField(16, "col_16", new VarCharType(), 
"Someone's desc."),
+                        new DataField(17, "col_17", new VarCharType(), 
"Someone's desc."),
+                        new DataField(18, "col_18", new VarCharType(), 
"Someone's desc."),
+                        new DataField(19, "col_19", new VarCharType(), 
"Someone's desc."),
+                        new DataField(20, "col_20", new VarCharType(), 
"Someone's desc."));
+        List<DataField> upField4 =
+                Arrays.asList(
+                        new DataField(0, "col_0", new VarCharType(), "test 
description."),
+                        new DataField(1, "col_1", new BigIntType(), "test 
description."),
+                        new DataField(2, "col_2", new IntType(), "test 
description."),
+                        new DataField(3, "col_3_1", new VarCharType(), 
"Someone's desc."),
+                        new DataField(4, "col_4", new VarCharType(), 
"Someone's desc."),
+                        new DataField(5, "col_5", new VarCharType(), 
"Someone's desc."),
+                        new DataField(6, "col_6", new DecimalType(), 
"Someone's desc."),
+                        new DataField(7, "col_7", new VarCharType(), 
"Someone's desc."),
+                        new DataField(8, "col_8", new VarCharType(), 
"Someone's desc."),
+                        new DataField(9, "col_9", new VarCharType(), 
"Someone's desc."),
+                        new DataField(10, "col_10", new VarCharType(), 
"Someone's desc."),
+                        new DataField(11, "col_11", new VarCharType(), 
"Someone's desc."),
+                        new DataField(12, "col_12", new DoubleType(), 
"Someone's desc."),
+                        new DataField(13, "col_13", new VarCharType(), 
"Someone's desc."),
+                        new DataField(14, "col_14", new VarCharType(), 
"Someone's desc."),
+                        new DataField(15, "col_15", new VarCharType(), 
"Someone's desc."),
+                        new DataField(16, "col_16", new VarCharType(), 
"Someone's desc."),
+                        new DataField(17, "col_17", new VarCharType(), 
"Someone's desc."),
+                        new DataField(18, "col_18", new VarCharType(), 
"Someone's desc."),
+                        new DataField(19, "col_19", new VarCharType(), 
"Someone's desc."),
+                        new DataField(20, "col_20", new VarCharType(), 
"Someone's desc."));
+        List<DataField> upField5 =
+                Arrays.asList(
+                        new DataField(0, "col_0", new VarCharType(), "test 
description."),
+                        new DataField(1, "col_1", new BigIntType(), "test 
description."),
+                        new DataField(2, "col_2_1", new BigIntType(), "test 
description 2."),
+                        new DataField(3, "col_3", new VarCharType(), 
"Someone's desc."),
+                        new DataField(4, "col_4", new VarCharType(), 
"Someone's desc."),
+                        new DataField(5, "col_5", new VarCharType(), 
"Someone's desc."),
+                        new DataField(6, "col_6", new DecimalType(), 
"Someone's desc."),
+                        new DataField(7, "col_7", new VarCharType(), 
"Someone's desc."),
+                        new DataField(8, "col_8", new VarCharType(), 
"Someone's desc."),
+                        new DataField(9, "col_9", new VarCharType(), 
"Someone's desc."),
+                        new DataField(10, "col_10", new VarCharType(), 
"Someone's desc."),
+                        new DataField(11, "col_11", new VarCharType(), 
"Someone's desc."),
+                        new DataField(12, "col_12", new DoubleType(), 
"Someone's desc."),
+                        new DataField(13, "col_13", new VarCharType(), 
"Someone's desc."),
+                        new DataField(14, "col_14", new VarCharType(), 
"Someone's desc."),
+                        new DataField(15, "col_15", new VarCharType(), 
"Someone's desc."),
+                        new DataField(16, "col_16", new VarCharType(), 
"Someone's desc."),
+                        new DataField(17, "col_17", new VarCharType(), 
"Someone's desc."),
+                        new DataField(18, "col_18", new VarCharType(), 
"Someone's desc."),
+                        new DataField(19, "col_19", new VarCharType(), 
"Someone's desc."),
+                        new DataField(20, "col_20", new VarCharType(), 
"Someone's desc."));
+        return Arrays.asList(upField1, upField2, upField3, upField4, upField5);
+    }
+
+    private FileStoreTable table;
+    private String tableName = "MyTable";
+
+    @BeforeEach
+    public void before() throws Exception {
+        FileIO fileIO = LocalFileIO.create();
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, tableName));
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("pt1", DataTypes.INT())
+                        .column("pt2", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt1", "pt2")
+                        .primaryKey("pk", "pt1", "pt2")
+                        .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+                        .option(CoreOptions.BUCKET.key(), "2")
+                        .option(CoreOptions.SEQUENCE_FIELD.key(), "col1")
+                        .build();
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), 
schema);
+        table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, 
tableSchema);
+    }
+
+    @Test
+    public void testSchemaEvolution() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<List<DataField>> upDataFieldStream = 
env.fromCollection(prepareData());
+        Options options = new Options();
+        options.set("warehouse", tempPath.toString());
+        final Catalog.Loader catalogLoader = () -> 
FlinkCatalogFactory.createPaimonCatalog(options);
+        Identifier identifier = Identifier.create(database, tableName);
+        DataStream<Void> schemaChangeProcessFunction =
+                upDataFieldStream
+                        .process(
+                                new UpdatedDataFieldsProcessFunction(
+                                        new SchemaManager(table.fileIO(), 
table.location()),
+                                        identifier,
+                                        catalogLoader))
+                        .name("Schema Evolution");
+        schemaChangeProcessFunction.getTransformation().setParallelism(1);
+        schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
+        env.execute();
+    }
+}

Reply via email to