This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 186e8a1325 [INLONG-7763][Sort] Support ddl change for doris (#7764)
186e8a1325 is described below
commit 186e8a1325a28c744b535230b7404c6eea979100
Author: yunqingmoswu <[email protected]>
AuthorDate: Mon Jul 17 10:53:45 2023 +0800
[INLONG-7763][Sort] Support ddl change for doris (#7764)
---
.../sort/protocol/node/load/DorisLoadNode.java | 42 +-
.../inlong/sort/base/dirty/DirtyOptions.java | 6 +
.../sort/base/format/JsonDynamicSchemaFormat.java | 2 +
.../sort-flink-v1.13/sort-connectors/doris/pom.xml | 18 +
.../inlong/sort/doris/http/HttpGetEntity.java | 40 ++
.../inlong/sort/doris/schema/OperationHelper.java | 308 +++++++++++++
.../sort/doris/schema/SchemaChangeHelper.java | 476 +++++++++++++++++++++
.../table/DorisDynamicSchemaOutputFormat.java | 42 +-
.../sort/doris/table/DorisDynamicTableFactory.java | 67 ++-
.../sort/doris/table/DorisDynamicTableSink.java | 17 +-
.../sort/doris/schema/OperationHelperTest.java | 256 +++++++++++
11 files changed, 1258 insertions(+), 16 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
index a3ca5dd338..3e3ac93553 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
@@ -21,10 +21,13 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.InlongMetric;
import org.apache.inlong.sort.protocol.constant.DorisConstant;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
import com.google.common.base.Preconditions;
import lombok.Data;
@@ -95,6 +98,11 @@ public class DorisLoadNode extends LoadNode implements
InlongMetric, Serializabl
@Nullable
@JsonProperty("tablePattern")
private String tablePattern;
+ @JsonProperty("enableSchemaChange")
+ private boolean enableSchemaChange;
+ @Nullable
+ @JsonProperty("policyMap")
+ private Map<SchemaChangeType, SchemaChangePolicy> policyMap;
public DorisLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@@ -114,7 +122,6 @@ public class DorisLoadNode extends LoadNode implements
InlongMetric, Serializabl
null, null);
}
- @JsonCreator
public DorisLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
@@ -132,6 +139,31 @@ public class DorisLoadNode extends LoadNode implements
InlongMetric, Serializabl
@Nullable @JsonProperty("sinkMultipleFormat") Format
sinkMultipleFormat,
@Nullable @JsonProperty("databasePattern") String databasePattern,
@Nullable @JsonProperty("tablePattern") String tablePattern) {
+ this(id, name, fields, fieldRelations, filters, filterStrategy,
sinkParallelism, properties, feNodes, userName,
+ password, tableIdentifier, primaryKey, sinkMultipleEnable,
sinkMultipleFormat, databasePattern,
+ tablePattern, false, null);
+ }
+
+ @JsonCreator
+ public DorisLoadNode(@JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+ @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @JsonProperty("properties") Map<String, String> properties,
+ @Nonnull @JsonProperty("feNodes") String feNodes,
+ @Nonnull @JsonProperty("username") String userName,
+ @Nonnull @JsonProperty("password") String password,
+ @Nullable @JsonProperty("tableIdentifier") String tableIdentifier,
+ @JsonProperty("primaryKey") String primaryKey,
+ @Nullable @JsonProperty(value = "sinkMultipleEnable", defaultValue
= "false") Boolean sinkMultipleEnable,
+ @Nullable @JsonProperty("sinkMultipleFormat") Format
sinkMultipleFormat,
+ @Nullable @JsonProperty("databasePattern") String databasePattern,
+ @Nullable @JsonProperty("tablePattern") String tablePattern,
+ @JsonProperty("enableSchemaChange") boolean enableSchemaChange,
+ @Nullable @JsonProperty("policyMap") Map<SchemaChangeType,
SchemaChangePolicy> policyMap) {
super(id, name, fields, fieldRelations, filters, filterStrategy,
sinkParallelism, properties);
this.feNodes = Preconditions.checkNotNull(feNodes, "feNodes is null");
this.userName = Preconditions.checkNotNull(userName, "username is
null");
@@ -146,6 +178,10 @@ public class DorisLoadNode extends LoadNode implements
InlongMetric, Serializabl
this.sinkMultipleFormat =
Preconditions.checkNotNull(sinkMultipleFormat,
"sinkMultipleFormat is null");
}
+ this.enableSchemaChange = enableSchemaChange;
+ this.policyMap = policyMap;
+ Preconditions.checkState(!enableSchemaChange || policyMap != null &&
!policyMap.isEmpty(),
+ "policyMap is empty when enableSchemaChange is 'true'");
}
@Override
@@ -160,6 +196,10 @@ public class DorisLoadNode extends LoadNode implements
InlongMetric, Serializabl
options.put(SINK_MULTIPLE_FORMAT,
Objects.requireNonNull(sinkMultipleFormat).identifier());
options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
+ if (enableSchemaChange) {
+ options.put("sink.schema-change.enable", "true");
+ options.put("sink.schema-change.policies",
SchemaChangeUtils.serialize(policyMap));
+ }
} else {
options.put(SINK_MULTIPLE_ENABLE, "false");
options.put(DorisConstant.TABLE_IDENTIFIER, tableIdentifier);
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
index 58cfe1f98a..791cba0d71 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.base.dirty;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
@@ -29,6 +31,7 @@ import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_ENABLE;
import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LABELS;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
/**
* Dirty common options
@@ -64,6 +67,9 @@ public class DirtyOptions implements Serializable {
*/
public static DirtyOptions fromConfig(ReadableConfig config) {
boolean ignoreDirty = config.get(DIRTY_IGNORE);
+ if (config.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY) ==
SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
+ ignoreDirty = true;
+ }
boolean enableDirtySink = config.get(DIRTY_SIDE_OUTPUT_ENABLE);
boolean ignoreSinkError = config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS);
String dirtyConnector =
config.getOptional(DIRTY_SIDE_OUTPUT_CONNECTOR).orElse(null);
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index 51e08f8852..e8bef34d90 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -61,6 +61,8 @@ import static
org.apache.inlong.sort.formats.json.utils.FormatJsonUtil.SQL_TYPE_
@SuppressWarnings("LanguageDetectionInspection")
public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaFormat<JsonNode> {
+ public static final int DEFAULT_DECIMAL_PRECISION = 15;
+ public static final int DEFAULT_DECIMAL_SCALE = 5;
private static final Logger LOG =
LoggerFactory.getLogger(JsonDynamicSchemaFormat.class);
/**
* The first item of array
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml
index 5da87c7864..ed0463bea5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml
@@ -42,6 +42,24 @@
<artifactId>flink-doris-connector-${flink.minor.version}_${flink.scala.binary.version}</artifactId>
<version>${flink.connector.doris.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-json-v1.13</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/http/HttpGetEntity.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/http/HttpGetEntity.java
new file mode 100644
index 0000000000..217d37d3ab
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/http/HttpGetEntity.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.http;
+
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+
+import java.net.URI;
+
+/**
+ * An HTTP request that reports {@code GET} as its method while still being able to carry a
+ * request entity, because it derives from {@link HttpEntityEnclosingRequestBase}.
+ */
+public class HttpGetEntity extends HttpEntityEnclosingRequestBase {
+
+    private static final String METHOD = "GET";
+
+    /**
+     * Create the request for the given target.
+     *
+     * @param uri The target address; it is parsed with {@link URI#create(String)}
+     */
+    public HttpGetEntity(String uri) {
+        final URI target = URI.create(uri);
+        setURI(target);
+    }
+
+    /**
+     * @return Always {@code "GET"}
+     */
+    @Override
+    public String getMethod() {
+        return METHOD;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
new file mode 100644
index 0000000000..c00cf81d2d
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.schema;
+
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Types;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringJoiner;
+
+public class OperationHelper {
+
+    private static final String APOSTROPHE = "'";
+    private static final String DOUBLE_QUOTES = "\"";
+    /**
+     * Upper bound applied to the VARCHAR length computed by convert2DorisType.
+     * It is a true constant, so it is shared by all instances instead of being stored per instance.
+     */
+    private static final int VARCHAR_MAX_LENGTH = 65533;
+    /**
+     * Used to translate JDBC type codes into Flink logical types.
+     */
+    private final JsonDynamicSchemaFormat dynamicSchemaFormat;
+
+    private OperationHelper(JsonDynamicSchemaFormat dynamicSchemaFormat) {
+        this.dynamicSchemaFormat = dynamicSchemaFormat;
+    }
+
+    /**
+     * Create a helper bound to the given schema format.
+     *
+     * @param dynamicSchemaFormat The format used for type conversion
+     * @return A new OperationHelper
+     */
+    public static OperationHelper of(JsonDynamicSchemaFormat dynamicSchemaFormat) {
+        return new OperationHelper(dynamicSchemaFormat);
+    }
+
+    /**
+     * Render the Doris column type for a JDBC type code, including the nullability suffix
+     * in the branches that emit one.
+     *
+     * <p>Types that Doris does not support directly are converted to a compatible type as far
+     * as possible; every unlisted type code falls back to {@code STRING}.
+     *
+     * @param jdbcType A type code from {@link java.sql.Types}
+     * @param isNullable Whether the column is nullable
+     * @param precisions Optional type arguments (length, or precision and scale) as integer
+     *        strings; may be null or empty
+     * @return The type fragment to be placed into a DDL statement
+     * @throws IllegalStateException If precisions holds more entries than the type accepts
+     * @throws NumberFormatException If an entry of precisions that is read is not an integer
+     */
+    private String convert2DorisType(int jdbcType, boolean isNullable, List<String> precisions) {
+        final boolean hasPrecisions = precisions != null && !precisions.isEmpty();
+        final String notNullSuffix = isNullable ? "" : " NOT NULL";
+        String type;
+        switch (jdbcType) {
+            case Types.BOOLEAN:
+            case Types.DATE:
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                type = dynamicSchemaFormat.sqlType2FlinkType(jdbcType).copy(isNullable).asSummaryString();
+                break;
+            case Types.TINYINT:
+            case Types.SMALLINT:
+            case Types.INTEGER:
+            case Types.BIGINT:
+                if (hasPrecisions) {
+                    // Keep the display width that came with the source type, e.g. INT(11)
+                    type = String.format("%s(%s)%s",
+                            dynamicSchemaFormat.sqlType2FlinkType(jdbcType).asSummaryString(),
+                            StringUtils.join(precisions, ","), notNullSuffix);
+                } else {
+                    type = dynamicSchemaFormat.sqlType2FlinkType(jdbcType).copy(isNullable).asSummaryString();
+                }
+                break;
+            case Types.DECIMAL: {
+                DecimalType decimalType = (DecimalType) dynamicSchemaFormat.sqlType2FlinkType(jdbcType);
+                if (hasPrecisions) {
+                    Preconditions.checkState(precisions.size() < 3,
+                            "The length of precisions with DECIMAL must small than 3");
+                    int precision = Integer.parseInt(precisions.get(0));
+                    int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE;
+                    if (precisions.size() == 2) {
+                        scale = Integer.parseInt(precisions.get(1));
+                    }
+                    decimalType = new DecimalType(isNullable, precision, scale);
+                } else {
+                    decimalType = new DecimalType(isNullable, decimalType.getPrecision(), decimalType.getScale());
+                }
+                type = decimalType.asSummaryString();
+                break;
+            }
+            case Types.CHAR: {
+                LogicalType charType = dynamicSchemaFormat.sqlType2FlinkType(jdbcType);
+                if (hasPrecisions) {
+                    Preconditions.checkState(precisions.size() == 1,
+                            "The length of precisions with CHAR must be 1");
+                    charType = new CharType(isNullable, Integer.parseInt(precisions.get(0)));
+                } else {
+                    charType = charType.copy(isNullable);
+                }
+                type = charType.asSerializableString();
+                break;
+            }
+            case Types.VARCHAR: {
+                LogicalType varcharType = dynamicSchemaFormat.sqlType2FlinkType(jdbcType);
+                if (hasPrecisions) {
+                    Preconditions.checkState(precisions.size() == 1,
+                            "The length of precisions with VARCHAR must be 1");
+                    // Because the precision definition of varchar by Doris is different from that of MySQL.
+                    // The precision in MySQL is the number of characters, while Doris is the number of bytes,
+                    // and Chinese characters occupy 3 bytes, so the precision is multiplied by 3 here.
+                    // The multiplication is done in long so that a very large declared length cannot
+                    // overflow int and turn negative before the cap is applied.
+                    long byteLength = 3L * Integer.parseInt(precisions.get(0));
+                    int precision = (int) Math.min(byteLength, VARCHAR_MAX_LENGTH);
+                    varcharType = new VarCharType(isNullable, precision);
+                } else {
+                    varcharType = varcharType.copy(isNullable);
+                }
+                type = varcharType.asSerializableString();
+                break;
+            }
+            // The following types are not directly supported in doris,
+            // and can only be converted to compatible types as much as possible
+            case Types.TIME:
+            case Types.TIME_WITH_TIMEZONE:
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.BLOB:
+            case Types.CLOB:
+            case Types.LONGNVARCHAR:
+            case Types.LONGVARBINARY:
+            case Types.LONGVARCHAR:
+            case Types.ARRAY:
+            case Types.NCHAR:
+            case Types.NCLOB:
+            case Types.OTHER:
+                type = String.format("STRING%s", notNullSuffix);
+                break;
+            case Types.TIMESTAMP_WITH_TIMEZONE:
+            case Types.TIMESTAMP:
+                // NOTE(review): unlike the other branches, isNullable is not reflected here - confirm it is intended
+                type = "DATETIME";
+                break;
+            case Types.REAL:
+            case Types.NUMERIC: {
+                int precision = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_PRECISION;
+                int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE;
+                if (hasPrecisions) {
+                    Preconditions.checkState(precisions.size() < 3,
+                            "The length of precisions with NUMERIC must small than 3");
+                    precision = Integer.parseInt(precisions.get(0));
+                    if (precisions.size() == 2) {
+                        scale = Integer.parseInt(precisions.get(1));
+                    }
+                }
+                type = new DecimalType(isNullable, precision, scale).asSerializableString();
+                break;
+            }
+            case Types.BIT:
+                // NOTE(review): the format keeps a blank after BOOLEAN, so the result is "BOOLEAN "
+                // or "BOOLEAN  NOT NULL"; left as is because callers may compare the exact text
+                type = String.format("BOOLEAN %s", notNullSuffix);
+                break;
+            default:
+                type = String.format("STRING%s", notNullSuffix);
+        }
+        return type;
+    }
+
+    /**
+     * Build the statement of AddColumn
+     *
+     * <p>Every entry yields one {@code ADD COLUMN} clause made of the column name, the converted
+     * type and, when present, the default value, the comment and the position. The clauses are
+     * joined with {@code ", "}.
+     *
+     * @param alterColumns The list of AlterColumn, must not be null or empty
+     * @return A statement of AddColumn
+     */
+    public String buildAddColumnStatement(List<AlterColumn> alterColumns) {
+        Preconditions.checkState(alterColumns != null && !alterColumns.isEmpty(), "Alter columns is empty");
+        StringJoiner clauses = new StringJoiner(", ");
+        for (AlterColumn alterColumn : alterColumns) {
+            Column column = Preconditions.checkNotNull(alterColumn.getNewColumn(), "New column is null");
+            Preconditions.checkState(column.getName() != null && !column.getName().trim().isEmpty(),
+                    "The column name is blank");
+            StringBuilder clause = new StringBuilder("ADD COLUMN `");
+            clause.append(column.getName()).append("` ");
+            clause.append(convert2DorisType(column.getJdbcType(), column.isNullable(), column.getDefinition()));
+            if (validDefaultValue(column.getDefaultValue())) {
+                clause.append(" DEFAULT ").append(quote(column.getDefaultValue()));
+            }
+            if (column.getComment() != null) {
+                clause.append(" COMMENT ").append(quote(column.getComment()));
+            }
+            if (column.getPosition() != null && column.getPosition().getPositionType() != null) {
+                switch (column.getPosition().getPositionType()) {
+                    case FIRST:
+                        clause.append(" FIRST");
+                        break;
+                    case AFTER:
+                        Preconditions.checkState(column.getPosition().getColumnName() != null
+                                && !column.getPosition().getColumnName().trim().isEmpty(),
+                                "The column name of Position is empty");
+                        clause.append(" AFTER `").append(column.getPosition().getColumnName()).append("`");
+                        break;
+                    default:
+                        // Any other position type adds nothing to the clause
+                        break;
+                }
+            }
+            clauses.add(clause);
+        }
+        return clauses.toString();
+    }
+
+    /**
+     * Wrap a value in single quotes unless it already starts with a single or a double quote.
+     * A null value is rendered as {@code 'null'}.
+     *
+     * <p>NOTE(review): quotes inside the value are not escaped - confirm the callers never pass such values.
+     *
+     * @param value The raw value
+     * @return The quoted value
+     */
+    private String quote(String value) {
+        if (value == null) {
+            return "'null'";
+        }
+        boolean alreadyQuoted = value.startsWith(APOSTROPHE) || value.startsWith(DOUBLE_QUOTES);
+        return alreadyQuoted ? value : "'" + value + "'";
+    }
+
+    /**
+     * Build the statement of DropColumn
+     *
+     * <p>Every entry yields one {@code DROP COLUMN} clause for its old column; the clauses are
+     * joined with {@code ","}.
+     *
+     * @param alterColumns The list of AlterColumn, must not be null or empty
+     * @return A statement of DropColumn
+     */
+    public String buildDropColumnStatement(List<AlterColumn> alterColumns) {
+        Preconditions.checkState(alterColumns != null && !alterColumns.isEmpty(), "Alter columns is empty");
+        StringJoiner clauses = new StringJoiner(",");
+        for (AlterColumn alterColumn : alterColumns) {
+            Column column = Preconditions.checkNotNull(alterColumn.getOldColumn(), "Old column is null");
+            Preconditions.checkState(column.getName() != null && !column.getName().trim().isEmpty(),
+                    "The column name is blank");
+            clauses.add("DROP COLUMN `" + column.getName() + "`");
+        }
+        return clauses.toString();
+    }
+
+    /**
+     * Build common statement of alter
+     *
+     * <p>The result ends with a blank so that the alter clauses can be appended directly.
+     *
+     * @param database The database of Doris
+     * @param table The table of Doris
+     * @return A statement of Alter table
+     */
+    public String buildAlterStatementCommon(String database, String table) {
+        return String.format("ALTER TABLE `%s`.`%s` ", database, table);
+    }
+
+    /**
+     * Check whether a default value should be written into the statement.
+     *
+     * @param defaultValue The default value of the column
+     * @return false if the value is null, blank or the text NULL in any case, true otherwise
+     */
+    private boolean validDefaultValue(String defaultValue) {
+        if (defaultValue == null || defaultValue.trim().isEmpty()) {
+            return false;
+        }
+        return !"NULL".equalsIgnoreCase(defaultValue);
+    }
+
+    /**
+     * Build the statement of CreateTable
+     *
+     * <p>With primary keys the table uses the UNIQUE model keyed and hash-distributed by them;
+     * without primary keys it uses the DUPLICATE model keyed and hash-distributed by all columns.
+     *
+     * @param database The database of Doris
+     * @param table The table of Doris
+     * @param primaryKeys The primary key of Doris, may be null or empty
+     * @param operation The Operation, its columns must not be null or empty
+     * @return A statement of CreateTable
+     */
+    public String buildCreateTableStatement(String database, String table, List<String> primaryKeys,
+            CreateTableOperation operation) {
+        Preconditions.checkState(operation.getColumns() != null && !operation.getColumns().isEmpty(),
+                String.format("The columns of table: %s.%s is empty", database, table));
+        StringJoiner columnDefinitions = new StringJoiner(",\n");
+        StringJoiner allColumnNames = new StringJoiner(",");
+        for (Column column : operation.getColumns()) {
+            Preconditions.checkNotNull(column, "The column is null");
+            Preconditions.checkState(column.getName() != null && !column.getName().trim().isEmpty(),
+                    "The column name is blank");
+            StringBuilder definition = new StringBuilder("\t`");
+            definition.append(column.getName()).append("` ");
+            definition.append(convert2DorisType(column.getJdbcType(), column.isNullable(), column.getDefinition()));
+            if (validDefaultValue(column.getDefaultValue())) {
+                definition.append(" DEFAULT ").append(quote(column.getDefaultValue()));
+            }
+            if (column.getComment() != null) {
+                definition.append(" COMMENT ").append(quote(column.getComment()));
+            }
+            columnDefinitions.add(definition);
+            allColumnNames.add("`" + column.getName() + "`");
+        }
+        boolean hasPrimaryKeys = primaryKeys != null && !primaryKeys.isEmpty();
+        String keys;
+        if (hasPrimaryKeys) {
+            StringJoiner primaryKeyNames = new StringJoiner(",");
+            for (String primaryKey : primaryKeys) {
+                primaryKeyNames.add("`" + primaryKey + "`");
+            }
+            keys = primaryKeyNames.toString();
+        } else {
+            keys = allColumnNames.toString();
+        }
+        StringBuilder statement = new StringBuilder("CREATE TABLE IF NOT EXISTS `");
+        statement.append(database).append("`.`").append(table).append("`(\n");
+        statement.append(columnDefinitions.toString()).append("\n)\n");
+        statement.append(hasPrimaryKeys ? "UNIQUE" : "DUPLICATE").append(" KEY(").append(keys).append(")");
+        if (StringUtils.isNotBlank(operation.getComment())) {
+            statement.append("\nCOMMENT ").append(quote(operation.getComment()));
+        }
+        statement.append("\nDISTRIBUTED BY HASH(").append(keys).append(")");
+        // Add light schema change support for it if the version of doris is greater than 1.2.0 or equals 1.2.0
+        statement.append("\nPROPERTIES (\n\t\"light_schema_change\" = \"true\"\n)");
+        return statement.toString();
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
new file mode 100644
index 0000000000..effae22e1d
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.schema;
+
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.schema.SchemaChangeHandleException;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.doris.http.HttpGetEntity;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.shaded.org.apache.commons.codec.binary.Base64;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.StringJoiner;
+
+/**
+ * Schema change helper
+ */
+public class SchemaChangeHelper {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SchemaChangeHelper.class);
+
+ private static final String CHECK_LIGHT_SCHEMA_CHANGE_API =
"http://%s/api/enable_light_schema_change/%s/%s";
+ private static final String SCHEMA_CHANGE_API =
"http://%s/api/query/default_cluster/%s";
+ private static final String DORIS_HTTP_CALL_SUCCESS = "0";
+ private static final String CONTENT_TYPE_JSON = "application/json";
+ private final boolean schemaChange;
+ private final Map<SchemaChangeType, SchemaChangePolicy> policyMap;
+ private final DorisOptions options;
+ private final JsonDynamicSchemaFormat dynamicSchemaFormat;
+ private final String databasePattern;
+ private final String tablePattern;
+ private final int maxRetries;
+ private final OperationHelper operationHelper;
+ private final SchemaUpdateExceptionPolicy exceptionPolicy;
+ private final SinkTableMetricData metricData;
+ private final DirtySinkHelper<Object> dirtySinkHelper;
+
+    /**
+     * Use {@link #of} to create instances.
+     * The schema format and the Doris options are mandatory, everything else is stored as given.
+     */
+    private SchemaChangeHelper(JsonDynamicSchemaFormat dynamicSchemaFormat, DorisOptions options,
+            boolean schemaChange, Map<SchemaChangeType, SchemaChangePolicy> policyMap,
+            String databasePattern, String tablePattern, int maxRetries,
+            SchemaUpdateExceptionPolicy exceptionPolicy, SinkTableMetricData metricData,
+            DirtySinkHelper<Object> dirtySinkHelper) {
+        // Mandatory collaborators
+        this.dynamicSchemaFormat = Preconditions.checkNotNull(dynamicSchemaFormat, "dynamicSchemaFormat is null");
+        this.options = Preconditions.checkNotNull(options, "doris options is null");
+        this.operationHelper = OperationHelper.of(dynamicSchemaFormat);
+        // Schema change settings
+        this.schemaChange = schemaChange;
+        this.policyMap = policyMap;
+        this.maxRetries = maxRetries;
+        this.exceptionPolicy = exceptionPolicy;
+        // Patterns used to extract the database and the table from a record
+        this.databasePattern = databasePattern;
+        this.tablePattern = tablePattern;
+        // Metric and dirty data reporting
+        this.metricData = metricData;
+        this.dirtySinkHelper = dirtySinkHelper;
+    }
+
+    /**
+     * Create a SchemaChangeHelper
+     *
+     * @param dynamicSchemaFormat The schema format, must not be null
+     * @param options The options of Doris, must not be null
+     * @param schemaChange Whether schema change is enabled
+     * @param policyMap The policy of each schema change type
+     * @param databasePattern The pattern used to extract the database from a record
+     * @param tablePattern The pattern used to extract the table from a record
+     * @param maxRetries The max retries
+     * @param exceptionPolicy The policy applied when handling a schema change fails
+     * @param metricData The metric data, the visible code skips reporting when it is null
+     * @param dirtySinkHelper The helper that receives dirty data
+     * @return A new SchemaChangeHelper
+     */
+    public static SchemaChangeHelper of(JsonDynamicSchemaFormat dynamicSchemaFormat, DorisOptions options,
+            boolean schemaChange, Map<SchemaChangeType, SchemaChangePolicy> policyMap, String databasePattern,
+            String tablePattern, int maxRetries, SchemaUpdateExceptionPolicy exceptionPolicy,
+            SinkTableMetricData metricData, DirtySinkHelper<Object> dirtySinkHelper) {
+        final SchemaChangeHelper helper = new SchemaChangeHelper(dynamicSchemaFormat, options, schemaChange,
+                policyMap, databasePattern, tablePattern, maxRetries, exceptionPolicy, metricData,
+                dirtySinkHelper);
+        return helper;
+    }
+
+    /**
+     * Process schema change for Doris
+     *
+     * <p>Nothing happens when schema change is disabled. Otherwise the database and the table are
+     * parsed from the record, the operation is extracted from its {@code operation} node and the
+     * record is dispatched by schema change type. A failure in one of the first two steps is
+     * thrown when the exception policy is THROW_WITH_STOP; otherwise it is logged and counted as
+     * dirty data.
+     *
+     * @param originData The raw bytes of the record, used for error reporting and metrics
+     * @param data The record as a json node
+     */
+    public void process(byte[] originData, JsonNode data) {
+        if (!schemaChange) {
+            return;
+        }
+        String database;
+        String table;
+        try {
+            database = dynamicSchemaFormat.parse(data, databasePattern);
+            table = dynamicSchemaFormat.parse(data, tablePattern);
+        } catch (Exception e) {
+            // Decode with an explicit charset instead of the platform default, so the reported text
+            // does not depend on the JVM settings (assumes the record bytes are UTF-8 encoded json)
+            String origin = new String(originData, StandardCharsets.UTF_8);
+            if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Parse database, table from origin data failed, origin data: %s", origin), e);
+            }
+            LOGGER.warn("Parse database, table from origin data failed, origin data: {}", origin, e);
+            if (exceptionPolicy == SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
+                dirtySinkHelper.invoke(origin, DirtyType.JSON_PROCESS_ERROR, e);
+            }
+            if (metricData != null) {
+                metricData.invokeDirty(1, originData.length);
+            }
+            return;
+        }
+        Operation operation;
+        try {
+            JsonNode operationNode = Preconditions.checkNotNull(data.get("operation"),
+                    "Operation node is null");
+            operation = Preconditions.checkNotNull(
+                    dynamicSchemaFormat.objectMapper.convertValue(operationNode, new TypeReference<Operation>() {
+                    }), "Operation is null");
+        } catch (Exception e) {
+            if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Extract Operation from origin data failed,origin data: %s", data), e);
+            }
+            LOGGER.warn("Extract Operation from origin data failed,origin data: {}", data, e);
+            handleDirtyData(data, originData, database, table, DirtyType.JSON_PROCESS_ERROR, e);
+            return;
+        }
+        String originSchema = dynamicSchemaFormat.extractDDL(data);
+        SchemaChangeType type = SchemaChangeUtils.extractSchemaChangeType(operation);
+        if (type == null) {
+            LOGGER.warn("Unsupported for schema-change: {}", originSchema);
+            return;
+        }
+        switch (type) {
+            case ALTER:
+                handleAlterOperation(database, table, originData, originSchema, data, (AlterOperation) operation);
+                break;
+            case CREATE_TABLE:
+                doCreateTable(originData, database, table, type, originSchema, data,
+                        (CreateTableOperation) operation);
+                break;
+            case DROP_TABLE:
+                doDropTable(type, originSchema);
+                break;
+            case RENAME_TABLE:
+                doRenameTable(type, originSchema);
+                break;
+            case TRUNCATE_TABLE:
+                doTruncateTable(type, originSchema);
+                break;
+            default:
+                LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+        }
+    }
+
+    /**
+     * Handles a record whose schema change could not be applied.
+     * <p>
+     * When the exception policy is {@code LOG_WITH_IGNORE} the raw record is forwarded to the dirty sink
+     * together with the label, log tag and identifier resolved from the dirty options. The dirty metric
+     * is reported whenever metric data is available, regardless of the policy.
+     *
+     * @param data The parsed record, used to resolve the dirty option patterns
+     * @param originData The raw bytes of the record
+     * @param database The database the record belongs to
+     * @param table The table the record belongs to
+     * @param dirtyType The kind of failure being reported
+     * @param e The failure that made the record dirty
+     */
+    private void handleDirtyData(JsonNode data, byte[] originData, String database,
+            String table, DirtyType dirtyType, Throwable e) {
+        if (exceptionPolicy == SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
+            String label = parseValue(data, dirtySinkHelper.getDirtyOptions().getLabels());
+            String logTag = parseValue(data, dirtySinkHelper.getDirtyOptions().getLogTag());
+            String identifier = parseValue(data, dirtySinkHelper.getDirtyOptions().getIdentifier());
+            // Decode explicitly as UTF-8 so the archived payload does not depend on the platform default charset.
+            dirtySinkHelper.invoke(new String(originData, StandardCharsets.UTF_8), dirtyType, label, logTag,
+                    identifier, e);
+        }
+        if (metricData != null) {
+            metricData.outputDirtyMetricsWithEstimate(database, table, 1, originData.length);
+        }
+    }
+
+    /**
+     * Reports one successfully handled record of {@code len} bytes for the given table.
+     * Does nothing when no metric data is configured.
+     */
+    private void reportMetric(String database, String table, int len) {
+        if (metricData == null) {
+            return;
+        }
+        metricData.outputMetrics(database, table, 1, len);
+    }
+
+    /**
+     * Resolves {@code pattern} against the record.
+     *
+     * @param data The parsed record
+     * @param pattern The pattern to resolve
+     * @return The resolved value, or the pattern itself when it cannot be resolved
+     */
+    private String parseValue(JsonNode data, String pattern) {
+        try {
+            return dynamicSchemaFormat.parse(data, pattern);
+        } catch (Exception e) {
+            // Best effort: keep going with the raw pattern, but keep the cause in the log.
+            LOGGER.warn("Parse value from pattern failed,the pattern: {}, data: {}", pattern, data, e);
+        }
+        return pattern;
+    }
+
+    /**
+     * Groups the alter columns of an alter operation by schema change type and passes the groups on
+     * to {@code doAlterOperation}.
+     * <p>
+     * A column that maps to exactly one change type is collected into its group. A column that maps to
+     * several change types is not executed: each of its types is only logged or handled by its policy.
+     */
+    private void handleAlterOperation(String database, String table, byte[] originData,
+            String originSchema, JsonNode data, AlterOperation operation) {
+        List<AlterColumn> alterColumns = operation.getAlterColumns();
+        if (alterColumns == null || alterColumns.isEmpty()) {
+            if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Alter columns is empty, origin schema: %s", originSchema));
+            }
+            LOGGER.warn("Alter columns is empty, origin schema: {}", originSchema);
+            return;
+        }
+        // LinkedHashMap keeps the groups in the order the change types were first seen.
+        Map<SchemaChangeType, List<AlterColumn>> columnsByType = new LinkedHashMap<>();
+        for (AlterColumn column : alterColumns) {
+            Set<SchemaChangeType> changeTypes = null;
+            try {
+                changeTypes = SchemaChangeUtils.extractSchemaChangeType(column);
+                Preconditions.checkState(!changeTypes.isEmpty(), "Schema change types is empty");
+            } catch (Exception e) {
+                if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                    throw new SchemaChangeHandleException(
+                            String.format("Extract schema change type failed, origin schema: %s", originSchema), e);
+                }
+                LOGGER.warn("Extract schema change type failed, origin schema: {}", originSchema, e);
+            }
+            if (changeTypes == null) {
+                continue;
+            }
+            if (changeTypes.size() != 1) {
+                // Handle change column, it only exists change column type and rename column in this scenario for now.
+                for (SchemaChangeType changeType : changeTypes) {
+                    SchemaChangePolicy policy = policyMap.get(changeType);
+                    if (policy != SchemaChangePolicy.ENABLE) {
+                        doSchemaChangeBase(changeType, policy, originSchema);
+                    } else {
+                        LOGGER.warn("Unsupported for {}: {}", changeType, originSchema);
+                    }
+                }
+                continue;
+            }
+            SchemaChangeType onlyType = changeTypes.stream().findFirst().get();
+            columnsByType.computeIfAbsent(onlyType, k -> new ArrayList<>()).add(column);
+        }
+        if (columnsByType.isEmpty()) {
+            return;
+        }
+        doAlterOperation(database, table, originData, originSchema, data, columnsByType);
+    }
+
+    /**
+     * Builds a single alter statement from the grouped alter columns and executes it.
+     * <p>
+     * Every group first goes through {@code doSchemaChangeBase} with its policy; only groups whose policy
+     * is {@code ENABLE} contribute a clause. Nothing is executed when no clause was produced.
+     *
+     * @param database The target database
+     * @param table The target table
+     * @param originData The raw bytes of the record, used for metrics and dirty data
+     * @param originSchema The original DDL, used in log and error messages
+     * @param data The parsed record
+     * @param typeMap The alter columns grouped by schema change type
+     */
+    private void doAlterOperation(String database, String table, byte[] originData, String originSchema,
+            JsonNode data, Map<SchemaChangeType, List<AlterColumn>> typeMap) {
+        StringJoiner joiner = new StringJoiner(",");
+        for (Entry<SchemaChangeType, List<AlterColumn>> kv : typeMap.entrySet()) {
+            SchemaChangePolicy policy = policyMap.get(kv.getKey());
+            doSchemaChangeBase(kv.getKey(), policy, originSchema);
+            if (policy != SchemaChangePolicy.ENABLE) {
+                continue;
+            }
+            String alterStatement = null;
+            try {
+                switch (kv.getKey()) {
+                    case ADD_COLUMN:
+                        alterStatement = doAddColumn(kv.getValue());
+                        break;
+                    case DROP_COLUMN:
+                        alterStatement = doDropColumn(kv.getValue());
+                        break;
+                    case RENAME_COLUMN:
+                        alterStatement = doRenameColumn(kv.getKey(), originSchema);
+                        break;
+                    case CHANGE_COLUMN_TYPE:
+                        alterStatement = doChangeColumnType(kv.getKey(), originSchema);
+                        break;
+                    default:
+                }
+            } catch (Exception e) {
+                if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                    throw new SchemaChangeHandleException(
+                            String.format("Build alter statement failed, origin schema: %s", originSchema), e);
+                }
+                LOGGER.warn("Build alter statement failed, origin schema: {}", originSchema, e);
+            }
+            if (alterStatement != null) {
+                joiner.add(alterStatement);
+            }
+        }
+        String statement = joiner.toString();
+        if (statement.isEmpty()) {
+            return;
+        }
+        try {
+            String alterStatementCommon = operationHelper.buildAlterStatementCommon(database, table);
+            statement = alterStatementCommon + statement;
+            // The checkLightSchemaChange is removed because most scenarios support it
+            boolean result = executeStatement(database, statement);
+            if (!result) {
+                LOGGER.error("Alter table failed,statement: {}", statement);
+                // The statement may add or drop columns, so the message names the alter as a whole.
+                throw new SchemaChangeHandleException(
+                        String.format("Alter table failed,statement: %s", statement));
+            }
+            LOGGER.info("Alter table success,statement: {}", statement);
+            reportMetric(database, table, originData.length);
+        } catch (Exception e) {
+            if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Alter table failed, origin schema: %s", originSchema), e);
+            }
+            // Do not swallow the failure silently when the policy lets the job continue.
+            LOGGER.warn("Alter table failed, origin schema: {}", originSchema, e);
+            handleDirtyData(data, originData, database, table, DirtyType.HANDLE_ALTER_TABLE_ERROR, e);
+        }
+    }
+
+ private String doChangeColumnType(SchemaChangeType type, String
originSchema) {
+ LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+ return null;
+ }
+
+ private String doRenameColumn(SchemaChangeType type, String originSchema) {
+ LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+ return null;
+ }
+
+    /**
+     * Builds the drop-column clause for the given alter columns by delegating to the operation helper.
+     */
+    private String doDropColumn(List<AlterColumn> alterColumns) {
+        String dropClause = operationHelper.buildDropColumnStatement(alterColumns);
+        return dropClause;
+    }
+
+    /**
+     * Builds the add-column clause for the given alter columns by delegating to the operation helper.
+     */
+    private String doAddColumn(List<AlterColumn> alterColumns) {
+        String addClause = operationHelper.buildAddColumnStatement(alterColumns);
+        return addClause;
+    }
+
+    /**
+     * Handles a truncate-table request. It is never executed: with policy {@code ENABLE} it is only
+     * logged as unsupported, otherwise the configured policy decides.
+     */
+    private void doTruncateTable(SchemaChangeType type, String originSchema) {
+        SchemaChangePolicy truncatePolicy = policyMap.get(SchemaChangeType.TRUNCATE_TABLE);
+        if (truncatePolicy != SchemaChangePolicy.ENABLE) {
+            doSchemaChangeBase(type, truncatePolicy, originSchema);
+        } else {
+            LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+        }
+    }
+
+    /**
+     * Handles a rename-table request. It is never executed: with policy {@code ENABLE} it is only
+     * logged as unsupported, otherwise the configured policy decides.
+     */
+    private void doRenameTable(SchemaChangeType type, String originSchema) {
+        SchemaChangePolicy renamePolicy = policyMap.get(SchemaChangeType.RENAME_TABLE);
+        if (renamePolicy != SchemaChangePolicy.ENABLE) {
+            doSchemaChangeBase(type, renamePolicy, originSchema);
+        } else {
+            LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+        }
+    }
+
+    /**
+     * Handles a drop-table request. It is never executed: with policy {@code ENABLE} it is only
+     * logged as unsupported, otherwise the configured policy decides.
+     */
+    private void doDropTable(SchemaChangeType type, String originSchema) {
+        SchemaChangePolicy dropPolicy = policyMap.get(SchemaChangeType.DROP_TABLE);
+        if (dropPolicy != SchemaChangePolicy.ENABLE) {
+            doSchemaChangeBase(type, dropPolicy, originSchema);
+        } else {
+            LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+        }
+    }
+
+    /**
+     * Handles a create-table request.
+     * <p>
+     * With policy {@code ENABLE} the create statement is built and executed; a failure is either
+     * rethrown or reported as dirty data, depending on the exception policy. With any other policy
+     * the request is passed to {@code doSchemaChangeBase}.
+     *
+     * @param originData The raw bytes of the record, used for metrics and dirty data
+     * @param database The target database
+     * @param table The target table
+     * @param type The schema change type of this request
+     * @param originSchema The original DDL, used in log and error messages
+     * @param data The parsed record
+     * @param operation The create-table operation extracted from the record
+     */
+    private void doCreateTable(byte[] originData, String database, String table, SchemaChangeType type,
+            String originSchema, JsonNode data, CreateTableOperation operation) {
+        SchemaChangePolicy policy = policyMap.get(type);
+        if (policy != SchemaChangePolicy.ENABLE) {
+            doSchemaChangeBase(type, policy, originSchema);
+            return;
+        }
+        try {
+            List<String> primaryKeys = dynamicSchemaFormat.extractPrimaryKeyNames(data);
+            String stmt = operationHelper.buildCreateTableStatement(database, table, primaryKeys, operation);
+            boolean result = executeStatement(database, stmt);
+            if (!result) {
+                LOGGER.error("Create table failed,statement: {}", stmt);
+                throw new IOException(String.format("Create table failed,statement: %s", stmt));
+            }
+            reportMetric(database, table, originData.length);
+        } catch (Exception e) {
+            if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                // The message names the operation that actually failed.
+                throw new SchemaChangeHandleException(
+                        String.format("Create table failed, origin schema: %s", originSchema), e);
+            }
+            // Do not swallow the failure silently when the policy lets the job continue.
+            LOGGER.warn("Create table failed, origin schema: {}", originSchema, e);
+            handleDirtyData(data, originData, database, table, DirtyType.CREATE_TABLE_ERROR, e);
+        }
+    }
+
+    /**
+     * Applies the non-executing policies to a schema change.
+     * <p>
+     * A missing policy and {@code LOG} both log a warning, {@code ERROR} throws a
+     * {@link SchemaChangeHandleException}, and every other policy does nothing here.
+     */
+    private void doSchemaChangeBase(SchemaChangeType type, SchemaChangePolicy policy, String schema) {
+        if (policy == null || policy == SchemaChangePolicy.LOG) {
+            LOGGER.warn("Unsupported for {}: {}", type, schema);
+            return;
+        }
+        if (policy == SchemaChangePolicy.ERROR) {
+            throw new SchemaChangeHandleException(String.format("Unsupported for %s: %s", type, schema));
+        }
+    }
+
+ private Map<String, Object> buildRequestParam(String column, boolean
dropColumn) {
+ Map<String, Object> params = new HashMap<>();
+ params.put("isDropColumn", dropColumn);
+ params.put("columnName", column);
+ return params;
+ }
+
+    /**
+     * Builds the HTTP Basic authorization header value from the configured username and password.
+     */
+    private String authHeader() {
+        String credentials = options.getUsername() + ":" + options.getPassword();
+        byte[] encoded = Base64.encodeBase64(credentials.getBytes(StandardCharsets.UTF_8));
+        return "Basic " + new String(encoded);
+    }
+
+    /**
+     * Posts a statement to the schema change API of the given database.
+     *
+     * @param database The database the statement is executed in
+     * @param stmt The statement to execute
+     * @return {@code true} when the request was reported as successful
+     * @throws IOException If the request body cannot be serialized
+     */
+    private boolean executeStatement(String database, String stmt) throws IOException {
+        Map<String, String> param = new HashMap<>();
+        param.put("stmt", stmt);
+        String requestUrl = String.format(SCHEMA_CHANGE_API, options.getFenodes(), database);
+        HttpPost httpPost = new HttpPost(requestUrl);
+        httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON);
+        // StringEntity(String) encodes as ISO-8859-1, which corrupts non-ASCII identifiers and comments.
+        httpPost.setEntity(new StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param),
+                StandardCharsets.UTF_8));
+        return sendRequest(httpPost);
+    }
+
+    /**
+     * Asks the light schema change check API whether the column change can be applied to the table.
+     *
+     * @param database The database of the table
+     * @param table The table to check
+     * @param column The column being added or dropped
+     * @param dropColumn Whether the column is being dropped
+     * @return {@code true} when the request was reported as successful
+     * @throws IOException If the request body cannot be serialized
+     */
+    private boolean checkLightSchemaChange(String database, String table, String column, boolean dropColumn)
+            throws IOException {
+        String url = String.format(CHECK_LIGHT_SCHEMA_CHANGE_API, options.getFenodes(), database, table);
+        Map<String, Object> param = buildRequestParam(column, dropColumn);
+        HttpGetEntity httpGet = new HttpGetEntity(url);
+        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        // StringEntity(String) encodes as ISO-8859-1, which corrupts non-ASCII column names.
+        httpGet.setEntity(new StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param),
+                StandardCharsets.UTF_8));
+        boolean success = sendRequest(httpGet);
+        if (!success) {
+            LOGGER.warn("schema change can not do table {}.{}", database, table);
+        }
+        return success;
+    }
+
+    /**
+     * Sends the request, trying up to {@code maxRetries} times, and reports whether it succeeded.
+     * <p>
+     * An attempt succeeds when the HTTP status is 200, a body is present and its {@code code} field
+     * equals {@code DORIS_HTTP_CALL_SUCCESS}. An attempt that completes without success is retried
+     * immediately; an attempt that throws is retried after a pause that grows with the attempt index.
+     *
+     * @param request The request to send
+     * @return {@code true} on success, {@code false} when every attempt completed without success
+     * @throws SchemaChangeHandleException If the last attempt throws, or the thread is interrupted
+     *         while waiting for the next attempt
+     */
+    @SuppressWarnings("unchecked")
+    private boolean sendRequest(HttpUriRequest request) {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            for (int i = 0; i < maxRetries; i++) {
+                try {
+                    CloseableHttpResponse response = httpclient.execute(request);
+                    try {
+                        final int statusCode = response.getStatusLine().getStatusCode();
+                        if (statusCode == HttpStatus.SC_OK && response.getEntity() != null) {
+                            String loadResult = EntityUtils.toString(response.getEntity());
+                            Map<String, Object> responseMap = dynamicSchemaFormat.objectMapper
+                                    .readValue(loadResult, Map.class);
+                            String code = responseMap.getOrDefault("code", "-1").toString();
+                            if (DORIS_HTTP_CALL_SUCCESS.equals(code)) {
+                                return true;
+                            }
+                            LOGGER.error("send request error: {}", loadResult);
+                        }
+                    } finally {
+                        // Release the connection before the next attempt. A close failure is only logged,
+                        // so it cannot turn an already successful request into a retry.
+                        try {
+                            response.close();
+                        } catch (IOException closeError) {
+                            LOGGER.warn("close http response error", closeError);
+                        }
+                    }
+                } catch (Exception e) {
+                    // The last attempt has no retry left, so surface its cause instead of dropping it.
+                    if (i >= maxRetries - 1) {
+                        LOGGER.error("send http requests error", e);
+                        throw new IOException(e);
+                    }
+                    try {
+                        // The pause grows with the attempt index; the first retry is immediate.
+                        Thread.sleep(1000L * i);
+                    } catch (InterruptedException ex) {
+                        Thread.currentThread().interrupt();
+                        throw new IOException(
+                                "unable to send http request,interrupted while doing another attempt", e);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("send request error", e);
+            throw new SchemaChangeHandleException("send request error", e);
+        }
+        return false;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 7017f18749..1ca00e92c8 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -30,7 +30,9 @@ import
org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.doris.model.RespContent;
+import org.apache.inlong.sort.doris.schema.SchemaChangeHelper;
import org.apache.inlong.sort.doris.util.DorisParseUtils;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -149,6 +151,11 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private final String dynamicSchemaFormat;
private final boolean ignoreSingleTableErrors;
private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+ private final String[] fieldNames;
+ private final LogicalType[] logicalTypes;
+ private final boolean enableSchemaChange;
+ @Nullable
+ private final String schemaChangePolicies;
private long batchBytes = 0L;
private int size;
private DorisStreamLoad dorisStreamLoad;
@@ -160,15 +167,14 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private transient SinkTableMetricData metricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
- private final String[] fieldNames;
private volatile boolean jsonFormat;
private volatile RowData.FieldGetter[] fieldGetters;
private String fieldDelimiter;
private String lineDelimiter;
private String columns;
- private final LogicalType[] logicalTypes;
private DirtySinkHelper<Object> dirtySinkHelper;
private transient Schema schema;
+ private SchemaChangeHelper helper;
public DorisDynamicSchemaOutputFormat(DorisOptions option,
DorisReadOptions readOptions,
@@ -185,7 +191,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
String auditHostAndPorts,
boolean multipleSink,
DirtyOptions dirtyOptions,
- @Nullable DirtySink<Object> dirtySink) {
+ @Nullable DirtySink<Object> dirtySink,
+ boolean enableSchemaChange,
+ @Nullable String schemaChangePolicies) {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
@@ -201,7 +209,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
this.ignoreSingleTableErrors = ignoreSingleTableErrors;
this.schemaUpdatePolicy = schemaUpdatePolicy;
this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
-
+ this.enableSchemaChange = enableSchemaChange;
+ this.schemaChangePolicies = schemaChangePolicies;
handleStreamLoadProp();
}
@@ -211,7 +220,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
* @return builder
*/
public static DorisDynamicSchemaOutputFormat.Builder builder() {
- return new DorisDynamicSchemaOutputFormat.Builder();
+ return new Builder();
}
private void handleStreamLoadProp() {
@@ -274,9 +283,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
}
}
- if (multipleSink && StringUtils.isNotBlank(dynamicSchemaFormat)) {
+ if (multipleSink) {
jsonDynamicSchemaFormat =
(JsonDynamicSchemaFormat)
DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+ helper = SchemaChangeHelper.of(jsonDynamicSchemaFormat, options,
enableSchemaChange,
+ enableSchemaChange ?
SchemaChangeUtils.deserialize(schemaChangePolicies) : null, databasePattern,
+ tablePattern, executionOptions.getMaxRetries(),
schemaUpdatePolicy, metricData, dirtySinkHelper);
}
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
@@ -401,7 +413,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
if (isDDL) {
ddlNum.incrementAndGet();
- // Ignore ddl change for now
+ helper.process(rowData.getBinary(0), rootNode);
return;
}
String tableIdentifier;
@@ -925,6 +937,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private String[] fieldNames;
private DirtyOptions dirtyOptions;
private DirtySink<Object> dirtySink;
+ private boolean enableSchemaChange;
+ private String schemaChangePolicies;
public Builder() {
this.optionsBuilder =
DorisOptions.builder().setTableIdentifier("");
@@ -1021,6 +1035,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
return this;
}
+ public Builder setEnableSchemaChange(boolean enableSchemaChange) {
+ this.enableSchemaChange = enableSchemaChange;
+ return this;
+ }
+
+ public Builder setSchemaChangePolicies(String schemaChangePolicies) {
+ this.schemaChangePolicies = schemaChangePolicies;
+ return this;
+ }
+
@SuppressWarnings({"rawtypes"})
public DorisDynamicSchemaOutputFormat build() {
LogicalType[] logicalTypes = null;
@@ -1044,7 +1068,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
auditHostAndPorts,
multipleSink,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ enableSchemaChange,
+ schemaChangePolicies);
}
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index 802979ffc5..5f6e789bb6 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -22,6 +22,9 @@ import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -43,8 +46,12 @@ import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.utils.TableSchemaUtils;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
@@ -67,6 +74,8 @@ import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_SCHEMA_CHANGE_ENABLE;
+import static
org.apache.inlong.sort.base.Constants.SINK_SCHEMA_CHANGE_POLICIES;
/**
* This class copy from {@link
org.apache.doris.flink.table.DorisDynamicTableFactory}
@@ -176,6 +185,35 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
.withDescription("the flush max bytes (includes all append, upsert
and delete records), over this number"
+ " in batch, will flush data. The default value is
10MB.");
+ private static final Map<SchemaChangeType, List<SchemaChangePolicy>>
SUPPORTS_POLICY_MAP = new HashMap<>();
+
+ static {
+ SUPPORTS_POLICY_MAP.put(SchemaChangeType.CREATE_TABLE,
+ Arrays.asList(SchemaChangePolicy.ENABLE,
SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+ SchemaChangePolicy.ERROR));
+ SUPPORTS_POLICY_MAP.put(SchemaChangeType.DROP_TABLE,
+ Arrays.asList(SchemaChangePolicy.IGNORE,
SchemaChangePolicy.LOG,
+ SchemaChangePolicy.ERROR));
+ SUPPORTS_POLICY_MAP.put(SchemaChangeType.RENAME_TABLE,
+ Arrays.asList(SchemaChangePolicy.IGNORE,
SchemaChangePolicy.LOG,
+ SchemaChangePolicy.ERROR));
+ SUPPORTS_POLICY_MAP.put(SchemaChangeType.TRUNCATE_TABLE,
+ Arrays.asList(SchemaChangePolicy.IGNORE,
SchemaChangePolicy.LOG,
+ SchemaChangePolicy.ERROR));
+ SUPPORTS_POLICY_MAP.put(SchemaChangeType.ADD_COLUMN,
+ Arrays.asList(SchemaChangePolicy.ENABLE,
SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+ SchemaChangePolicy.ERROR));
+ SUPPORTS_POLICY_MAP.put(SchemaChangeType.DROP_COLUMN,
+ Arrays.asList(SchemaChangePolicy.ENABLE,
SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+ SchemaChangePolicy.ERROR));
+ SUPPORTS_POLICY_MAP.put(SchemaChangeType.RENAME_COLUMN,
+ Arrays.asList(SchemaChangePolicy.IGNORE,
SchemaChangePolicy.LOG,
+ SchemaChangePolicy.ERROR));
+ SUPPORTS_POLICY_MAP.put(SchemaChangeType.CHANGE_COLUMN_TYPE,
+ Arrays.asList(SchemaChangePolicy.IGNORE,
SchemaChangePolicy.LOG,
+ SchemaChangePolicy.ERROR));
+ }
+
@Override
public String factoryIdentifier() {
return "doris-inlong";
@@ -223,6 +261,8 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
options.add(INLONG_AUDIT);
options.add(FactoryUtil.SINK_PARALLELISM);
options.add(AUDIT_KEYS);
+ options.add(SINK_SCHEMA_CHANGE_ENABLE);
+ options.add(SINK_SCHEMA_CHANGE_POLICIES);
return options;
}
@@ -309,8 +349,10 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
SchemaUpdateExceptionPolicy schemaUpdatePolicy = helper.getOptions()
.getOptional(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY).orElse(SchemaUpdateExceptionPolicy.THROW_WITH_STOP);
String sinkMultipleFormat =
helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
- validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
- multipleSink, sinkMultipleFormat, databasePattern,
tablePattern);
+ boolean enableSchemaChange =
helper.getOptions().get(SINK_SCHEMA_CHANGE_ENABLE);
+ String schemaChangePolicies =
helper.getOptions().getOptional(SINK_SCHEMA_CHANGE_POLICIES).orElse(null);
+ validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
multipleSink, sinkMultipleFormat,
+ databasePattern, tablePattern, enableSchemaChange,
schemaChangePolicies);
String inlongMetric =
helper.getOptions().getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue());
String auditHostAndPorts =
helper.getOptions().getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue());
Integer parallelism =
helper.getOptions().getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null);
@@ -333,11 +375,13 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
auditHostAndPorts,
parallelism,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ enableSchemaChange,
+ schemaChangePolicies);
}
private void validateSinkMultiple(DataType physicalDataType, boolean
multipleSink, String sinkMultipleFormat,
- String databasePattern, String tablePattern) {
+ String databasePattern, String tablePattern, boolean
enableSchemaChange, String schemaChangePolicies) {
if (multipleSink) {
if (StringUtils.isBlank(databasePattern)) {
throw new ValidationException(
@@ -367,6 +411,21 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
"Only supports 'BYTES' or 'VARBINARY(n)' of
PhysicalDataType "
+ "when the option 'sink.multiple.enable' is
'true'");
}
+            if (enableSchemaChange) { // every configured policy must be one this sink supports for its type
+                Map<SchemaChangeType, SchemaChangePolicy> policyMap = SchemaChangeUtils
+                        .deserialize(schemaChangePolicies);
+                for (Entry<SchemaChangeType, SchemaChangePolicy> kv : policyMap.entrySet()) {
+                    List<SchemaChangePolicy> policies = SUPPORTS_POLICY_MAP.get(kv.getKey());
+                    if (policies == null) { // the change type is not known to this sink
+                        throw new ValidationException(
+                                String.format("Unsupported type of schema-change: %s", kv.getKey()));
+                    }
+                    if (!policies.contains(kv.getValue())) { // known type, but the policy is not allowed for it
+                        throw new ValidationException(
+                                String.format("Unsupported policy of schema-change: %s", kv.getValue()));
+                    }
+                }
+            }
}
}
}
\ No newline at end of file
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index cbec6fd682..be7c8b86b3 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -53,6 +53,9 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
private final Integer parallelism;
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
+ private final boolean enableSchemaChange;
+ @Nullable
+ private final String schemaChangePolicies;
public DorisDynamicTableSink(DorisOptions options,
DorisReadOptions readOptions,
@@ -68,7 +71,9 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
String auditHostAndPorts,
Integer parallelism,
DirtyOptions dirtyOptions,
- @Nullable DirtySink<Object> dirtySink) {
+ @Nullable DirtySink<Object> dirtySink,
+ boolean enableSchemaChange,
+ @Nullable String schemaChangePolicies) {
this.options = options;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
@@ -84,6 +89,8 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
this.parallelism = parallelism;
this.dirtyOptions = dirtyOptions;
this.dirtySink = dirtySink;
+ this.enableSchemaChange = enableSchemaChange;
+ this.schemaChangePolicies = schemaChangePolicies;
}
@Override
@@ -114,7 +121,9 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
.setIgnoreSingleTableErrors(ignoreSingleTableErrors)
.setSchemaUpdatePolicy(schemaUpdatePolicy)
.setDirtyOptions(dirtyOptions)
- .setDirtySink(dirtySink);
+ .setDirtySink(dirtySink)
+ .setEnableSchemaChange(enableSchemaChange)
+ .setSchemaChangePolicies(schemaChangePolicies);
return SinkFunctionProvider.of(
new GenericDorisSinkFunction<>(builder.build()), parallelism);
}
@@ -135,7 +144,9 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
auditHostAndPorts,
parallelism,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ enableSchemaChange,
+ schemaChangePolicies);
}
@Override
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/test/java/org/apache/inlong/sort/doris/schema/OperationHelperTest.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/test/java/org/apache/inlong/sort/doris/schema/OperationHelperTest.java
new file mode 100644
index 0000000000..2c3cc85d35
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/test/java/org/apache/inlong/sort/doris/schema/OperationHelperTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.schema;
+
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.expressions.Position;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Tests for {@link OperationHelper}: compares the ADD COLUMN, DROP COLUMN and
+ * CREATE TABLE statement text it builds against expected literal strings.
+ */
+public class OperationHelperTest {
+
+ /**
+ * One sample {@link Column} named "c" per {@link Types} code under test,
+ * keyed by that code. The type of every column must equal its key, otherwise
+ * the entry silently exercises a different type than the one it is filed under.
+ */
+ private final Map<Integer, Column> allTypes2Columns =
+ ImmutableMap.<Integer, Column>builder()
+ .put(Types.CHAR, new Column("c",
Collections.singletonList("32"), Types.CHAR,
+ new Position(PositionType.FIRST, null), true,
"InLong", "a column"))
+ .put(Types.VARCHAR, new Column("c",
Collections.singletonList("32"), Types.VARCHAR,
+ new Position(PositionType.FIRST, null), false,
"InLong", "a column"))
+ .put(Types.SMALLINT, new Column("c",
Collections.singletonList("8"), Types.SMALLINT,
+ new Position(PositionType.AFTER, "b"), true,
"2023", "a column"))
+ .put(Types.INTEGER, new Column("c",
Collections.singletonList("11"), Types.INTEGER,
+ new Position(PositionType.AFTER, "b"), true,
"2023", "a column"))
+ .put(Types.BIGINT, new Column("c",
Collections.singletonList("16"), Types.BIGINT,
+ new Position(PositionType.AFTER, "b"), true,
"2023", "a column"))
+ .put(Types.REAL,
+ new Column("c", Arrays.asList("11", "2"),
Types.REAL, new Position(PositionType.AFTER, "b"),
+ true, "99.99", "a column"))
+ .put(Types.DOUBLE, new Column("c", Arrays.asList("11",
"2"), Types.DOUBLE,
+ new Position(PositionType.AFTER, "b"), true,
"99.99", "a column"))
+ .put(Types.FLOAT, new Column("c", Arrays.asList("11",
"2"), Types.FLOAT,
+ new Position(PositionType.AFTER, "b"), true,
"99.99", "a column"))
+ .put(Types.DECIMAL, new Column("c", Arrays.asList("11",
"2"), Types.DECIMAL,
+ new Position(PositionType.AFTER, "b"), true,
"99.99", "a column"))
+ .put(Types.NUMERIC, new Column("c", Arrays.asList("11",
"2"), Types.NUMERIC,
+ new Position(PositionType.AFTER, "b"), true,
"99.99", "a column"))
+ .put(Types.BIT,
+ new Column("c", null, Types.BIT, new
Position(PositionType.AFTER, "b"), true, "false",
+ "a column"))
+ .put(Types.TIME,
+ new Column("c", null, Types.TIME, new
Position(PositionType.AFTER, "b"), true, "10:30",
+ "a column"))
+ .put(Types.TIME_WITH_TIMEZONE,
+ new Column("c", null, Types.TIME_WITH_TIMEZONE,
new Position(PositionType.AFTER, "b"),
+ true, "10:30", "a column"))
+ .put(Types.TIMESTAMP_WITH_TIMEZONE,
+ new Column("c", null,
Types.TIMESTAMP_WITH_TIMEZONE, new Position(PositionType.AFTER, "b"),
+ true, "2023-01-01 10:30", "a column"))
+ .put(Types.TIMESTAMP, new Column("c", null,
Types.TIMESTAMP, new Position(PositionType.AFTER, "b"),
+ true, "2023-01-01 10:30", "a column"))
+ .put(Types.BINARY, new Column("c", null, Types.BINARY, new
Position(PositionType.AFTER, "b"),
+ true, "this is a BINARY", "a column"))
+ .put(Types.VARBINARY, new Column("c", null, Types.VARBINARY,
new Position(PositionType.AFTER, "b"),
+ true, "this is a VARBINARY", "a column"))
+ .put(Types.BLOB,
+ new Column("c", null, Types.BLOB, new
Position(PositionType.AFTER, "b"), true,
+ "this is a BLOB",
+ "a column"))
+ .put(Types.CLOB,
+ new Column("c", null, Types.CLOB, new
Position(PositionType.AFTER, "b"), true,
+ "this is a CLOB",
+ "a column"))
+ .put(Types.DATE,
+ new Column("c", null, Types.DATE, new
Position(PositionType.AFTER, "b"), true, "2023-01-01",
+ "a column"))
+ .put(Types.BOOLEAN,
+ new Column("c", null, Types.BOOLEAN, new
Position(PositionType.AFTER, "b"), true, "true",
+ "a column"))
+ .put(Types.LONGNVARCHAR,
+ new Column("c", null, Types.LONGNVARCHAR, new
Position(PositionType.AFTER, "b"),
+ true, "this is a LONGNVARCHAR", "a
column"))
+ .put(Types.LONGVARBINARY,
+ new Column("c", null, Types.LONGVARBINARY, new
Position(PositionType.AFTER, "b"),
+ true, "this is a LONGVARBINARY", "a
column"))
+ .put(Types.LONGVARCHAR,
+ new Column("c", null, Types.LONGVARCHAR, new
Position(PositionType.AFTER, "b"),
+ true, "this is a LONGVARCHAR", "a column"))
+ .put(Types.ARRAY,
+ new Column("c", null, Types.ARRAY, new
Position(PositionType.AFTER, "b"), true,
+ "this is a ARRAY",
+ "a column"))
+ .put(Types.NCHAR,
+ new Column("c", null, Types.NCHAR, new
Position(PositionType.AFTER, "b"), true,
+ "this is a NCHAR",
+ "a column"))
+ .put(Types.NCLOB,
+ new Column("c", null, Types.NCLOB, new
Position(PositionType.AFTER, "b"), true,
+ "this is a NCLOB",
+ "a column"))
+ .put(Types.TINYINT, new Column("c",
Collections.singletonList("1"), Types.TINYINT,
+ new Position(PositionType.FIRST, null), true, "1",
"a column"))
+ .put(Types.OTHER,
+ new Column("c", null, Types.OTHER, new
Position(PositionType.AFTER, "b"), true,
+ "this is a OTHER",
+ "a column"))
+ .build();
+ /**
+ * Expected result of {@link OperationHelper#buildAddColumnStatement(List)} for the
+ * column stored under the same key in {@link #allTypes2Columns}.
+ */
+ private final Map<Integer, String> addColumnStatements =
+ ImmutableMap.<Integer, String>builder()
+ .put(Types.CHAR,
+ "ADD COLUMN `c` CHAR(32) DEFAULT 'InLong' COMMENT
'a column' FIRST")
+ .put(Types.VARCHAR,
+ "ADD COLUMN `c` VARCHAR(96) NOT NULL DEFAULT
'InLong' COMMENT 'a column' FIRST")
+ .put(Types.SMALLINT,
+ "ADD COLUMN `c` SMALLINT(8) DEFAULT '2023' COMMENT
'a column' AFTER `b`")
+ .put(Types.INTEGER,
+ "ADD COLUMN `c` INT(11) DEFAULT '2023' COMMENT 'a
column' AFTER `b`")
+ .put(Types.BIGINT,
+ "ADD COLUMN `c` BIGINT(16) DEFAULT '2023' COMMENT
'a column' AFTER `b`")
+ .put(Types.REAL,
+ "ADD COLUMN `c` DECIMAL(11, 2) DEFAULT '99.99'
COMMENT 'a column' AFTER `b`")
+ .put(Types.DOUBLE,
+ "ADD COLUMN `c` DOUBLE DEFAULT '99.99' COMMENT 'a
column' AFTER `b`")
+ .put(Types.FLOAT,
+ "ADD COLUMN `c` FLOAT DEFAULT '99.99' COMMENT 'a
column' AFTER `b`")
+ .put(Types.DECIMAL,
+ "ADD COLUMN `c` DECIMAL(11, 2) DEFAULT '99.99'
COMMENT 'a column' AFTER `b`")
+ .put(Types.NUMERIC,
+ "ADD COLUMN `c` DECIMAL(11, 2) DEFAULT '99.99'
COMMENT 'a column' AFTER `b`")
+ .put(Types.BIT,
+ "ADD COLUMN `c` BOOLEAN DEFAULT 'false' COMMENT
'a column' AFTER `b`")
+ .put(Types.TIME,
+ "ADD COLUMN `c` STRING DEFAULT '10:30' COMMENT 'a
column' AFTER `b`")
+ .put(Types.TIME_WITH_TIMEZONE,
+ "ADD COLUMN `c` STRING DEFAULT '10:30' COMMENT 'a
column' AFTER `b`")
+ .put(Types.TIMESTAMP_WITH_TIMEZONE,
+ "ADD COLUMN `c` DATETIME DEFAULT '2023-01-01
10:30' COMMENT 'a column' AFTER `b`")
+ .put(Types.TIMESTAMP,
+ "ADD COLUMN `c` DATETIME DEFAULT '2023-01-01
10:30' COMMENT 'a column' AFTER `b`")
+ .put(Types.BINARY,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a BINARY'
COMMENT 'a column' AFTER `b`")
+ .put(Types.VARBINARY,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a
VARBINARY' COMMENT 'a column' AFTER `b`")
+ .put(Types.BLOB,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a BLOB'
COMMENT 'a column' AFTER `b`")
+ .put(Types.CLOB,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a CLOB'
COMMENT 'a column' AFTER `b`")
+ .put(Types.DATE,
+ "ADD COLUMN `c` DATE DEFAULT '2023-01-01' COMMENT
'a column' AFTER `b`")
+ .put(Types.BOOLEAN,
+ "ADD COLUMN `c` BOOLEAN DEFAULT 'true' COMMENT 'a
column' AFTER `b`")
+ .put(Types.LONGNVARCHAR,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a
LONGNVARCHAR' COMMENT 'a column' AFTER `b`")
+ .put(Types.LONGVARBINARY,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a
LONGVARBINARY' COMMENT 'a column' AFTER `b`")
+ .put(Types.LONGVARCHAR,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a
LONGVARCHAR' COMMENT 'a column' AFTER `b`")
+ .put(Types.ARRAY,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a ARRAY'
COMMENT 'a column' AFTER `b`")
+ .put(Types.NCHAR,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a NCHAR'
COMMENT 'a column' AFTER `b`")
+ .put(Types.NCLOB,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a NCLOB'
COMMENT 'a column' AFTER `b`")
+ .put(Types.TINYINT,
+ "ADD COLUMN `c` TINYINT(1) DEFAULT '1' COMMENT 'a
column' FIRST")
+ .put(Types.OTHER,
+ "ADD COLUMN `c` STRING DEFAULT 'this is a OTHER'
COMMENT 'a column' AFTER `b`")
+ .build();
+ /**
+ * Helper under test; assigned in {@link #init()} before each test.
+ */
+ private OperationHelper helper;
+
+ /**
+ * Creates the helper under test from the "canal-json" dynamic schema format.
+ */
+ @Before
+ public void init() {
+ helper = OperationHelper.of(
+ (JsonDynamicSchemaFormat)
DynamicSchemaFormatFactory.getFormat("canal-json"));
+ }
+
+ /**
+ * Test for {@link OperationHelper#buildAddColumnStatement(List)}
+ */
+ @Test
+ public void testBuildAddColumnStatement() {
+ for (Entry<Integer, Column> kv : allTypes2Columns.entrySet()) {
+ AlterColumn alterColumn = new AlterColumn(AlterType.ADD_COLUMN, kv.getValue(), null);
+ // The message names the java.sql.Types code so a failure identifies the offending type.
+ Assert.assertEquals("Unexpected ADD COLUMN statement for java.sql.Types code " + kv.getKey(),
+ addColumnStatements.get(kv.getKey()),
+ helper.buildAddColumnStatement(Collections.singletonList(alterColumn)));
+ }
+ }
+
+ /**
+ * Test for {@link OperationHelper#buildDropColumnStatement(List)}
+ */
+ @Test
+ public void testBuildDropColumnStatement() {
+ for (Entry<Integer, Column> kv : allTypes2Columns.entrySet()) {
+ AlterColumn alterColumn = new AlterColumn(AlterType.DROP_COLUMN, null, kv.getValue());
+ // Every sample column is named "c", so the expected statement is the same for all types.
+ Assert.assertEquals("Unexpected DROP COLUMN statement for java.sql.Types code " + kv.getKey(),
+ "DROP COLUMN `c`",
+ helper.buildDropColumnStatement(Collections.singletonList(alterColumn)));
+ }
+ }
+
+ /**
+ * Test for {@link OperationHelper#buildCreateTableStatement(String,
String, List, CreateTableOperation)}
+ */
+ @Test
+ public void testBuildCreateTableStatement() {
+ List<String> primaryKeys = Arrays.asList("a", "b");
+ List<Column> columns = Arrays.asList(new Column("a",
Collections.singletonList("32"), Types.VARCHAR,
+ new Position(PositionType.FIRST, null), false, "InLong", "a
column"),
+ new Column("b", Collections.singletonList("32"), Types.VARCHAR,
+ new Position(PositionType.FIRST, null), false,
"InLong", "a column"),
+ new Column("c", Collections.singletonList("32"), Types.VARCHAR,
+ new Position(PositionType.FIRST, null), true,
"InLong", "a column"),
+ new Column("d", Collections.singletonList("32"), Types.VARCHAR,
+ new Position(PositionType.FIRST, null), true,
"InLong", "a column"));
+ CreateTableOperation operation = new CreateTableOperation();
+ operation.setComment("create table auto");
+ operation.setColumns(columns);
+ String database = "inlong_database";
+ String table = "inlong_table";
+ Assert.assertEquals("CREATE TABLE IF NOT EXISTS
`inlong_database`.`inlong_table`(\n"
+ + "\t`a` VARCHAR(96) NOT NULL DEFAULT 'InLong' COMMENT 'a
column',\n"
+ + "\t`b` VARCHAR(96) NOT NULL DEFAULT 'InLong' COMMENT 'a
column',\n"
+ + "\t`c` VARCHAR(96) DEFAULT 'InLong' COMMENT 'a column',\n"
+ + "\t`d` VARCHAR(96) DEFAULT 'InLong' COMMENT 'a column'\n"
+ + ")\n"
+ + "UNIQUE KEY(`a`,`b`)\n"
+ + "COMMENT 'create table auto'\n"
+ + "DISTRIBUTED BY HASH(`a`,`b`)\n"
+ + "PROPERTIES (\n"
+ + "\t\"light_schema_change\" = \"true\"\n"
+ + ")",
+ helper.buildCreateTableStatement(database, table, primaryKeys,
operation));
+ }
+}