This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a3d9d597b [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)
a3d9d597b is described below
commit a3d9d597b3787a1d42ac79091cc3ae2007b751c9
Author: thesumery <[email protected]>
AuthorDate: Sat Nov 5 15:57:58 2022 +0800
[INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)
Co-authored-by: thesumery <[email protected]>
---
.../sink/multiple/DynamicSchemaHandleOperator.java | 30 +++++++++++-----------
1 file changed, 15 insertions(+), 15 deletions(-)
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index e7fe68127..a0a9092c6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -229,10 +229,12 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
Transaction transaction = table.newTransaction();
if (table.schema().sameSchema(oldSchema)) {
List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
- if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
- SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
- LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+ if (!canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
+ // If can not handle this schema update, should not push data into next operator
+ return;
}
+ SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
+ LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
}
transaction.commitTransaction();
handleSchemaInfoEvent(tableId, table.schema());
@@ -270,22 +272,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) {
boolean canHandle = true;
for (TableChange tableChange : tableChanges) {
- if (tableChange instanceof AddColumn) {
- canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
- multipleSinkOption.getSchemaUpdatePolicy());
- } else {
- if (MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
- multipleSinkOption.getSchemaUpdatePolicy())) {
- LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
- tableId, tableChange);
- }
+ canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+ multipleSinkOption.getSchemaUpdatePolicy());
+ if (!(tableChange instanceof AddColumn)) {
// todo:currently iceberg can only handle addColumn, so always return false
+ LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
+ tableId, tableChange);
canHandle = false;
}
+ if (!canHandle) {
+ blacklist.add(tableId);
+ break;
+ }
}
- if (!canHandle) {
- blacklist.add(tableId);
- }
+
return canHandle;
}
}