This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0d7826760b [INLONG-9299][Sort] Iceberg support all migrate and auto create table (#9306)
0d7826760b is described below
commit 0d7826760b23d4c58647e184c2d2dc46b2e41cf1
Author: Sting <[email protected]>
AuthorDate: Mon Nov 27 16:13:45 2023 +0800
[INLONG-9299][Sort] Iceberg support all migrate and auto create table (#9306)
---
.../sort/protocol/constant/DorisConstant.java | 4 +
.../sort/protocol/node/load/IcebergLoadNode.java | 88 +++++++++++++++++++++-
.../apache/inlong/sort/util/SchemaChangeUtils.java | 4 +-
.../sink/multiple/DynamicSchemaHandleOperator.java | 6 +-
.../mysql/source/config/MySqlSourceOptions.java | 2 +-
5 files changed, 97 insertions(+), 7 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java
index e9bea2ff68..2e186de803 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java
@@ -65,4 +65,8 @@ public class DorisConstant {
* The multiple table-pattern of sink
*/
public static final String SINK_MULTIPLE_TABLE_PATTERN =
"sink.multiple.table-pattern";
+ /**
+ * The schema change policies of sink
+ */
+ public static final String SINK_SCHEMA_CHANGE_POLICIES =
"sink.schema-change.policies";
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 7f76891dfd..a418fd930b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -24,9 +24,13 @@ import org.apache.inlong.sort.protocol.Metadata;
import org.apache.inlong.sort.protocol.constant.IcebergConstant;
import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
import com.google.common.base.Preconditions;
import lombok.Data;
@@ -43,8 +47,15 @@ import java.io.Serializable;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import static
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_DATABASE_PATTERN;
+import static
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_ENABLE;
+import static
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_FORMAT;
+import static
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_TABLE_PATTERN;
+import static
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_SCHEMA_CHANGE_POLICIES;
+
@JsonTypeName("icebergLoad")
@Data
@NoArgsConstructor
@@ -76,7 +87,30 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
@JsonProperty("appendMode")
private String appendMode;
- @JsonCreator
+ @Nullable
+ @JsonProperty("sinkMultipleEnable")
+ private Boolean sinkMultipleEnable = false;
+
+ @Nullable
+ @JsonProperty("sinkMultipleFormat")
+ private Format sinkMultipleFormat;
+
+ @Nullable
+ @JsonProperty("databasePattern")
+ private String databasePattern;
+
+ @Nullable
+ @JsonProperty("tablePattern")
+ private String tablePattern;
+
+ @Nullable
+ @JsonProperty("enableSchemaChange")
+ private boolean enableSchemaChange;
+
+ @Nullable
+ @JsonProperty("policyMap")
+ private Map<SchemaChangeType, SchemaChangePolicy> policyMap;
+
public IcebergLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
@@ -92,14 +126,51 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
@JsonProperty("uri") String uri,
@JsonProperty("warehouse") String warehouse,
@JsonProperty("appendMode") String appendMode) {
+ this(id, name, fields, fieldRelations, filters, filterStrategy,
sinkParallelism, properties, dbName, tableName,
+ primaryKey, catalogType, uri, warehouse, appendMode, false,
null,
+ null, null, false, null);
+ }
+
+ @JsonCreator
+ public IcebergLoadNode(@JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+ @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @JsonProperty("properties") Map<String, String> properties,
+ @Nonnull @JsonProperty("dbName") String dbName,
+ @Nonnull @JsonProperty("tableName") String tableName,
+ @JsonProperty("primaryKey") String primaryKey,
+ @JsonProperty("catalogType") IcebergConstant.CatalogType
catalogType,
+ @JsonProperty("uri") String uri,
+ @JsonProperty("warehouse") String warehouse,
+ @JsonProperty("appendMode") String appendMode,
+ @Nullable @JsonProperty(value = "sinkMultipleEnable", defaultValue
= "false") Boolean sinkMultipleEnable,
+ @Nullable @JsonProperty("sinkMultipleFormat") Format
sinkMultipleFormat,
+ @Nullable @JsonProperty("databasePattern") String databasePattern,
+ @Nullable @JsonProperty("tablePattern") String tablePattern,
+ @JsonProperty("enableSchemaChange") boolean enableSchemaChange,
+ @Nullable @JsonProperty("policyMap") Map<SchemaChangeType,
SchemaChangePolicy> policyMap) {
super(id, name, fields, fieldRelations, filters, filterStrategy,
sinkParallelism, properties);
- this.tableName = Preconditions.checkNotNull(tableName, "table name is
null");
- this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
this.primaryKey = primaryKey;
this.catalogType = catalogType == null ? CatalogType.HIVE :
catalogType;
this.uri = uri;
this.warehouse = warehouse;
this.appendMode = appendMode;
+ this.sinkMultipleEnable = sinkMultipleEnable;
+ if (sinkMultipleEnable == null || !sinkMultipleEnable) {
+ this.tableName = Preconditions.checkNotNull(tableName, "table name
is null");
+ this.dbName = Preconditions.checkNotNull(dbName, "db name is
null");
+ } else {
+ this.databasePattern = Preconditions.checkNotNull(databasePattern,
"databasePattern is null");
+ this.tablePattern = Preconditions.checkNotNull(tablePattern,
"tablePattern is null");
+ this.sinkMultipleFormat =
Preconditions.checkNotNull(sinkMultipleFormat,
+ "sinkMultipleFormat is null");
+ }
+ this.enableSchemaChange = enableSchemaChange;
+ this.policyMap = policyMap;
}
@Override
@@ -120,6 +191,17 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
if (null != warehouse) {
options.put(IcebergConstant.WAREHOUSE_KEY, warehouse);
}
+
+ if (sinkMultipleEnable != null && sinkMultipleEnable) {
+ options.put(SINK_MULTIPLE_ENABLE, sinkMultipleEnable.toString());
+ options.put(SINK_MULTIPLE_FORMAT,
Objects.requireNonNull(sinkMultipleFormat).identifier());
+ options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
+ options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
+ options.put(SINK_SCHEMA_CHANGE_POLICIES,
SchemaChangeUtils.serialize(policyMap));
+ } else {
+ options.put(SINK_MULTIPLE_ENABLE, "false");
+ }
+
return options;
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
index c6ffac7da0..2ef8187022 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
@@ -28,6 +28,7 @@ import org.apache.inlong.sort.schema.TableChange;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -286,8 +287,9 @@ public class SchemaChangeUtils {
for (String colName : intersectColSet) {
ColumnSchema oldCol = oldColumnSchemas.get(colName);
ColumnSchema newCol = newColumnSchemas.get(colName);
+
if (!oldCol.getType().equals(newCol.getType())
- || !oldCol.getComment().equals(newCol.getComment())) {
+ || !StringUtils.equals(oldCol.getComment(),
newCol.getComment())) {
tableChanges.add(
new TableChange.UpdateColumn(
new String[]{newCol.getName()},
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 6ae3a3a28b..e416cfd24f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -512,8 +512,10 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
String.format("Unsupported table %s schema change:
%s.", tableId.toString(), tableChange));
}
}
-
IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(),
tableChanges);
- LOG.info("Schema evolution in table({}) for table change: {}",
tableId, tableChanges);
+ if (!tableChanges.isEmpty()) {
+
IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(),
tableChanges);
+ LOG.info("Schema evolution in table({}) for table change: {}",
tableId, tableChanges);
+ }
}
transaction.commitTransaction();
handleSchemaInfoEvent(tableId, table.schema());
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index 8faabdca41..b806028cbd 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -210,7 +210,7 @@ public class MySqlSourceOptions {
public static final ConfigOption<Boolean> INCLUDE_INCREMENTAL =
ConfigOptions.key("include-incremental")
.booleanType()
- .defaultValue(false)
+ .defaultValue(true)
.withDescription("Whether include a incremental flag in
data "
+ "when migrating all databases");