This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a3d9d597b [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)
a3d9d597b is described below

commit a3d9d597b3787a1d42ac79091cc3ae2007b751c9
Author: thesumery <[email protected]>
AuthorDate: Sat Nov 5 15:57:58 2022 +0800

    [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)
    
    Co-authored-by: thesumery <[email protected]>
---
 .../sink/multiple/DynamicSchemaHandleOperator.java | 30 +++++++++++-----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index e7fe68127..a0a9092c6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -229,10 +229,12 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         Transaction transaction = table.newTransaction();
         if (table.schema().sameSchema(oldSchema)) {
             List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
-            if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
-                SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
-                LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+            if (!canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
+                // If can not handle this schema update, should not push data into next operator
+                return;
             }
+            SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
+            LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
         }
         transaction.commitTransaction();
         handleSchemaInfoEvent(tableId, table.schema());
@@ -270,22 +272,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
     private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) {
         boolean canHandle = true;
         for (TableChange tableChange : tableChanges) {
-            if (tableChange instanceof AddColumn) {
-                canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getSchemaUpdatePolicy());
-            } else {
-                if (MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getSchemaUpdatePolicy())) {
-                    LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
-                            tableId, tableChange);
-                }
+            canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+                    multipleSinkOption.getSchemaUpdatePolicy());
+            if (!(tableChange instanceof AddColumn)) {
                 // todo:currently iceberg can only handle addColumn, so always return false
+                LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
+                        tableId, tableChange);
                 canHandle = false;
             }
+            if (!canHandle) {
+                blacklist.add(tableId);
+                break;
+            }
         }
-        if (!canHandle) {
-            blacklist.add(tableId);
-        }
+
         return canHandle;
     }
 }

Reply via email to