This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2f93b7b2db [cdc] Add the latest_schema state at schema evolution
operator to reduce the latest schema access frequency (#4535)
2f93b7b2db is described below
commit 2f93b7b2dbc3a659f3b25bcff6c23aee4218ebfe
Author: Gang Yang <[email protected]>
AuthorDate: Thu Nov 28 20:02:30 2024 +0800
[cdc] Add the latest_schema state at schema evolution operator to reduce
the latest schema access frequency (#4535)
---
.../org/apache/paimon/types/FieldIdentifier.java | 53 +++++
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 41 +++-
.../flink/action/cdc/SchemaEvolutionTest.java | 219 +++++++++++++++++++++
3 files changed, 312 insertions(+), 1 deletion(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
b/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
new file mode 100644
index 0000000000..7e9ced7cf9
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.types;
+
+import java.util.Objects;
+
+/** Used to indicate the uniqueness of a field. */
+public class FieldIdentifier {
+ private String name;
+ private DataType type;
+ private String description;
+
+ public FieldIdentifier(DataField dataField) {
+ this.name = dataField.name();
+ this.type = dataField.type();
+ this.description = dataField.description();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FieldIdentifier field = (FieldIdentifier) o;
+ return Objects.equals(name, field.name)
+ && Objects.equals(type, field.type)
+ && Objects.equals(description, field.description);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type, description);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 4a33eb1b7e..64f00d96b0 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -23,11 +23,18 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.FieldIdentifier;
+import org.apache.paimon.types.RowType;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
+import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* A {@link ProcessFunction} to handle schema changes. New schema is
represented by a list of {@link
@@ -43,19 +50,51 @@ public class UpdatedDataFieldsProcessFunction
private final Identifier identifier;
+ private Set<FieldIdentifier> latestFields;
+
/**
 * Creates a process function that applies schema changes for the given table.
 *
 * @param schemaManager schema manager of the target table
 * @param identifier identifier of the target table
 * @param catalogLoader catalog loader handed to the parent class
 */
public UpdatedDataFieldsProcessFunction(
        SchemaManager schemaManager, Identifier identifier, Catalog.Loader catalogLoader) {
    super(catalogLoader);
    this.schemaManager = schemaManager;
    this.identifier = identifier;
    // Starts empty; populated from the committed schema after the first element.
    this.latestFields = new HashSet<>();
}
@Override
public void processElement(
List<DataField> updatedDataFields, Context context,
Collector<Void> collector)
throws Exception {
- for (SchemaChange schemaChange : extractSchemaChanges(schemaManager,
updatedDataFields)) {
+ List<DataField> actualUpdatedDataFields =
+ updatedDataFields.stream()
+ .filter(
+ dataField ->
+ !latestDataFieldContain(new
FieldIdentifier(dataField)))
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(actualUpdatedDataFields)) {
+ return;
+ }
+ for (SchemaChange schemaChange :
+ extractSchemaChanges(schemaManager, actualUpdatedDataFields)) {
applySchemaChange(schemaManager, schemaChange, identifier);
}
+ /**
+ * Here, actualUpdatedDataFields cannot be used to update latestFields
because there is a
+ * non-SchemaChange.AddColumn scenario. Otherwise, the previously
existing fields cannot be
+ * modified again.
+ */
+ updateLatestFields();
+ }
+
+ private boolean latestDataFieldContain(FieldIdentifier dataField) {
+ return latestFields.stream().anyMatch(previous ->
Objects.equals(previous, dataField));
+ }
+
+ private void updateLatestFields() {
+ RowType oldRowType = schemaManager.latest().get().logicalRowType();
+ Set<FieldIdentifier> fieldIdentifiers =
+ oldRowType.getFields().stream()
+ .map(item -> new FieldIdentifier(item))
+ .collect(Collectors.toSet());
+ latestFields = fieldIdentifiers;
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
new file mode 100644
index 0000000000..9ba1837686
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Used to test schema evolution related logic. */
+public class SchemaEvolutionTest extends TableTestBase {
+
+ private static List<List<DataField>> prepareData() {
+ List<DataField> upField1 =
+ Arrays.asList(
+ new DataField(0, "col_0", new VarCharType(), "test
description."),
+ new DataField(1, "col_1", new IntType(), "test
description."),
+ new DataField(2, "col_2", new IntType(), "test
description."),
+ new DataField(3, "col_3", new VarCharType(),
"Someone's desc."),
+ new DataField(4, "col_4", new VarCharType(),
"Someone's desc."),
+ new DataField(5, "col_5", new VarCharType(),
"Someone's desc."),
+ new DataField(6, "col_6", new DecimalType(),
"Someone's desc."),
+ new DataField(7, "col_7", new VarCharType(),
"Someone's desc."),
+ new DataField(8, "col_8", new VarCharType(),
"Someone's desc."),
+ new DataField(9, "col_9", new VarCharType(),
"Someone's desc."),
+ new DataField(10, "col_10", new VarCharType(),
"Someone's desc."),
+ new DataField(11, "col_11", new VarCharType(),
"Someone's desc."),
+ new DataField(12, "col_12", new DoubleType(),
"Someone's desc."),
+ new DataField(13, "col_13", new VarCharType(),
"Someone's desc."),
+ new DataField(14, "col_14", new VarCharType(),
"Someone's desc."),
+ new DataField(15, "col_15", new VarCharType(),
"Someone's desc."),
+ new DataField(16, "col_16", new VarCharType(),
"Someone's desc."),
+ new DataField(17, "col_17", new VarCharType(),
"Someone's desc."),
+ new DataField(18, "col_18", new VarCharType(),
"Someone's desc."),
+ new DataField(19, "col_19", new VarCharType(),
"Someone's desc."),
+ new DataField(20, "col_20", new VarCharType(),
"Someone's desc."));
+ List<DataField> upField2 =
+ Arrays.asList(
+ new DataField(0, "col_0", new VarCharType(), "test
description."),
+ new DataField(1, "col_1", new BigIntType(), "test
description."),
+ new DataField(2, "col_2", new IntType(), "test
description."),
+ new DataField(3, "col_3", new VarCharType(),
"Someone's desc."),
+ new DataField(4, "col_4", new VarCharType(),
"Someone's desc."),
+ new DataField(5, "col_5", new VarCharType(),
"Someone's desc."),
+ new DataField(6, "col_6", new DecimalType(),
"Someone's desc."),
+ new DataField(7, "col_7", new VarCharType(),
"Someone's desc."),
+ new DataField(8, "col_8", new VarCharType(),
"Someone's desc."),
+ new DataField(9, "col_9", new VarCharType(),
"Someone's desc."),
+ new DataField(10, "col_10", new VarCharType(),
"Someone's desc."),
+ new DataField(11, "col_11", new VarCharType(),
"Someone's desc."),
+ new DataField(12, "col_12", new DoubleType(),
"Someone's desc."),
+ new DataField(13, "col_13", new VarCharType(),
"Someone's desc."),
+ new DataField(14, "col_14", new VarCharType(),
"Someone's desc."),
+ new DataField(15, "col_15", new VarCharType(),
"Someone's desc."),
+ new DataField(16, "col_16", new VarCharType(),
"Someone's desc."),
+ new DataField(17, "col_17", new VarCharType(),
"Someone's desc."),
+ new DataField(18, "col_18", new VarCharType(),
"Someone's desc."),
+ new DataField(19, "col_19", new VarCharType(),
"Someone's desc."),
+ new DataField(20, "col_20", new VarCharType(),
"Someone's desc."));
+ List<DataField> upField3 =
+ Arrays.asList(
+ new DataField(0, "col_0", new VarCharType(), "test
description."),
+ new DataField(1, "col_1", new BigIntType(), "test
description."),
+ new DataField(2, "col_2", new IntType(), "test
description 2."),
+ new DataField(3, "col_3", new VarCharType(),
"Someone's desc."),
+ new DataField(4, "col_4", new VarCharType(),
"Someone's desc."),
+ new DataField(5, "col_5", new VarCharType(),
"Someone's desc."),
+ new DataField(6, "col_6", new DecimalType(),
"Someone's desc."),
+ new DataField(7, "col_7", new VarCharType(),
"Someone's desc."),
+ new DataField(8, "col_8", new VarCharType(),
"Someone's desc."),
+ new DataField(9, "col_9", new VarCharType(),
"Someone's desc."),
+ new DataField(10, "col_10", new VarCharType(),
"Someone's desc."),
+ new DataField(11, "col_11", new VarCharType(),
"Someone's desc."),
+ new DataField(12, "col_12", new DoubleType(),
"Someone's desc."),
+ new DataField(13, "col_13", new VarCharType(),
"Someone's desc."),
+ new DataField(14, "col_14", new VarCharType(),
"Someone's desc."),
+ new DataField(15, "col_15", new VarCharType(),
"Someone's desc."),
+ new DataField(16, "col_16", new VarCharType(),
"Someone's desc."),
+ new DataField(17, "col_17", new VarCharType(),
"Someone's desc."),
+ new DataField(18, "col_18", new VarCharType(),
"Someone's desc."),
+ new DataField(19, "col_19", new VarCharType(),
"Someone's desc."),
+ new DataField(20, "col_20", new VarCharType(),
"Someone's desc."));
+ List<DataField> upField4 =
+ Arrays.asList(
+ new DataField(0, "col_0", new VarCharType(), "test
description."),
+ new DataField(1, "col_1", new BigIntType(), "test
description."),
+ new DataField(2, "col_2", new IntType(), "test
description."),
+ new DataField(3, "col_3_1", new VarCharType(),
"Someone's desc."),
+ new DataField(4, "col_4", new VarCharType(),
"Someone's desc."),
+ new DataField(5, "col_5", new VarCharType(),
"Someone's desc."),
+ new DataField(6, "col_6", new DecimalType(),
"Someone's desc."),
+ new DataField(7, "col_7", new VarCharType(),
"Someone's desc."),
+ new DataField(8, "col_8", new VarCharType(),
"Someone's desc."),
+ new DataField(9, "col_9", new VarCharType(),
"Someone's desc."),
+ new DataField(10, "col_10", new VarCharType(),
"Someone's desc."),
+ new DataField(11, "col_11", new VarCharType(),
"Someone's desc."),
+ new DataField(12, "col_12", new DoubleType(),
"Someone's desc."),
+ new DataField(13, "col_13", new VarCharType(),
"Someone's desc."),
+ new DataField(14, "col_14", new VarCharType(),
"Someone's desc."),
+ new DataField(15, "col_15", new VarCharType(),
"Someone's desc."),
+ new DataField(16, "col_16", new VarCharType(),
"Someone's desc."),
+ new DataField(17, "col_17", new VarCharType(),
"Someone's desc."),
+ new DataField(18, "col_18", new VarCharType(),
"Someone's desc."),
+ new DataField(19, "col_19", new VarCharType(),
"Someone's desc."),
+ new DataField(20, "col_20", new VarCharType(),
"Someone's desc."));
+ List<DataField> upField5 =
+ Arrays.asList(
+ new DataField(0, "col_0", new VarCharType(), "test
description."),
+ new DataField(1, "col_1", new BigIntType(), "test
description."),
+ new DataField(2, "col_2_1", new BigIntType(), "test
description 2."),
+ new DataField(3, "col_3", new VarCharType(),
"Someone's desc."),
+ new DataField(4, "col_4", new VarCharType(),
"Someone's desc."),
+ new DataField(5, "col_5", new VarCharType(),
"Someone's desc."),
+ new DataField(6, "col_6", new DecimalType(),
"Someone's desc."),
+ new DataField(7, "col_7", new VarCharType(),
"Someone's desc."),
+ new DataField(8, "col_8", new VarCharType(),
"Someone's desc."),
+ new DataField(9, "col_9", new VarCharType(),
"Someone's desc."),
+ new DataField(10, "col_10", new VarCharType(),
"Someone's desc."),
+ new DataField(11, "col_11", new VarCharType(),
"Someone's desc."),
+ new DataField(12, "col_12", new DoubleType(),
"Someone's desc."),
+ new DataField(13, "col_13", new VarCharType(),
"Someone's desc."),
+ new DataField(14, "col_14", new VarCharType(),
"Someone's desc."),
+ new DataField(15, "col_15", new VarCharType(),
"Someone's desc."),
+ new DataField(16, "col_16", new VarCharType(),
"Someone's desc."),
+ new DataField(17, "col_17", new VarCharType(),
"Someone's desc."),
+ new DataField(18, "col_18", new VarCharType(),
"Someone's desc."),
+ new DataField(19, "col_19", new VarCharType(),
"Someone's desc."),
+ new DataField(20, "col_20", new VarCharType(),
"Someone's desc."));
+ return Arrays.asList(upField1, upField2, upField3, upField4, upField5);
+ }
+
+ private FileStoreTable table;
+ private String tableName = "MyTable";
+
+ @BeforeEach
+ public void before() throws Exception {
+ FileIO fileIO = LocalFileIO.create();
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, tableName));
+ Schema schema =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt1", DataTypes.INT())
+ .column("pt2", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt1", "pt2")
+ .primaryKey("pk", "pt1", "pt2")
+ .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+ .option(CoreOptions.BUCKET.key(), "2")
+ .option(CoreOptions.SEQUENCE_FIELD.key(), "col1")
+ .build();
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath),
schema);
+ table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
+ }
+
+ @Test
+ public void testSchemaEvolution() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<List<DataField>> upDataFieldStream =
env.fromCollection(prepareData());
+ Options options = new Options();
+ options.set("warehouse", tempPath.toString());
+ final Catalog.Loader catalogLoader = () ->
FlinkCatalogFactory.createPaimonCatalog(options);
+ Identifier identifier = Identifier.create(database, tableName);
+ DataStream<Void> schemaChangeProcessFunction =
+ upDataFieldStream
+ .process(
+ new UpdatedDataFieldsProcessFunction(
+ new SchemaManager(table.fileIO(),
table.location()),
+ identifier,
+ catalogLoader))
+ .name("Schema Evolution");
+ schemaChangeProcessFunction.getTransformation().setParallelism(1);
+ schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
+ env.execute();
+ }
+}