This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 116309cdd3 [hotfix] Refactor codes to topLevelField name in 
UpdatedDataFieldsProcessFunctionBase
116309cdd3 is described below

commit 116309cdd34dff641558764fbf00ac0c598389c3
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jun 9 16:13:07 2025 +0800

    [hotfix] Refactor codes to topLevelField name in 
UpdatedDataFieldsProcessFunctionBase
---
 ...MultiTableUpdatedDataFieldsProcessFunction.java |  1 -
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  | 59 +++++++++-------------
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  1 -
 3 files changed, 23 insertions(+), 38 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 503f3593c2..d10f6d9f5e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -99,7 +99,6 @@ public class MultiTableUpdatedDataFieldsProcessFunction
                         actualUpdatedDataFields,
                         updatedSchema.f1.primaryKeys(),
                         updatedSchema.f1.comment());
-
         for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, 
actualUpdatedSchema)) {
             applySchemaChange(schemaManager, schemaChange, tableId, 
actualUpdatedSchema);
         }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index bb49de2ac1..f512c14e6c 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -35,7 +35,6 @@ import org.apache.paimon.types.FieldIdentifier;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.api.common.functions.OpenContext;
@@ -46,6 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,13 +54,15 @@ import java.util.stream.Collectors;
 
 /** Base class for update data fields process function. */
 public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends 
ProcessFunction<I, O> {
+
     private static final Logger LOG =
             
LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class);
 
     protected final CatalogLoader catalogLoader;
+    private final TypeMapping typeMapping;
+
     protected Catalog catalog;
     private boolean caseSensitive;
-    private TypeMapping typeMapping;
 
     private static final List<DataTypeRoot> STRING_TYPES =
             Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
@@ -105,7 +107,7 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
             SchemaManager schemaManager,
             SchemaChange schemaChange,
             Identifier identifier,
-            CdcSchema actualUpdatedSchema)
+            CdcSchema newSchema)
             throws Exception {
         if (schemaChange instanceof SchemaChange.AddColumn) {
             try {
@@ -125,35 +127,17 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
         } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
             SchemaChange.UpdateColumnType updateColumnType =
                     (SchemaChange.UpdateColumnType) schemaChange;
-            TableSchema schema =
-                    schemaManager
-                            .latest()
-                            .orElseThrow(
-                                    () ->
-                                            new RuntimeException(
-                                                    "Table does not exist. 
This is unexpected."));
-            int idx = 
schema.fieldNames().indexOf(updateColumnType.fieldNames()[0]);
-            Preconditions.checkState(
-                    idx >= 0,
-                    "Field name "
-                            + updateColumnType.fieldNames()[0]
-                            + " does not exist in table. This is unexpected.");
-            DataType oldType = schema.fields().get(idx).type();
-            DataType newType = updateColumnType.newDataType();
-
-            // For complex types, extract the full new type from 
actualUpdatedSchema
-            // to preserve type context (e.g., ARRAY<BIGINT> instead of just 
BIGINT)
-            if (actualUpdatedSchema != null) {
-                String fieldName = updateColumnType.fieldNames()[0];
-                for (DataField field : actualUpdatedSchema.fields()) {
-                    if (fieldName.equals(field.name())) {
-                        newType = field.type();
-                        break;
-                    }
-                }
-            }
-
-            switch (canConvert(oldType, newType, typeMapping)) {
+            String topLevelFieldName = updateColumnType.fieldNames()[0];
+            TableSchema oldSchema =
+                    schemaManager.latestOrThrow("Table does not exist. This is 
unexpected.");
+            DataType oldTopLevelFieldType =
+                    new 
RowType(oldSchema.fields()).getField(topLevelFieldName).type();
+            DataType newTopLevelFieldType =
+                    new 
RowType(newSchema.fields()).getField(topLevelFieldName).type();
+
+            // For complex types, extract the top level type to check type 
context (e.g.,
+            // ARRAY<BIGINT> instead of just BIGINT)
+            switch (canConvert(oldTopLevelFieldType, newTopLevelFieldType, 
typeMapping)) {
                 case CONVERT:
                     catalog.alterTable(identifier, schemaChange, false);
                     break;
@@ -161,9 +145,9 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                     throw new UnsupportedOperationException(
                             String.format(
                                     "Cannot convert field %s from type %s to 
%s of Paimon table %s.",
-                                    updateColumnType.fieldNames()[0],
-                                    oldType,
-                                    newType,
+                                    topLevelFieldName,
+                                    oldTopLevelFieldType,
+                                    newTopLevelFieldType,
                                     identifier.getFullName()));
             }
         } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
@@ -391,7 +375,10 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                     }
                     // Generate nested column updates if needed
                     NestedSchemaUtils.generateNestedColumnUpdates(
-                            Arrays.asList(newFieldName), oldField.type(), 
newField.type(), result);
+                            Collections.singletonList(newFieldName),
+                            oldField.type(),
+                            newField.type(),
+                            result);
                     // update column comment
                     if (newField.description() != null) {
                         result.add(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index d791d33d89..e7275a9470 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -669,7 +669,6 @@ public class FlinkCatalog extends AbstractCatalog {
             org.apache.paimon.types.DataType oldType,
             org.apache.paimon.types.DataType newType,
             List<SchemaChange> schemaChanges) {
-
         NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, 
newType, schemaChanges);
     }
 

Reply via email to