This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc123e18b [cdc] Fix database sync performance issue of schema evolution (#5382)
1fc123e18b is described below

commit 1fc123e18bf1160e41f7afb70763968fc77d0497
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 7 17:38:19 2025 +0800

    [cdc] Fix database sync performance issue of schema evolution (#5382)
---
 .../org/apache/paimon/types/FieldIdentifier.java   |  6 ++--
 ...MultiTableUpdatedDataFieldsProcessFunction.java | 39 ++++++++++++++++++----
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java | 27 ++-------------
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  | 17 ++++++++++
 4 files changed, 56 insertions(+), 33 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java b/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
index 7e9ced7cf9..d71250071b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
@@ -22,9 +22,9 @@ import java.util.Objects;
 
 /** Used to indicate the uniqueness of a field. */
 public class FieldIdentifier {
-    private String name;
-    private DataType type;
-    private String description;
+    private final String name;
+    private final DataType type;
+    private final String description;
 
     public FieldIdentifier(DataField dataField) {
         this.name = dataField.name();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index cc8118f6ca..42effccf7f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -25,6 +25,8 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.FieldIdentifier;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -33,8 +35,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * A {@link ProcessFunction} to handle schema changes. New schema is 
represented by a {@link
@@ -51,6 +56,8 @@ public class MultiTableUpdatedDataFieldsProcessFunction
 
     private final Map<Identifier, SchemaManager> schemaManagers = new 
HashMap<>();
 
+    private final Map<Identifier, Set<FieldIdentifier>> latestFieldsMap = new 
HashMap<>();
+
     public MultiTableUpdatedDataFieldsProcessFunction(
             CatalogLoader catalogLoader, TypeMapping typeMapping) {
         super(catalogLoader, typeMapping);
@@ -73,14 +80,34 @@ public class MultiTableUpdatedDataFieldsProcessFunction
                             }
                             return new SchemaManager(table.fileIO(), 
table.location());
                         });
-
         if (Objects.isNull(schemaManager)) {
             LOG.error("Failed to get schema manager for table " + tableId);
-        } else {
-            for (SchemaChange schemaChange :
-                    extractSchemaChanges(schemaManager, updatedSchema.f1)) {
-                applySchemaChange(schemaManager, schemaChange, tableId);
-            }
+            return;
+        }
+
+        Set<FieldIdentifier> latestFields =
+                latestFieldsMap.computeIfAbsent(tableId, id -> new 
HashSet<>());
+        List<DataField> actualUpdatedDataFields =
+                actualUpdatedDataFields(updatedSchema.f1.fields(), 
latestFields);
+
+        if (actualUpdatedDataFields.isEmpty() && updatedSchema.f1.comment() == 
null) {
+            return;
+        }
+
+        CdcSchema actualUpdatedSchema =
+                new CdcSchema(
+                        actualUpdatedDataFields,
+                        updatedSchema.f1.primaryKeys(),
+                        updatedSchema.f1.comment());
+
+        for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, 
actualUpdatedSchema)) {
+            applySchemaChange(schemaManager, schemaChange, tableId);
         }
+        /*
+         * Here, actualUpdatedDataFields cannot be used to update latestFields 
because there is a
+         * non-SchemaChange.AddColumn scenario. Otherwise, the previously 
existing fields cannot be
+         * modified again.
+         */
+        latestFieldsMap.put(tableId, updateLatestFields(schemaManager));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 363d747d3b..e1b78926cc 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -25,17 +25,13 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.FieldIdentifier;
-import org.apache.paimon.types.RowType;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * A {@link ProcessFunction} to handle schema changes. New schema is 
represented by a {@link
@@ -68,12 +64,8 @@ public class UpdatedDataFieldsProcessFunction
     public void processElement(CdcSchema updatedSchema, Context context, 
Collector<Void> collector)
             throws Exception {
         List<DataField> actualUpdatedDataFields =
-                updatedSchema.fields().stream()
-                        .filter(
-                                dataField ->
-                                        !latestDataFieldContain(new 
FieldIdentifier(dataField)))
-                        .collect(Collectors.toList());
-        if (CollectionUtils.isEmpty(actualUpdatedDataFields) && 
updatedSchema.comment() == null) {
+                actualUpdatedDataFields(updatedSchema.fields(), latestFields);
+        if (actualUpdatedDataFields.isEmpty() && updatedSchema.comment() == 
null) {
             return;
         }
         CdcSchema actualUpdatedSchema =
@@ -89,19 +81,6 @@ public class UpdatedDataFieldsProcessFunction
          * non-SchemaChange.AddColumn scenario. Otherwise, the previously 
existing fields cannot be
          * modified again.
          */
-        updateLatestFields();
-    }
-
-    private boolean latestDataFieldContain(FieldIdentifier dataField) {
-        return latestFields.stream().anyMatch(previous -> 
Objects.equals(previous, dataField));
-    }
-
-    private void updateLatestFields() {
-        RowType oldRowType = schemaManager.latest().get().logicalRowType();
-        Set<FieldIdentifier> fieldIdentifiers =
-                oldRowType.getFields().stream()
-                        .map(item -> new FieldIdentifier(item))
-                        .collect(Collectors.toSet());
-        latestFields = fieldIdentifiers;
+        latestFields = updateLatestFields(schemaManager);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 657b025bf5..410425a7da 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -29,6 +29,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
 import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.FieldIdentifier;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
@@ -44,6 +45,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Base class for update data fields process function. */
 public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends 
ProcessFunction<I, O> {
@@ -280,6 +283,20 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
         return result;
     }
 
+    protected List<DataField> actualUpdatedDataFields(
+            List<DataField> newFields, Set<FieldIdentifier> latestFields) {
+        return newFields.stream()
+                .filter(dataField -> !latestFields.contains(new 
FieldIdentifier(dataField)))
+                .collect(Collectors.toList());
+    }
+
+    protected Set<FieldIdentifier> updateLatestFields(SchemaManager 
schemaManager) {
+        RowType oldRowType = schemaManager.latest().get().logicalRowType();
+        return oldRowType.getFields().stream()
+                .map(FieldIdentifier::new)
+                .collect(Collectors.toSet());
+    }
+
     @Override
     public void close() throws Exception {
         if (catalog != null) {

Reply via email to