This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1fc123e18b [cdc] Fix database sync performance issue of schema evolution (#5382)
1fc123e18b is described below
commit 1fc123e18bf1160e41f7afb70763968fc77d0497
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 7 17:38:19 2025 +0800
[cdc] Fix database sync performance issue of schema evolution (#5382)
---
.../org/apache/paimon/types/FieldIdentifier.java | 6 ++--
...MultiTableUpdatedDataFieldsProcessFunction.java | 39 ++++++++++++++++++----
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 27 ++-------------
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 17 ++++++++++
4 files changed, 56 insertions(+), 33 deletions(-)
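At a high level, the change caches the set of fields already present in a table's latest schema and filters incoming CDC fields against it, so schema-change extraction and catalog round-trips are skipped when nothing has actually changed. A minimal sketch of that idea, not the committed code, assuming FieldIdentifier provides value-based equals/hashCode over name, type and description (its stated purpose is to indicate the uniqueness of a field):

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.FieldIdentifier;

// Sketch of the deduplication idea: only fields not yet present in the latest
// schema are forwarded to the (expensive) schema-change extraction step.
class UpdatedFieldsFilterSketch {

    private Set<FieldIdentifier> latestFields = new HashSet<>();

    /** Returns only the fields that are new or changed compared to the cached schema. */
    List<DataField> actuallyUpdated(List<DataField> incoming) {
        return incoming.stream()
                .filter(field -> !latestFields.contains(new FieldIdentifier(field)))
                .collect(Collectors.toList());
    }

    /** Rebuilds the cache from the fields of the schema that was actually committed. */
    void refresh(List<DataField> latestSchemaFields) {
        latestFields =
                latestSchemaFields.stream().map(FieldIdentifier::new).collect(Collectors.toSet());
    }
}

The diffs below wire this up: FieldIdentifier's fields become final, the filter and cache-refresh helpers move into UpdatedDataFieldsProcessFunctionBase, and MultiTableUpdatedDataFieldsProcessFunction gains a per-table cache keyed by table identifier.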
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java b/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
index 7e9ced7cf9..d71250071b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java
@@ -22,9 +22,9 @@ import java.util.Objects;
/** Used to indicate the uniqueness of a field. */
public class FieldIdentifier {
- private String name;
- private DataType type;
- private String description;
+ private final String name;
+ private final DataType type;
+ private final String description;
public FieldIdentifier(DataField dataField) {
this.name = dataField.name();
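For context, FieldIdentifier instances end up as keys in long-lived HashSet caches (see the CDC diffs below), so making the fields final keeps each key's hash stable while it is cached. A small hedged illustration of the value-based identity this relies on; the DataField(int, String, DataType) constructor and the DataTypes factory are regular Paimon types, and equals/hashCode coverage of name, type and description is assumed from the class's stated purpose:

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.FieldIdentifier;

// Illustration only: FieldIdentifier captures name, type and description (the
// three fields above) but not the numeric field id, so it answers "is this
// column, with this type, already in the schema?" regardless of field position.
class FieldIdentifierIdentitySketch {
    public static void main(String[] args) {
        FieldIdentifier a = new FieldIdentifier(new DataField(1, "name", DataTypes.STRING()));
        FieldIdentifier b = new FieldIdentifier(new DataField(7, "name", DataTypes.STRING()));
        // Expected to print true under the equality assumption above.
        System.out.println(a.equals(b));
    }
}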
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index cc8118f6ca..42effccf7f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -25,6 +25,8 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.FieldIdentifier;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -33,8 +35,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
/**
* A {@link ProcessFunction} to handle schema changes. New schema is represented by a {@link
@@ -51,6 +56,8 @@ public class MultiTableUpdatedDataFieldsProcessFunction
private final Map<Identifier, SchemaManager> schemaManagers = new HashMap<>();
+ private final Map<Identifier, Set<FieldIdentifier>> latestFieldsMap = new HashMap<>();
+
public MultiTableUpdatedDataFieldsProcessFunction(
CatalogLoader catalogLoader, TypeMapping typeMapping) {
super(catalogLoader, typeMapping);
@@ -73,14 +80,34 @@ public class MultiTableUpdatedDataFieldsProcessFunction
}
return new SchemaManager(table.fileIO(), table.location());
});
-
if (Objects.isNull(schemaManager)) {
LOG.error("Failed to get schema manager for table " + tableId);
- } else {
- for (SchemaChange schemaChange :
- extractSchemaChanges(schemaManager, updatedSchema.f1)) {
- applySchemaChange(schemaManager, schemaChange, tableId);
- }
+ return;
+ }
+
+ Set<FieldIdentifier> latestFields =
+ latestFieldsMap.computeIfAbsent(tableId, id -> new HashSet<>());
+ List<DataField> actualUpdatedDataFields =
+ actualUpdatedDataFields(updatedSchema.f1.fields(), latestFields);
+
+ if (actualUpdatedDataFields.isEmpty() && updatedSchema.f1.comment() == null) {
+ return;
+ }
+
+ CdcSchema actualUpdatedSchema =
+ new CdcSchema(
+ actualUpdatedDataFields,
+ updatedSchema.f1.primaryKeys(),
+ updatedSchema.f1.comment());
+
+ for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, actualUpdatedSchema)) {
+ applySchemaChange(schemaManager, schemaChange, tableId);
}
+ /*
+ * Here, actualUpdatedDataFields cannot be used to update latestFields because there is a
+ * non-SchemaChange.AddColumn scenario. Otherwise, the previously existing fields cannot be
+ * modified again.
+ */
+ latestFieldsMap.put(tableId, updateLatestFields(schemaManager));
}
}
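The multi-table variant keeps one cache per table: computeIfAbsent creates the set lazily the first time a table is seen, and the whole set is replaced after a schema change is applied. A hedged sketch of just that caching pattern, with Identifier being Paimon's catalog table identifier and the processing logic omitted:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.types.FieldIdentifier;

// Sketch: one field cache per table for the multi-table sync path.
class PerTableFieldCacheSketch {

    private final Map<Identifier, Set<FieldIdentifier>> latestFieldsMap = new HashMap<>();

    Set<FieldIdentifier> cacheFor(Identifier tableId) {
        // Lazily create the cache the first time a table is seen; afterwards an
        // unchanged schema only costs a map lookup plus set lookups.
        return latestFieldsMap.computeIfAbsent(tableId, id -> new HashSet<>());
    }

    void replace(Identifier tableId, Set<FieldIdentifier> refreshedFromLatestSchema) {
        // Replace the set wholesale so changes to existing fields are reflected too.
        latestFieldsMap.put(tableId, refreshedFromLatestSchema);
    }
}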
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 363d747d3b..e1b78926cc 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -25,17 +25,13 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.FieldIdentifier;
-import org.apache.paimon.types.RowType;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
/**
* A {@link ProcessFunction} to handle schema changes. New schema is represented by a {@link
@@ -68,12 +64,8 @@ public class UpdatedDataFieldsProcessFunction
public void processElement(CdcSchema updatedSchema, Context context, Collector<Void> collector)
throws Exception {
List<DataField> actualUpdatedDataFields =
- updatedSchema.fields().stream()
- .filter(
- dataField ->
- !latestDataFieldContain(new FieldIdentifier(dataField)))
- .collect(Collectors.toList());
- if (CollectionUtils.isEmpty(actualUpdatedDataFields) && updatedSchema.comment() == null) {
+ actualUpdatedDataFields(updatedSchema.fields(), latestFields);
+ if (actualUpdatedDataFields.isEmpty() && updatedSchema.comment() == null) {
return;
}
CdcSchema actualUpdatedSchema =
@@ -89,19 +81,6 @@ public class UpdatedDataFieldsProcessFunction
* non-SchemaChange.AddColumn scenario. Otherwise, the previously existing fields cannot be
* modified again.
*/
- updateLatestFields();
- }
-
- private boolean latestDataFieldContain(FieldIdentifier dataField) {
- return latestFields.stream().anyMatch(previous -> Objects.equals(previous, dataField));
- }
-
- private void updateLatestFields() {
- RowType oldRowType = schemaManager.latest().get().logicalRowType();
- Set<FieldIdentifier> fieldIdentifiers =
- oldRowType.getFields().stream()
- .map(item -> new FieldIdentifier(item))
- .collect(Collectors.toSet());
- latestFields = fieldIdentifiers;
+ latestFields = updateLatestFields(schemaManager);
}
}
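The comment retained above is the reason updateLatestFields reads the schema back from the SchemaManager instead of reusing actualUpdatedDataFields: only what was actually committed should be treated as "already known". Note also that because the type participates in the identifier, a change to an existing column is not filtered out, as this hedged snippet illustrates (equality over name/type/description is assumed, as before):

import java.util.HashSet;
import java.util.Set;

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.FieldIdentifier;

// Illustration: a widened column type yields a different identifier for the
// same column name, so it still passes the "actually updated" filter and
// triggers schema evolution instead of being skipped.
class CacheRefreshScenarioSketch {
    public static void main(String[] args) {
        Set<FieldIdentifier> cache = new HashSet<>();
        cache.add(new FieldIdentifier(new DataField(0, "amount", DataTypes.INT())));

        DataField widened = new DataField(0, "amount", DataTypes.BIGINT());
        // Expected to print false: the INT entry does not match the BIGINT field.
        System.out.println(cache.contains(new FieldIdentifier(widened)));
    }
}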
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 657b025bf5..410425a7da 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -29,6 +29,7 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.FieldIdentifier;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -44,6 +45,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Base class for update data fields process function. */
public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends ProcessFunction<I, O> {
@@ -280,6 +283,20 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
return result;
}
+ protected List<DataField> actualUpdatedDataFields(
+ List<DataField> newFields, Set<FieldIdentifier> latestFields) {
+ return newFields.stream()
+ .filter(dataField -> !latestFields.contains(new FieldIdentifier(dataField)))
+ .collect(Collectors.toList());
+ }
+
+ protected Set<FieldIdentifier> updateLatestFields(SchemaManager schemaManager) {
+ RowType oldRowType = schemaManager.latest().get().logicalRowType();
+ return oldRowType.getFields().stream()
+ .map(FieldIdentifier::new)
+ .collect(Collectors.toSet());
+ }
+
@Override
public void close() throws Exception {
if (catalog != null) {