This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 453902dd3 [INLONG-6300][Sort] Schema update policy unifies behavior for
all column change types (#6306)
453902dd3 is described below
commit 453902dd331ab742de2b0297a61b52d17ca1e64e
Author: thesumery <[email protected]>
AuthorDate: Thu Oct 27 20:04:11 2022 +0800
[INLONG-6300][Sort] Schema update policy unifies behavior for all column
change types (#6306)
---
.../org/apache/inlong/sort/base/Constants.java | 12 ++------
.../inlong/sort/base/sink/MultipleSinkOption.java | 32 ++++++----------------
.../base/sink/SchemaUpdateExceptionPolicy.java | 4 +++
.../sort/iceberg/FlinkDynamicTableFactory.java | 6 ++--
.../inlong/sort/iceberg/IcebergTableSink.java | 6 ++--
.../sink/multiple/DynamicSchemaHandleOperator.java | 22 +++++++--------
6 files changed, 30 insertions(+), 52 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 59c192bdd..fb542ef6d 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -152,15 +152,9 @@ public final class Constants {
.withDescription("The option 'sink.multiple.enable' "
+ "is used to determine whether to support
multiple sink writing, default is 'false'.");
- public static final ConfigOption<SchemaUpdateExceptionPolicy>
SINK_MULTIPLE_ADD_COLUMN_POLICY =
- ConfigOptions.key("sink.multiple.add-column.policy")
+ public static final ConfigOption<SchemaUpdateExceptionPolicy>
SINK_MULTIPLE_SCHEMA_UPDATE_POLICY =
+ ConfigOptions.key("sink.multiple.schema-update.policy")
.enumType(SchemaUpdateExceptionPolicy.class)
.defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
- .withDescription("The action to deal with column add.");
-
- public static final ConfigOption<SchemaUpdateExceptionPolicy>
SINK_MULTIPLE_DEL_COLUMN_POLICY =
- ConfigOptions.key("sink.multiple.del-column.policy")
- .enumType(SchemaUpdateExceptionPolicy.class)
- .defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
- .withDescription("The action to deal with column delete.");
+ .withDescription("The action to deal with schema update in
multiple sink.");
}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index bf966e1e1..77c924b95 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -37,22 +37,18 @@ public class MultipleSinkOption implements Serializable {
private String format;
- private SchemaUpdateExceptionPolicy addColumnPolicy;
-
- private SchemaUpdateExceptionPolicy delColumnPolicy;
+ private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private String databasePattern;
private String tablePattern;
public MultipleSinkOption(String format,
- SchemaUpdateExceptionPolicy addColumnPolicy,
- SchemaUpdateExceptionPolicy delColumnPolicy,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy,
String databasePattern,
String tablePattern) {
this.format = format;
- this.addColumnPolicy = addColumnPolicy;
- this.delColumnPolicy = delColumnPolicy;
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
}
@@ -61,12 +57,8 @@ public class MultipleSinkOption implements Serializable {
return format;
}
- public SchemaUpdateExceptionPolicy getAddColumnPolicy() {
- return addColumnPolicy;
- }
-
- public SchemaUpdateExceptionPolicy getDelColumnPolicy() {
- return delColumnPolicy;
+ public SchemaUpdateExceptionPolicy getSchemaUpdatePolicy() {
+ return schemaUpdatePolicy;
}
public String getDatabasePattern() {
@@ -83,8 +75,7 @@ public class MultipleSinkOption implements Serializable {
public static class Builder {
private String format;
- private SchemaUpdateExceptionPolicy addColumnPolicy;
- private SchemaUpdateExceptionPolicy delColumnPolicy;
+ private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private String databasePattern;
private String tablePattern;
@@ -93,13 +84,8 @@ public class MultipleSinkOption implements Serializable {
return this;
}
- public MultipleSinkOption.Builder
withAddColumnPolicy(SchemaUpdateExceptionPolicy addColumnPolicy) {
- this.addColumnPolicy = addColumnPolicy;
- return this;
- }
-
- public MultipleSinkOption.Builder
withDelColumnPolicy(SchemaUpdateExceptionPolicy delColumnPolicy) {
- this.delColumnPolicy = delColumnPolicy;
+ public MultipleSinkOption.Builder
withSchemaUpdatePolicy(SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
return this;
}
@@ -114,7 +100,7 @@ public class MultipleSinkOption implements Serializable {
}
public MultipleSinkOption build() {
- return new MultipleSinkOption(format, addColumnPolicy,
delColumnPolicy, databasePattern, tablePattern);
+ return new MultipleSinkOption(format, schemaUpdatePolicy,
databasePattern, tablePattern);
}
}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
index 74cd271a6..7da5ac772 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
@@ -39,4 +39,8 @@ public enum SchemaUpdateExceptionPolicy {
SchemaUpdateExceptionPolicy(String description) {
this.description = description;
}
+
+ public String getDescription() {
+ return description;
+ }
}
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 5d63d8dc9..4852bca33 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -50,11 +50,10 @@ import java.util.Set;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
-import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
@@ -241,8 +240,7 @@ public class FlinkDynamicTableFactory implements
DynamicTableSinkFactory, Dynami
options.add(SINK_MULTIPLE_FORMAT);
options.add(SINK_MULTIPLE_DATABASE_PATTERN);
options.add(SINK_MULTIPLE_TABLE_PATTERN);
- options.add(SINK_MULTIPLE_ADD_COLUMN_POLICY);
- options.add(SINK_MULTIPLE_DEL_COLUMN_POLICY);
+ options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
return options;
}
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 8296d02ff..ee7cf2c89 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -44,11 +44,10 @@ import java.util.Map;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
-import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
@@ -107,8 +106,7 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
.withFormat(tableOptions.get(SINK_MULTIPLE_FORMAT))
.withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
.withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
-
.withAddColumnPolicy(tableOptions.get(SINK_MULTIPLE_ADD_COLUMN_POLICY))
-
.withDelColumnPolicy(tableOptions.get(SINK_MULTIPLE_DEL_COLUMN_POLICY))
+
.withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
.build())
.append();
} else {
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index ea69c12f9..e7fe68127 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -44,7 +44,6 @@ import
org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.base.sink.TableChange;
import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
-import org.apache.inlong.sort.base.sink.TableChange.DeleteColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,8 +58,6 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import static
org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE;
-
public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWithSchema>
implements OneInputStreamOperator<RowData, RecordWithSchema>,
ProcessingTimeCallback {
@@ -232,7 +229,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
Transaction transaction = table.newTransaction();
if (table.schema().sameSchema(oldSchema)) {
List<TableChange> tableChanges =
SchemaChangeUtils.diffSchema(oldSchema, newSchema);
- if (canHandleWithSchemaUpdate(tableId, tableChanges)) {
+ if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
LOG.info("Schema evolution in table({}) for table change: {}",
tableId, tableChanges);
}
@@ -270,21 +267,22 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
return record;
}
- private boolean canHandleWithSchemaUpdate(TableIdentifier tableId,
List<TableChange> tableChanges) {
+ private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId,
List<TableChange> tableChanges) {
boolean canHandle = true;
for (TableChange tableChange : tableChanges) {
if (tableChange instanceof AddColumn) {
canHandle &=
MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
- multipleSinkOption.getAddColumnPolicy());
- } else if (tableChange instanceof DeleteColumn) {
- canHandle &=
MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
- multipleSinkOption.getDelColumnPolicy());
+ multipleSinkOption.getSchemaUpdatePolicy());
} else {
- canHandle &=
MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
- LOG_WITH_IGNORE);
+ if
(MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+ multipleSinkOption.getSchemaUpdatePolicy())) {
+ LOG.info("Ignore table {} schema change: {} because
iceberg can't handle it.",
+ tableId, tableChange);
+ }
+ // todo:currently iceberg can only handle addColumn, so always
return false
+ canHandle = false;
}
}
-
if (!canHandle) {
blacklist.add(tableId);
}