This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 32e27f1b0 [INLONG-7453][Sort] Fix the issue where the blacklist of the Iceberg
connector loses the metrics and archiving of dirty data (#7454)
32e27f1b0 is described below
commit 32e27f1b08b797244ff33ccf620159e59e68cf33
Author: LinChen <[email protected]>
AuthorDate: Wed Mar 1 14:24:41 2023 +0800
[INLONG-7453][Sort] Fix the issue where the blacklist of the Iceberg connector
loses the metrics and archiving of dirty data (#7454)
---
.../iceberg/sink/multiple/DynamicSchemaHandleOperator.java | 13 +------------
1 file changed, 1 insertion(+), 12 deletions(-)
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 53fef8dd1..bc2892163 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -65,12 +65,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.Set;
import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
@@ -101,9 +99,6 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
// schema cache
private transient Map<TableIdentifier, Schema> schemaCache;
- // blacklist to filter schema update failed table
- private transient Set<TableIdentifier> blacklist;
-
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
@@ -142,7 +137,6 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
this.recordQueues = new HashMap<>();
this.schemaCache = new HashMap<>();
- this.blacklist = new HashSet<>();
// Initialize metric
MetricOption metricOption = MetricOption.builder()
@@ -186,9 +180,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
handleDirtyData(jsonNode, jsonNode,
DirtyType.TABLE_IDENTIFIER_PARSE_ERROR,
e, TableIdentifier.of("unknow", "unknow"));
}
- if (blacklist.contains(tableId)) {
- return;
- }
+
boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
if (isDDL) {
execDDL(jsonNode, tableId);
@@ -242,7 +234,6 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
@Override
public void onProcessingTime(long timestamp) {
- LOG.info("Black list table: {} at time {}.", blacklist, timestamp);
processingTimeService.registerTimer(
processingTimeService.getCurrentProcessingTime() +
HELPER_DEBUG_INTERVEL, this);
}
@@ -313,7 +304,6 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
} catch (Exception e) {
LOG.warn("Ignore table {} schema change, old:
{} new: {}.",
tableId, dataSchema, latestSchema, e);
- blacklist.add(tableId);
handleDirtyData(jsonNode, jsonNode,
DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
}
return Collections.emptyList();
@@ -418,7 +408,6 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
canHandle = false;
}
if (!canHandle) {
- blacklist.add(tableId);
break;
}
}