This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e27f1b0 [INLONG-7453][Sort] Fix the blacklist of Iceberg connector 
will lose the metric and archiving of dirty data (#7454)
32e27f1b0 is described below

commit 32e27f1b08b797244ff33ccf620159e59e68cf33
Author: LinChen <[email protected]>
AuthorDate: Wed Mar 1 14:24:41 2023 +0800

    [INLONG-7453][Sort] Fix the blacklist of Iceberg connector will lose the 
metric and archiving of dirty data (#7454)
---
 .../iceberg/sink/multiple/DynamicSchemaHandleOperator.java  | 13 +------------
 1 file changed, 1 insertion(+), 12 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 53fef8dd1..bc2892163 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -65,12 +65,10 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
@@ -101,9 +99,6 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
     // schema cache
     private transient Map<TableIdentifier, Schema> schemaCache;
 
-    // blacklist to filter schema update failed table
-    private transient Set<TableIdentifier> blacklist;
-
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
 
@@ -142,7 +137,6 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
 
         this.recordQueues = new HashMap<>();
         this.schemaCache = new HashMap<>();
-        this.blacklist = new HashSet<>();
 
         // Initialize metric
         MetricOption metricOption = MetricOption.builder()
@@ -186,9 +180,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
             handleDirtyData(jsonNode, jsonNode, 
DirtyType.TABLE_IDENTIFIER_PARSE_ERROR,
                     e, TableIdentifier.of("unknow", "unknow"));
         }
-        if (blacklist.contains(tableId)) {
-            return;
-        }
+
         boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
         if (isDDL) {
             execDDL(jsonNode, tableId);
@@ -242,7 +234,6 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
 
     @Override
     public void onProcessingTime(long timestamp) {
-        LOG.info("Black list table: {} at time {}.", blacklist, timestamp);
         processingTimeService.registerTimer(
                 processingTimeService.getCurrentProcessingTime() + 
HELPER_DEBUG_INTERVEL, this);
     }
@@ -313,7 +304,6 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                             } catch (Exception e) {
                                 LOG.warn("Ignore table {} schema change, old: 
{} new: {}.",
                                         tableId, dataSchema, latestSchema, e);
-                                blacklist.add(tableId);
                                 handleDirtyData(jsonNode, jsonNode, 
DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
                             }
                             return Collections.emptyList();
@@ -418,7 +408,6 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                 canHandle = false;
             }
             if (!canHandle) {
-                blacklist.add(tableId);
                 break;
             }
         }

Reply via email to