JingsongLi commented on code in PR #4535:
URL: https://github.com/apache/paimon/pull/4535#discussion_r1857677336


##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java:
##########
@@ -43,19 +50,51 @@ public class UpdatedDataFieldsProcessFunction
 
     private final Identifier identifier;
 
+    private Set<FieldIdentifier> latestFields;
+
     public UpdatedDataFieldsProcessFunction(
             SchemaManager schemaManager, Identifier identifier, Catalog.Loader catalogLoader) {
         super(catalogLoader);
         this.schemaManager = schemaManager;
         this.identifier = identifier;
+        this.latestFields = new HashSet<>();
     }
 
     @Override
     public void processElement(
             List<DataField> updatedDataFields, Context context, Collector<Void> collector)
             throws Exception {
-        for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, updatedDataFields)) {
+        List<DataField> actualUpdatedDataFields =
+                updatedDataFields.stream()
+                        .filter(
+                                dataField ->
+                                        !latestDataFieldContain(new FieldIdentifier(dataField)))
+                        .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(actualUpdatedDataFields)) {
+            return;
+        }
+        for (SchemaChange schemaChange :
+                extractSchemaChanges(schemaManager, actualUpdatedDataFields)) {
             applySchemaChange(schemaManager, schemaChange, identifier);
         }
+        /**
+         * Here, actualUpdatedDataFields cannot be used to update latestFields because there is a
+         * non-SchemaChange.AddColumn scenario. Otherwise, the previously existing fields cannot be
+         * modified again.
+         */
+        updateLatestFields();

Review Comment:
   Can we just add updatedDataFields to latestFields?
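
The suggestion above could look roughly like the following minimal sketch. It is not Paimon code: `FieldKey` and `SeenFieldsCache` are hypothetical stand-ins (the PR's `FieldIdentifier` is not shown in this hunk), and keying a field by name and type is an assumption made only for illustration. The sketch filters out fields already seen and then adds every incoming field to the cache directly. Whether this handles the non-AddColumn case mentioned in the code comment is left open.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for the PR's FieldIdentifier (assumption: keyed by name and type).
final class FieldKey {
    final String name;
    final String type;

    FieldKey(String name, String type) {
        this.name = name;
        this.type = type;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FieldKey)) {
            return false;
        }
        FieldKey other = (FieldKey) o;
        return name.equals(other.name) && type.equals(other.type);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, type);
    }
}

// Hypothetical cache illustrating "add updatedDataFields to latestFields".
final class SeenFieldsCache {
    private final Set<FieldKey> latestFields = new HashSet<>();

    // Returns the fields not seen in earlier calls, then remembers all incoming fields.
    List<FieldKey> filterAndRemember(List<FieldKey> updatedDataFields) {
        List<FieldKey> unseen = new ArrayList<>();
        for (FieldKey field : updatedDataFields) {
            if (!latestFields.contains(field)) {
                unseen.add(field);
            }
        }
        // In the real function, schema changes derived from `unseen` would be
        // applied at this point, before the cache is updated.
        latestFields.addAll(updatedDataFields);
        return unseen;
    }
}
```

With this shape, a repeated `(name, type)` pair is skipped on later calls, while the same name with a different type passes through the filter again.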



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org