This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c4a1f6f [feature]Support debezium table schema changes (#14)
c4a1f6f is described below
commit c4a1f6f24c67275a1ff2b97f440e4e815755dec9
Author: wudongliang <[email protected]>
AuthorDate: Wed May 8 16:47:16 2024 +0800
[feature]Support debezium table schema changes (#14)
---
pom.xml | 8 +-
.../doris/kafka/connector/cfg/DorisOptions.java | 11 +-
.../connector/cfg/DorisSinkConnectorConfig.java | 1 +
.../connector/converter/RecordDescriptor.java | 56 ++++-
.../type/debezium/AbstractDebeziumTimeType.java | 2 +-
.../kafka/connector/dialect/DialectProperties.java | 28 +++
.../doris/kafka/connector/dialect/DorisType.java | 47 ++++
.../kafka/connector/dialect/mysql/MysqlType.java | 213 +++++++++++++++++
.../connector/exception/SchemaChangeException.java | 46 ++++
.../kafka/connector/model/ColumnDescriptor.java | 91 ++++++++
.../kafka/connector/model/TableDescriptor.java | 100 ++++++++
.../doris/kafka/connector/model/doris/Field.java | 148 ++++++++++++
.../doris/kafka/connector/model/doris/Schema.java | 110 +++++++++
.../connector/service/DorisDefaultSinkService.java | 26 ++-
.../connector/service/DorisSystemService.java | 98 ++++++++
.../doris/kafka/connector/service/RestService.java | 59 +++++
.../kafka/connector/utils/ConfigCheckUtils.java | 29 +++
.../kafka/connector/utils/HttpGetWithEntity.java | 37 +++
.../kafka/connector/writer/CopyIntoWriter.java | 7 +
.../doris/kafka/connector/writer/DorisWriter.java | 7 +-
.../kafka/connector/writer/StreamLoadWriter.java | 7 +
.../writer/schema/DebeziumSchemaChange.java | 252 +++++++++++++++++++++
.../writer/schema/SchemaChangeHelper.java | 159 +++++++++++++
.../writer/schema/SchemaChangeManager.java | 142 ++++++++++++
.../connector/converter/TestRecordService.java | 4 +-
.../connector/writer/TestDebeziumSchemaChange.java | 133 +++++++++++
26 files changed, 1806 insertions(+), 15 deletions(-)
diff --git a/pom.xml b/pom.xml
index d062676..282650f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@
<metrics.version>4.2.25</metrics.version>
<spotless.version>2.4.2</spotless.version>
<debezium.version>1.9.8.Final</debezium.version>
- <mockito.version>2.27.0</mockito.version>
+ <mockito.version>4.2.0</mockito.version>
<junit.version>4.13.1</junit.version>
<slf4j.version>1.7.25</slf4j.version>
<mysql-connector.version>8.0.30</mysql-connector.version>
@@ -181,6 +181,12 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
<!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
<dependency>
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 6183869..e9f1297 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -44,7 +44,7 @@ public class DorisOptions {
private final String password;
private final String database;
private final Map<String, String> topicMap;
-
+ private final String schemaTopic;
private final int fileSize;
private final int recordNum;
private long flushTime;
@@ -105,6 +105,7 @@ public class DorisOptions {
this.flushTime = DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN;
}
this.topicMap = getTopicToTableMap(config);
+ this.schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC);
enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT;
if (config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) {
@@ -196,6 +197,10 @@ public class DorisOptions {
return topicMap.get(topic);
}
+ public Map<String, String> getTopicMap() {
+ return topicMap;
+ }
+
public String getQueryUrl() {
List<String> queryUrls = getQueryUrls();
return queryUrls.get(0);
@@ -288,6 +293,10 @@ public class DorisOptions {
return enableDelete;
}
+ public String getSchemaTopic() {
+ return schemaTopic;
+ }
+
/**
* parse topic to table map
*
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 58b6a62..94ea08e 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -78,6 +78,7 @@ public class DorisSinkConnectorConfig {
public static final String DELIVERY_GUARANTEE_DEFAULT = DeliveryGuarantee.AT_LEAST_ONCE.name();
public static final String CONVERT_MODE = "converter.mode";
public static final String CONVERT_MODE_DEFAULT = ConverterMode.NORMAL.getName();
+ public static final String SCHEMA_TOPIC = "schema.topic";
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
index 188f103..11d097f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import org.apache.doris.kafka.connector.dialect.mysql.MysqlType;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@@ -120,6 +121,8 @@ public class RecordDescriptor {
private final String name;
private final String schemaTypeName;
private final String schemaName;
+ private String comment;
+ private String defaultValue;
public FieldDescriptor(
Schema schema, String name, String schemaTypeName, String schemaName) {
@@ -129,6 +132,18 @@ public class RecordDescriptor {
this.schemaName = schemaName;
}
+ public FieldDescriptor(
+ Schema schema,
+ String name,
+ String schemaTypeName,
+ String schemaName,
+ String comment,
+ String defaultValue) {
+ this(schema, name, schemaTypeName, schemaName);
+ this.comment = comment;
+ this.defaultValue = defaultValue;
+ }
+
public String getName() {
return name;
}
@@ -144,11 +159,20 @@ public class RecordDescriptor {
public String getSchemaTypeName() {
return schemaTypeName;
}
+
+ public String getComment() {
+ return comment;
+ }
+
+ public String getDefaultValue() {
+ return defaultValue;
+ }
}
public static class Builder {
private SinkRecord sinkRecord;
+ private Struct tableChange;
// Internal build state
private final List<String> keyFieldNames = new ArrayList<>();
@@ -160,11 +184,20 @@ public class RecordDescriptor {
return this;
}
+ public Builder withTableChange(Struct tableChange) {
+ this.tableChange = tableChange;
+ return this;
+ }
+
public RecordDescriptor build() {
Objects.requireNonNull(sinkRecord, "The sink record must be
provided.");
final boolean flattened = !isTombstone(sinkRecord) &&
isFlattened(sinkRecord);
- readSinkRecordNonKeyData(sinkRecord, flattened);
+ if (Objects.nonNull(tableChange)) {
+ readTableChangeData(tableChange);
+ } else {
+ readSinkRecordNonKeyData(sinkRecord, flattened);
+ }
return new RecordDescriptor(
sinkRecord,
@@ -175,6 +208,27 @@ public class RecordDescriptor {
flattened);
}
+ private void readTableChangeData(final Struct tableChange) {
+ Struct tableChangeTable = tableChange.getStruct("table");
+ List<Object> tableChangeColumns = tableChangeTable.getArray("columns");
+ for (Object column : tableChangeColumns) {
+ Struct columnStruct = (Struct) column;
+ Schema schema = columnStruct.schema();
+ String name = columnStruct.getString("name");
+ String typeName = columnStruct.getString("typeName");
+ Integer length = columnStruct.getInt32("length");
+ Integer scale = columnStruct.getInt32("scale");
+ String dorisType = MysqlType.toDorisType(typeName, length, scale);
+ String comment = columnStruct.getString("comment");
+ String defaultValue = columnStruct.getString("defaultValueExpression");
+ nonKeyFieldNames.add(name);
+ allFields.put(
+ name,
+ new FieldDescriptor(
+ schema, name, dorisType, schema.name(), comment, defaultValue));
+ }
+ }
+
private boolean isFlattened(SinkRecord record) {
return record.valueSchema().name() == null
|| !record.valueSchema().name().contains("Envelope");
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
index 9cde668..c673249 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
@@ -32,7 +32,7 @@ public abstract class AbstractDebeziumTimeType extends AbstractTimeType {
}
if (sourceValue instanceof Number) {
final LocalTime localTime = getLocalTime((Number) sourceValue);
- return String.format("'%s'",
DateTimeFormatter.ISO_TIME.format(localTime));
+ return String.format("%s",
DateTimeFormatter.ISO_TIME.format(localTime));
}
throw new ConnectException(
String.format(
diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java b/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java
new file mode 100644
index 0000000..62e4cab
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.dialect;
+
+public class DialectProperties {
+
+ /* Max precision of datetime type of Doris. */
+ public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
+
+ public static final int TIMESTAMP_TYPE_MAX_PRECISION = 9;
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java b/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java
new file mode 100644
index 0000000..89b416a
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.dialect;
+
+public class DorisType {
+ public static final String BOOLEAN = "BOOLEAN";
+ public static final String TINYINT = "TINYINT";
+ public static final String SMALLINT = "SMALLINT";
+ public static final String INT = "INT";
+ public static final String BIGINT = "BIGINT";
+ public static final String LARGEINT = "LARGEINT";
+ public static final String FLOAT = "FLOAT";
+ public static final String DOUBLE = "DOUBLE";
+ public static final String DECIMAL = "DECIMAL";
+ public static final String DECIMAL_V3 = "DECIMALV3";
+ public static final String DATE = "DATE";
+ public static final String DATE_V2 = "DATEV2";
+ public static final String DATETIME = "DATETIME";
+ public static final String DATETIME_V2 = "DATETIMEV2";
+ public static final String CHAR = "CHAR";
+ public static final String VARCHAR = "VARCHAR";
+ public static final String STRING = "STRING";
+ public static final String HLL = "HLL";
+ public static final String BITMAP = "BITMAP";
+ public static final String ARRAY = "ARRAY";
+ public static final String JSONB = "JSONB";
+ public static final String JSON = "JSON";
+ public static final String MAP = "MAP";
+ public static final String STRUCT = "STRUCT";
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java b/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java
new file mode 100644
index 0000000..663aadf
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.dialect.mysql;
+
+import static org.apache.doris.kafka.connector.dialect.DialectProperties.MAX_SUPPORTED_DATE_TIME_PRECISION;
+import static org.apache.doris.kafka.connector.dialect.DialectProperties.TIMESTAMP_TYPE_MAX_PRECISION;
+
+import com.google.common.base.Preconditions;
+import org.apache.doris.kafka.connector.dialect.DorisType;
+
+public class MysqlType {
+
+ // MySQL driver returns width of timestamp types instead of precision.
+ // 19 characters are used for zero-precision timestamps while others
+ // require 19 + precision + 1 characters with the additional character
+ // required for the decimal separator.
+ private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
+ private static final String BIT = "BIT";
+ private static final String BOOLEAN = "BOOLEAN";
+ private static final String BOOL = "BOOL";
+ private static final String TINYINT = "TINYINT";
+ private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED
ZEROFILL";
+ private static final String SMALLINT = "SMALLINT";
+ private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+ private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT
UNSIGNED ZEROFILL";
+ private static final String MEDIUMINT = "MEDIUMINT";
+ private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+ private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT
UNSIGNED ZEROFILL";
+ private static final String INT = "INT";
+ private static final String INT_UNSIGNED = "INT UNSIGNED";
+ private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED
ZEROFILL";
+ private static final String BIGINT = "BIGINT";
+ private static final String SERIAL = "SERIAL";
+ private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED
ZEROFILL";
+ private static final String REAL = "REAL";
+ private static final String REAL_UNSIGNED = "REAL UNSIGNED";
+ private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED
ZEROFILL";
+ private static final String FLOAT = "FLOAT";
+ private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED
ZEROFILL";
+ private static final String DOUBLE = "DOUBLE";
+ private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+ private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED
ZEROFILL";
+ private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
+ private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION
UNSIGNED";
+ private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL =
+ "DOUBLE PRECISION UNSIGNED ZEROFILL";
+ private static final String NUMERIC = "NUMERIC";
+ private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
+ private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED
ZEROFILL";
+ private static final String FIXED = "FIXED";
+ private static final String FIXED_UNSIGNED = "FIXED UNSIGNED";
+ private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED
ZEROFILL";
+ private static final String DECIMAL = "DECIMAL";
+ private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED
ZEROFILL";
+ private static final String CHAR = "CHAR";
+ private static final String VARCHAR = "VARCHAR";
+ private static final String TINYTEXT = "TINYTEXT";
+ private static final String MEDIUMTEXT = "MEDIUMTEXT";
+ private static final String TEXT = "TEXT";
+ private static final String LONGTEXT = "LONGTEXT";
+ private static final String DATE = "DATE";
+ private static final String TIME = "TIME";
+ private static final String DATETIME = "DATETIME";
+ private static final String TIMESTAMP = "TIMESTAMP";
+ private static final String YEAR = "YEAR";
+ private static final String BINARY = "BINARY";
+ private static final String VARBINARY = "VARBINARY";
+ private static final String TINYBLOB = "TINYBLOB";
+ private static final String MEDIUMBLOB = "MEDIUMBLOB";
+ private static final String BLOB = "BLOB";
+ private static final String LONGBLOB = "LONGBLOB";
+ private static final String JSON = "JSON";
+ private static final String ENUM = "ENUM";
+ private static final String SET = "SET";
+
+ public static String toDorisType(String type, Integer length, Integer scale) {
+ switch (type.toUpperCase()) {
+ case BIT:
+ case BOOLEAN:
+ case BOOL:
+ return DorisType.BOOLEAN;
+ case TINYINT:
+ return DorisType.TINYINT;
+ case TINYINT_UNSIGNED:
+ case TINYINT_UNSIGNED_ZEROFILL:
+ case SMALLINT:
+ return DorisType.SMALLINT;
+ case SMALLINT_UNSIGNED:
+ case SMALLINT_UNSIGNED_ZEROFILL:
+ case INT:
+ case MEDIUMINT:
+ case YEAR:
+ return DorisType.INT;
+ case INT_UNSIGNED:
+ case INT_UNSIGNED_ZEROFILL:
+ case MEDIUMINT_UNSIGNED:
+ case MEDIUMINT_UNSIGNED_ZEROFILL:
+ case BIGINT:
+ return DorisType.BIGINT;
+ case BIGINT_UNSIGNED:
+ case BIGINT_UNSIGNED_ZEROFILL:
+ return DorisType.LARGEINT;
+ case FLOAT:
+ case FLOAT_UNSIGNED:
+ case FLOAT_UNSIGNED_ZEROFILL:
+ return DorisType.FLOAT;
+ case REAL:
+ case REAL_UNSIGNED:
+ case REAL_UNSIGNED_ZEROFILL:
+ case DOUBLE:
+ case DOUBLE_UNSIGNED:
+ case DOUBLE_UNSIGNED_ZEROFILL:
+ case DOUBLE_PRECISION:
+ case DOUBLE_PRECISION_UNSIGNED:
+ case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
+ return DorisType.DOUBLE;
+ case NUMERIC:
+ case NUMERIC_UNSIGNED:
+ case NUMERIC_UNSIGNED_ZEROFILL:
+ case FIXED:
+ case FIXED_UNSIGNED:
+ case FIXED_UNSIGNED_ZEROFILL:
+ case DECIMAL:
+ case DECIMAL_UNSIGNED:
+ case DECIMAL_UNSIGNED_ZEROFILL:
+ return length != null && length <= 38
+ ? String.format(
+ "%s(%s,%s)",
+ DorisType.DECIMAL_V3,
+ length,
+ scale != null && scale >= 0 ? scale : 0)
+ : DorisType.STRING;
+ case DATE:
+ return DorisType.DATE_V2;
+ case DATETIME:
+ case TIMESTAMP:
+ // default precision is 0
+ // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
+ if (length == null
+ || length <= 0
+ || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
+ return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
+ } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
+ // Timestamp with a fraction of seconds.
+ // For example, 2024-01-01 01:01:01.1
+ // The decimal point will occupy 1 character.
+ // Thus, the length of the timestamp is 21.
+ return String.format(
+ "%s(%s)",
+ DorisType.DATETIME_V2,
+ Math.min(
+ length - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
+ MAX_SUPPORTED_DATE_TIME_PRECISION));
+ } else if (length <= TIMESTAMP_TYPE_MAX_PRECISION) {
+ // For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9.
+ return String.format(
+ "%s(%s)",
+ DorisType.DATETIME_V2,
+ Math.min(length, MAX_SUPPORTED_DATE_TIME_PRECISION));
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported length: "
+ + length
+ + " for MySQL TIMESTAMP/DATETIME types");
+ }
+ case CHAR:
+ case VARCHAR:
+ Preconditions.checkNotNull(length);
+ return length * 3 > 65533
+ ? DorisType.STRING
+ : String.format("%s(%s)", DorisType.VARCHAR, length *
3);
+ case TINYTEXT:
+ case TEXT:
+ case MEDIUMTEXT:
+ case LONGTEXT:
+ case ENUM:
+ case TIME:
+ case TINYBLOB:
+ case BLOB:
+ case MEDIUMBLOB:
+ case LONGBLOB:
+ case BINARY:
+ case VARBINARY:
+ case SET:
+ return DorisType.STRING;
+ case JSON:
+ return DorisType.JSONB;
+ default:
+ throw new UnsupportedOperationException("Unsupported MySQL
Type: " + type);
+ }
+ }
+}
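For reference, a small sketch of how the mapping above behaves; the expected outputs in the comments follow directly from the switch logic in this diff:

import org.apache.doris.kafka.connector.dialect.mysql.MysqlType;

public class MysqlTypeSketch {
    public static void main(String[] args) {
        // MySQL VARCHAR lengths are tripled for Doris (utf8mb4 can use 3
        // bytes per character), so VARCHAR(255) becomes VARCHAR(765).
        System.out.println(MysqlType.toDorisType("VARCHAR", 255, null));
        // DATETIME with column width 23 = 19 chars + '.' + 3 fractional
        // digits, giving DATETIMEV2(3).
        System.out.println(MysqlType.toDorisType("DATETIME", 23, null));
        // DECIMAL within 38-digit precision maps to DECIMALV3(10,2).
        System.out.println(MysqlType.toDorisType("DECIMAL", 10, 2));
    }
}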
diff --git a/src/main/java/org/apache/doris/kafka/connector/exception/SchemaChangeException.java b/src/main/java/org/apache/doris/kafka/connector/exception/SchemaChangeException.java
new file mode 100644
index 0000000..4c0ce46
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/exception/SchemaChangeException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.exception;
+
+/** Doris Schema Change run exception. */
+public class SchemaChangeException extends RuntimeException {
+ public SchemaChangeException() {
+ super();
+ }
+
+ public SchemaChangeException(String message) {
+ super(message);
+ }
+
+ public SchemaChangeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SchemaChangeException(Throwable cause) {
+ super(cause);
+ }
+
+ protected SchemaChangeException(
+ String message,
+ Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/model/ColumnDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/model/ColumnDescriptor.java
new file mode 100644
index 0000000..f34c9fa
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/model/ColumnDescriptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.model;
+
+import java.util.Objects;
+
+public class ColumnDescriptor {
+ private final String columnName;
+ private final String typeName;
+ private final String comment;
+ private final String defaultValue;
+
+ private ColumnDescriptor(
+ String columnName, String typeName, String comment, String defaultValue) {
+ this.columnName = columnName;
+ this.typeName = typeName;
+ this.comment = comment;
+ this.defaultValue = defaultValue;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private String columnName;
+ private String typeName;
+ private String comment;
+ private String defaultValue;
+
+ public Builder columnName(String columnName) {
+ this.columnName = columnName;
+ return this;
+ }
+
+ public Builder typeName(String typeName) {
+ this.typeName = typeName;
+ return this;
+ }
+
+ public Builder comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public Builder defaultValue(String defaultValue) {
+ this.defaultValue = defaultValue;
+ return this;
+ }
+
+ public ColumnDescriptor build() {
+ Objects.requireNonNull(columnName, "A column name is required");
+ Objects.requireNonNull(typeName, "A type name is required");
+
+ return new ColumnDescriptor(columnName, typeName, comment, defaultValue);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/model/TableDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/model/TableDescriptor.java
new file mode 100644
index 0000000..4cf14f4
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/model/TableDescriptor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.model;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TableDescriptor {
+ private final String tableName;
+ private final String tableType;
+ private final Map<String, ColumnDescriptor> columns = new LinkedHashMap<>();
+
+ private TableDescriptor(String tableName, String tableType, List<ColumnDescriptor> columns) {
+ this.tableName = tableName;
+ this.tableType = tableType;
+ columns.forEach(c -> this.columns.put(c.getColumnName(), c));
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getTableType() {
+ return tableType;
+ }
+
+ public Collection<ColumnDescriptor> getColumns() {
+ return columns.values();
+ }
+
+ public ColumnDescriptor getColumnByName(String columnName) {
+ return columns.get(columnName);
+ }
+
+ public boolean hasColumn(String columnName) {
+ return columns.containsKey(columnName);
+ }
+
+ public static class Builder {
+ private String schemaName;
+ private String tableName;
+ private String tableType;
+ private final List<ColumnDescriptor> columns = new ArrayList<>();
+
+ private Builder() {}
+
+ public Builder schemaName(String schemaName) {
+ this.schemaName = schemaName;
+ return this;
+ }
+
+ public Builder tableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public Builder type(String tableType) {
+ this.tableType = tableType;
+ return this;
+ }
+
+ public Builder column(ColumnDescriptor column) {
+ this.columns.add(column);
+ return this;
+ }
+
+ public Builder columns(List<ColumnDescriptor> columns) {
+ this.columns.addAll(columns);
+ return this;
+ }
+
+ public TableDescriptor build() {
+ return new TableDescriptor(tableName, tableType, columns);
+ }
+ }
+}
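A quick sketch of the two builders together (the table, column, and "UNIQUE_KEYS" keysType values below are hypothetical stand-ins for what the Doris schema API would return):

import org.apache.doris.kafka.connector.model.ColumnDescriptor;
import org.apache.doris.kafka.connector.model.TableDescriptor;

public class DescriptorSketch {
    public static void main(String[] args) {
        ColumnDescriptor id =
                ColumnDescriptor.builder().columnName("id").typeName("BIGINT").build();
        TableDescriptor table =
                TableDescriptor.builder().tableName("orders").type("UNIQUE_KEYS").column(id).build();
        // Column insertion order is preserved by the LinkedHashMap inside TableDescriptor.
        System.out.println(table.hasColumn("id")); // true
    }
}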
diff --git a/src/main/java/org/apache/doris/kafka/connector/model/doris/Field.java b/src/main/java/org/apache/doris/kafka/connector/model/doris/Field.java
new file mode 100644
index 0000000..881a4a9
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/model/doris/Field.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.model.doris;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Objects;
+
+public class Field {
+ @JsonProperty(value = "name")
+ private String name;
+
+ @JsonProperty(value = "type")
+ private String type;
+
+ @JsonProperty(value = "comment")
+ private String comment;
+
+ @JsonProperty(value = "precision")
+ private int precision;
+
+ @JsonProperty(value = "scale")
+ private int scale;
+
+ @JsonProperty(value = "aggregation_type")
+ private String aggregationType;
+
+ public Field() {}
+
+ public Field(
+ String name,
+ String type,
+ String comment,
+ int precision,
+ int scale,
+ String aggregationType) {
+ this.name = name;
+ this.type = type;
+ this.comment = comment;
+ this.precision = precision;
+ this.scale = scale;
+ this.aggregationType = aggregationType;
+ }
+
+ public String getAggregationType() {
+ return aggregationType;
+ }
+
+ public void setAggregationType(String aggregationType) {
+ this.aggregationType = aggregationType;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public void setComment(String comment) {
+ this.comment = comment;
+ }
+
+ public int getPrecision() {
+ return precision;
+ }
+
+ public void setPrecision(int precision) {
+ this.precision = precision;
+ }
+
+ public int getScale() {
+ return scale;
+ }
+
+ public void setScale(int scale) {
+ this.scale = scale;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Field field = (Field) o;
+ return precision == field.precision
+ && scale == field.scale
+ && Objects.equals(name, field.name)
+ && Objects.equals(type, field.type)
+ && Objects.equals(comment, field.comment);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type, comment, precision, scale);
+ }
+
+ @Override
+ public String toString() {
+ return "Field{"
+ + "name='"
+ + name
+ + '\''
+ + ", type='"
+ + type
+ + '\''
+ + ", comment='"
+ + comment
+ + '\''
+ + ", precision="
+ + precision
+ + ", scale="
+ + scale
+ + '}';
+ }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/model/doris/Schema.java b/src/main/java/org/apache/doris/kafka/connector/model/doris/Schema.java
new file mode 100644
index 0000000..be29b3e
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/model/doris/Schema.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.model.doris;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class Schema {
+ private int status = 0;
+ private String keysType;
+ private List<Field> properties;
+
+ public Schema() {
+ properties = new ArrayList<>();
+ }
+
+ public Schema(int fieldCount) {
+ properties = new ArrayList<>(fieldCount);
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ public String getKeysType() {
+ return keysType;
+ }
+
+ public void setKeysType(String keysType) {
+ this.keysType = keysType;
+ }
+
+ public List<Field> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(List<Field> properties) {
+ this.properties = properties;
+ }
+
+ public void put(
+ String name,
+ String type,
+ String comment,
+ int scale,
+ int precision,
+ String aggregationType) {
+ properties.add(new Field(name, type, comment, scale, precision, aggregationType));
+ }
+
+ public void put(Field f) {
+ properties.add(f);
+ }
+
+ public Field get(int index) {
+ if (index >= properties.size()) {
+ throw new IndexOutOfBoundsException(
+ "Index: " + index + ", Fields sizeļ¼" + properties.size());
+ }
+ return properties.get(index);
+ }
+
+ public int size() {
+ return properties.size();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Schema schema = (Schema) o;
+ return status == schema.status && Objects.equals(properties, schema.properties);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, properties);
+ }
+
+ @Override
+ public String toString() {
+ return "Schema{" + "status=" + status + ", properties=" + properties +
'}';
+ }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index f25e4b9..29810e4 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -23,6 +23,7 @@ import com.codahale.metrics.MetricRegistry;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import org.apache.doris.kafka.connector.DorisSinkTask;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.connection.ConnectionProvider;
@@ -34,6 +35,7 @@ import org.apache.doris.kafka.connector.writer.CopyIntoWriter;
import org.apache.doris.kafka.connector.writer.DorisWriter;
import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
import org.apache.doris.kafka.connector.writer.load.LoadModel;
+import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -86,15 +88,23 @@ public class DorisDefaultSinkService implements DorisSinkService {
if (writer.containsKey(nameIndex)) {
LOG.info("already start task");
} else {
- LoadModel loadModel = dorisOptions.getLoadModel();
+ DorisWriter dorisWriter;
String topic = topicPartition.topic();
int partition = topicPartition.partition();
- DorisWriter dorisWriter =
- LoadModel.COPY_INTO.equals(loadModel)
- ? new CopyIntoWriter(
- topic, partition, dorisOptions, conn, connectMonitor)
- : new StreamLoadWriter(
- topic, partition, dorisOptions, conn, connectMonitor);
+ String schemaChangeTopic = dorisOptions.getSchemaTopic();
+ if (Objects.nonNull(schemaChangeTopic) && schemaChangeTopic.equals(topic)) {
+ dorisWriter =
+ new DebeziumSchemaChange(
+ topic, partition, dorisOptions, conn, connectMonitor);
+ } else {
+ LoadModel loadModel = dorisOptions.getLoadModel();
+ dorisWriter =
+ LoadModel.COPY_INTO.equals(loadModel)
+ ? new CopyIntoWriter(
+ topic, partition, dorisOptions, conn, connectMonitor)
+ : new StreamLoadWriter(
+ topic, partition, dorisOptions, conn, connectMonitor);
+ }
writer.put(nameIndex, dorisWriter);
metricsJmxReporter.start();
}
@@ -119,7 +129,7 @@ public class DorisDefaultSinkService implements DorisSinkService {
// check all sink writer to see if they need to be flushed
for (DorisWriter writer : writer.values()) {
// Time based flushing
- if (writer.shouldFlush()) {
+ if (!(writer instanceof DebeziumSchemaChange) && writer.shouldFlush()) {
writer.flushBuffer();
}
}
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java
new file mode 100644
index 0000000..2365aa4
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.service;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Predicate;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DorisSystemService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DorisSystemService.class);
+ private final JdbcConnectionProvider jdbcConnectionProvider;
+
+ public DorisSystemService(DorisOptions dorisOptions) {
+ this.jdbcConnectionProvider = new JdbcConnectionProvider(dorisOptions);
+ }
+
+ private static final List<String> builtinDatabases =
+ Collections.singletonList("information_schema");
+
+ public boolean tableExists(String database, String table) {
+ return databaseExists(database) && listTables(database).contains(table);
+ }
+
+ public boolean databaseExists(String database) {
+ return listDatabases().contains(database);
+ }
+
+ public List<String> listDatabases() {
+ return extractColumnValuesBySQL(
+ "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
+ 1,
+ dbName -> !builtinDatabases.contains(dbName));
+ }
+
+ public List<String> listTables(String databaseName) {
+ if (!databaseExists(databaseName)) {
+ throw new DorisException("database" + databaseName + " is not
exists");
+ }
+ return extractColumnValuesBySQL(
+ "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE
TABLE_SCHEMA = ?",
+ 1,
+ null,
+ databaseName);
+ }
+
+ public List<String> extractColumnValuesBySQL(
+ String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {
+
+ List<String> columnValues = Lists.newArrayList();
+ try (PreparedStatement ps =
+ jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
+ if (Objects.nonNull(params) && params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ ps.setObject(i + 1, params[i]);
+ }
+ }
+ ResultSet rs = ps.executeQuery();
+ while (rs.next()) {
+ String columnValue = rs.getString(columnIndex);
+ if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
+ columnValues.add(columnValue);
+ }
+ }
+ return columnValues;
+ } catch (Exception e) {
+ LOG.error("The following SQL query could not be executed: {}",
sql, e);
+ throw new DorisException(
+ String.format("The following SQL query could not be
executed: %s", sql), e);
+ }
+ }
+}
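A short sketch of how the existence checks might be used by a caller (the database and table names are hypothetical):

import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.service.DorisSystemService;

public class ExistenceCheckSketch {
    public static boolean targetReady(DorisOptions options) {
        DorisSystemService system = new DorisSystemService(options);
        // tableExists() first checks the database, then scans its table list,
        // both via INFORMATION_SCHEMA queries over JDBC.
        return system.tableExists("demo_db", "orders");
    }
}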
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
index e6b5c33..e7e4a4f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
@@ -20,6 +20,7 @@
package org.apache.doris.kafka.connector.service;
import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
@@ -39,19 +40,29 @@ import org.apache.commons.io.IOUtils;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.exception.ConnectedFailedException;
import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
import org.apache.doris.kafka.connector.model.BackendV2;
import org.apache.doris.kafka.connector.model.LoadOperation;
+import org.apache.doris.kafka.connector.model.doris.Schema;
import org.apache.doris.kafka.connector.utils.BackoffAndRetryUtils;
+import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
public class RestService {
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
+ private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* get Doris BE nodes to request.
@@ -285,4 +296,52 @@ public class RestService {
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}
+
+ /** Get table schema from Doris. */
+ public static Schema getSchema(
+ DorisOptions dorisOptions, String db, String table, Logger logger) {
+ logger.trace("Start to get " + db + "." + table + " schema from Doris.");
+ Object responseData = null;
+ try {
+ String tableSchemaUri =
+ String.format(TABLE_SCHEMA_API, dorisOptions.getHttpUrl(), db, table);
+ HttpGet httpGet = new HttpGet(tableSchemaUri);
+ httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader(dorisOptions));
+ Map<String, Object> responseMap = handleResponse(httpGet, logger);
+ responseData = responseMap.get("data");
+ String schemaStr = OBJECT_MAPPER.writeValueAsString(responseData);
+ return OBJECT_MAPPER.readValue(schemaStr, Schema.class);
+ } catch (JsonProcessingException | IllegalArgumentException e) {
+ throw new SchemaChangeException("can not parse response schema " +
responseData, e);
+ }
+ }
+
+ private static String authHeader(DorisOptions dorisOptions) {
+ return "Basic "
+ + new String(
+ org.apache.commons.codec.binary.Base64.encodeBase64(
+ (dorisOptions.getUser() + ":" +
dorisOptions.getPassword())
+ .getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static Map handleResponse(HttpUriRequest request, Logger logger) {
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+ CloseableHttpResponse response = httpclient.execute(request);
+ final int statusCode = response.getStatusLine().getStatusCode();
+ final String reasonPhrase = response.getStatusLine().getReasonPhrase();
+ if (statusCode == 200 && response.getEntity() != null) {
+ String responseEntity = EntityUtils.toString(response.getEntity());
+ return OBJECT_MAPPER.readValue(responseEntity, Map.class);
+ } else {
+ throw new SchemaChangeException(
+ "Failed to schemaChange, status: "
+ + statusCode
+ + ", reason: "
+ + reasonPhrase);
+ }
+ } catch (Exception e) {
+ logger.trace("SchemaChange request error,", e);
+ throw new SchemaChangeException("SchemaChange request error with "
+ e.getMessage());
+ }
+ }
}
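A sketch of fetching and walking a table schema with the new REST helper (the db/table names are hypothetical; the endpoint format is the TABLE_SCHEMA_API constant added above):

import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.model.doris.Field;
import org.apache.doris.kafka.connector.model.doris.Schema;
import org.apache.doris.kafka.connector.service.RestService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchemaFetchSketch {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaFetchSketch.class);

    public static void dumpColumns(DorisOptions options) {
        // Issues GET http://<fe>/api/<db>/<table>/_schema with basic auth.
        Schema schema = RestService.getSchema(options, "demo_db", "orders", LOG);
        for (Field field : schema.getProperties()) {
            LOG.info("column {} -> {}", field.getName(), field.getType());
        }
    }
}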
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 2356f7d..84d3f90 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -19,9 +19,13 @@
package org.apache.doris.kafka.connector.utils;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.exception.ArgumentsException;
import org.apache.doris.kafka.connector.exception.DorisException;
@@ -73,6 +77,31 @@ public class ConfigCheckUtils {
configIsValid = false;
}
+ String schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC);
+ if (StringUtils.isNotEmpty(schemaTopic)) {
+ schemaTopic = schemaTopic.trim();
+ if (!topics.isEmpty()) {
+ List<String> topicList =
+ Arrays.stream(topics.split(",")).collect(Collectors.toList());
+ if (!topicList.contains(schemaTopic)) {
+ LOG.error(
+ "schema.topic is not included in topics list,
please add! "
+ + " schema.topic={}, topics={}",
+ schemaTopic,
+ topics);
+ configIsValid = false;
+ }
+ }
+ if (!topicsRegex.isEmpty() && !topicsRegex.equals(schemaTopic)) {
+ LOG.error(
+ "topics.regex must equals schema.topic. please check
again!"
+ + " topics.regex={}, schema.topic={}",
+ topicsRegex,
+ schemaTopic);
+ configIsValid = false;
+ }
+ }
+
if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)
&& parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)) == null) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java b/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java
new file mode 100644
index 0000000..b36a1d5
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.utils;
+
+import java.net.URI;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+
+public class HttpGetWithEntity extends HttpEntityEnclosingRequestBase {
+ private static final String METHOD_NAME = "GET";
+
+ @Override
+ public String getMethod() {
+ return METHOD_NAME;
+ }
+
+ public HttpGetWithEntity(final String uri) {
+ super();
+ setURI(URI.create(uri));
+ }
+}
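HttpGetWithEntity exists because Apache HttpClient's stock HttpGet cannot carry a request body. A minimal sketch of building such a request (the URL and body are hypothetical):

import java.nio.charset.StandardCharsets;
import org.apache.doris.kafka.connector.utils.HttpGetWithEntity;
import org.apache.http.entity.StringEntity;

public class GetWithBodySketch {
    public static HttpGetWithEntity build() {
        HttpGetWithEntity get =
                new HttpGetWithEntity("http://fe-host:8030/api/demo_db/orders/_schema");
        // Attach a JSON body to the GET request.
        get.setEntity(new StringEntity("{}", StandardCharsets.UTF_8));
        return get;
    }
}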
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
index a81520c..5018b27 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
@@ -33,6 +33,7 @@ import org.apache.doris.kafka.connector.exception.CopyLoadException;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.utils.FileNameUtils;
import org.apache.doris.kafka.connector.writer.load.CopyLoad;
+import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,6 +104,12 @@ public class CopyIntoWriter extends DorisWriter {
return loadFileList;
}
+ @Override
+ public void insert(SinkRecord record) {
+ initRecord(record);
+ insertRecord(record);
+ }
+
@Override
public long getOffset() {
if (fileNames.isEmpty()) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 29cf1ec..0ace5a6 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -58,6 +58,7 @@ public abstract class DorisWriter {
protected RecordService recordService;
protected int taskId;
protected final DorisConnectMonitor connectMonitor;
+ protected boolean schemaChange;
public DorisWriter(
String topic,
@@ -100,7 +101,9 @@ public abstract class DorisWriter {
/** read offset from doris */
public abstract void fetchOffset();
- public void insert(final SinkRecord record) {
+ public void insert(final SinkRecord record) {}
+
+ protected void initRecord(final SinkRecord record) {
// init offset
if (!hasInitialized
&& DeliveryGuarantee.EXACTLY_ONCE.equals(dorisOptions.getDeliveryGuarantee())) {
@@ -113,7 +116,9 @@ public abstract class DorisWriter {
fetchOffset();
this.hasInitialized = true;
}
+ }
+ protected void insertRecord(final SinkRecord record) {
// discard the record if the record offset is smaller than or equal to the server-side offset
if (record.kafkaOffset() > this.offsetPersistedInDoris.get()
&& record.kafkaOffset() > processedOffset.get()) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
index 0d76b32..a8fc00a 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
@@ -39,6 +39,7 @@ import org.apache.doris.kafka.connector.utils.FileNameUtils;
import org.apache.doris.kafka.connector.writer.commit.DorisCommittable;
import org.apache.doris.kafka.connector.writer.commit.DorisCommitter;
import org.apache.doris.kafka.connector.writer.load.DorisStreamLoad;
+import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,6 +140,12 @@ public class StreamLoadWriter extends DorisWriter {
return label2Status;
}
+ @Override
+ public void insert(SinkRecord record) {
+ initRecord(record);
+ insertRecord(record);
+ }
+
@Override
public long getOffset() {
if (committableList.isEmpty()) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
new file mode 100644
index 0000000..fc0823b
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer.schema;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.debezium.data.Envelope;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.connection.ConnectionProvider;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
+import org.apache.doris.kafka.connector.model.ColumnDescriptor;
+import org.apache.doris.kafka.connector.model.TableDescriptor;
+import org.apache.doris.kafka.connector.model.doris.Schema;
+import org.apache.doris.kafka.connector.service.DorisSystemService;
+import org.apache.doris.kafka.connector.service.RestService;
+import org.apache.doris.kafka.connector.writer.DorisWriter;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebeziumSchemaChange extends DorisWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(DebeziumSchemaChange.class);
+ private final Map<String, String> topic2TableMap;
+ private SchemaChangeManager schemaChangeManager;
+ private DorisSystemService dorisSystemService;
+ private Set<String> sinkTableSet;
+ private List<String> ddlSqlList;
+
+ public DebeziumSchemaChange(
+ String topic,
+ int partition,
+ DorisOptions dorisOptions,
+ ConnectionProvider connectionProvider,
+ DorisConnectMonitor connectMonitor) {
+ super(topic, partition, dorisOptions, connectionProvider, connectMonitor);
+ this.schemaChange = true;
+ this.sinkTableSet = new HashSet<>();
+ this.dorisSystemService = new DorisSystemService(dorisOptions);
+ this.topic2TableMap = dorisOptions.getTopicMap();
+ this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+ init();
+ }
+
+ @Override
+ public void fetchOffset() {
+ // do nothing
+ }
+
+ private void init() {
+ Set<Map.Entry<String, String>> entrySet = topic2TableMap.entrySet();
+ for (Map.Entry<String, String> entry : entrySet) {
+ sinkTableSet.add(entry.getValue());
+ }
+ }
+
+ @Override
+ public void insert(SinkRecord record) {
+ schemaChange(record);
+ }
+
+ @Override
+ public void commit(int partition) {
+ // do nothing
+ }
+
+ private void schemaChange(final SinkRecord record) {
+ String tableName = resolveTableName(record);
+ if (tableName == null) {
+            LOG.warn(
+                    "Ignoring record from topic '{}' partition '{}' offset '{}': no resolvable table name.",
+                    record.topic(),
+                    record.kafkaPartition(),
+                    record.kafkaOffset());
+ processedOffset.set(record.kafkaOffset());
+ return;
+ }
+ Struct recordStruct = (Struct) (record.value());
+ List<Object> tableChanges = recordStruct.getArray("tableChanges");
+ Struct tableChange = (Struct) tableChanges.get(0);
+ if ("DROP".equalsIgnoreCase(tableChange.getString("type"))
+ || "CREATE".equalsIgnoreCase(tableChange.getString("type"))) {
+            LOG.warn(
+                    "CREATE and DROP of table {} are currently not supported. Please create or drop it manually.",
+                    tableName);
+ processedOffset.set(record.kafkaOffset());
+ return;
+ }
+ RecordDescriptor recordDescriptor =
+ RecordDescriptor.builder()
+ .withSinkRecord(record)
+ .withTableChange(tableChange)
+ .build();
+ tableChange(tableName, recordDescriptor);
+ }
+
+ private String resolveTableName(SinkRecord record) {
+ if (isTombstone(record)) {
+            LOG.warn(
+                    "Ignoring record because it appears to be a tombstone without a source field, so the table name cannot be resolved. topic '{}', partition '{}', offset '{}'",
+                    record.topic(),
+                    record.kafkaPartition(),
+                    record.kafkaOffset());
+ return null;
+ }
+ Struct source = ((Struct)
record.value()).getStruct(Envelope.FieldName.SOURCE);
+ return source.getString("table");
+ }
+
+ private void alterTableIfNeeded(String tableName, RecordDescriptor record)
{
+ LOG.debug("Attempting to alter table '{}'.", tableName);
+ if (!hasTable(tableName)) {
+ LOG.error("Table '{}' does not exist and cannot be altered.",
tableName);
+ throw new SchemaChangeException("Could not find table: " +
tableName);
+ }
+ final TableDescriptor dorisTableDescriptor =
obtainTableSchema(tableName);
+ SchemaChangeHelper.compareSchema(dorisTableDescriptor,
record.getFields());
+ ddlSqlList =
SchemaChangeHelper.generateDDLSql(dorisOptions.getDatabase(), tableName);
+ doSchemaChange(dorisOptions.getDatabase(), tableName);
+ }
+
+    /** Obtain the table schema from Doris. */
+ private TableDescriptor obtainTableSchema(String tableName) {
+ Schema schema = RestService.getSchema(dorisOptions, dbName, tableName,
LOG);
+ List<ColumnDescriptor> columnDescriptors = new ArrayList<>();
+ schema.getProperties()
+ .forEach(
+ column -> {
+ ColumnDescriptor columnDescriptor =
+ ColumnDescriptor.builder()
+ .columnName(column.getName())
+ .typeName(column.getType())
+ .comment(column.getComment())
+ .build();
+ columnDescriptors.add(columnDescriptor);
+ });
+ return TableDescriptor.builder()
+ .tableName(tableName)
+ .type(schema.getKeysType())
+ .columns(columnDescriptors)
+ .build();
+ }
+
+ private boolean hasTable(String tableName) {
+ return dorisSystemService.tableExists(dbName, tableName);
+ }
+
+ private void tableChange(String tableName, RecordDescriptor
recordDescriptor) {
+ if (!sinkTableSet.contains(tableName)) {
+ processedOffset.set(recordDescriptor.getOffset());
+            LOG.warn(
+                    "Table "
+                            + tableName
+                            + " is not configured as a sink table. If you need to synchronize its data, please add it to 'doris.topic2table.map'.");
+ return;
+ }
+
+        if (!hasTable(tableName)) {
+            // TODO: the table does not exist; create it automatically.
+            LOG.error("Table {} does not exist, please create it manually.", tableName);
+        } else {
+            // The table exists; attempt to alter it if necessary.
+            alterTableIfNeeded(tableName, recordDescriptor);
+ }
+ processedOffset.set(recordDescriptor.getOffset());
+ }
+
+ private boolean doSchemaChange(String database, String tableName) {
+ boolean status = false;
+ if (ddlSqlList.isEmpty()) {
+            LOG.info("Schema change DDL list is empty; no schema change is needed.");
+ return false;
+ }
+ try {
+ List<SchemaChangeHelper.DDLSchema> ddlSchemas =
SchemaChangeHelper.getDdlSchemas();
+ for (int i = 0; i < ddlSqlList.size(); i++) {
+ SchemaChangeHelper.DDLSchema ddlSchema = ddlSchemas.get(i);
+ String ddlSql = ddlSqlList.get(i);
+ boolean doSchemaChange = checkSchemaChange(database,
tableName, ddlSchema);
+ status =
+ doSchemaChange
+ && schemaChangeManager.execute(ddlSql,
dorisOptions.getDatabase());
+ LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
+ }
+ } catch (Exception e) {
+            LOG.warn("Schema change error: ", e);
+ }
+ return status;
+ }
+
+ private boolean checkSchemaChange(
+ String database, String table, SchemaChangeHelper.DDLSchema
ddlSchema)
+ throws IllegalArgumentException, IOException {
+ Map<String, Object> param =
+ SchemaChangeManager.buildRequestParam(
+ ddlSchema.isDropColumn(), ddlSchema.getColumnName());
+ return schemaChangeManager.checkSchemaChange(database, table, param);
+ }
+
+    @Override
+    public long getOffset() {
+ committedOffset.set(processedOffset.get());
+ return committedOffset.get();
+ }
+
+ private boolean isTombstone(SinkRecord record) {
+ return record.value() == null;
+ }
+
+ @VisibleForTesting
+ public void setSinkTableSet(Set<String> sinkTableSet) {
+ this.sinkTableSet = sinkTableSet;
+ }
+
+ @VisibleForTesting
+ public void setDorisSystemService(DorisSystemService dorisSystemService) {
+ this.dorisSystemService = dorisSystemService;
+ }
+
+ @VisibleForTesting
+ public List<String> getDdlSqlList() {
+ return ddlSqlList;
+ }
+
+ @VisibleForTesting
+ public void setSchemaChangeManager(SchemaChangeManager
schemaChangeManager) {
+ this.schemaChangeManager = schemaChangeManager;
+ }
+}
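
For context, DebeziumSchemaChange consumes records from the Debezium schema-history
topic. Below is a heavily trimmed, illustrative event value (field names follow
Debezium's schema-change event format; this writer only consults 'source.table' and
'tableChanges[].type' before handing the change to RecordDescriptor):

    {
      "source": { "connector": "mysql", "db": "test_db", "table": "normal_time" },
      "ddl": "ALTER TABLE normal_time ADD COLUMN time_test ...",
      "tableChanges": [
        {
          "type": "ALTER",
          "table": {
            "columns": [ { "name": "time_test", "typeName": "TIME", "optional": true } ]
          }
        }
      ]
    }

CREATE and DROP changes are skipped with a warning; everything else is treated as a
potential ALTER of an existing, already-configured sink table.
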
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
new file mode 100644
index 0000000..abf424c
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer.schema;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
+import org.apache.doris.kafka.connector.model.ColumnDescriptor;
+import org.apache.doris.kafka.connector.model.TableDescriptor;
+
+public class SchemaChangeHelper {
+ private static final List<ColumnDescriptor> addColumnDescriptors =
Lists.newArrayList();
+    // Used to determine whether a column in the Doris table can undergo a schema change
+ private static final List<DDLSchema> ddlSchemas = Lists.newArrayList();
+ private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+
+    // TODO: support drop column.
+    // Dropping a column is dangerous and may result in accidental data loss.
+    // The current implementation is problematic: each alter-column operation reads
+    // the table structure from Doris and compares that schema with the topic message.
+    // When the Doris table has more columns than the upstream table, those extra
+    // columns would be dropped regardless of whether they are still needed.
+    // Therefore, dropping a column currently has to be done manually by the user.
+ private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+    /**
+     * Compare the upstream Kafka table structure with the Doris table structure. Any Kafka
+     * field that is not present in the Doris table needs to be added.
+     *
+     * @param dorisTable table schema read from Doris.
+     * @param fields table structure from the upstream Kafka data source.
+     */
+ public static void compareSchema(
+ TableDescriptor dorisTable, Map<String,
RecordDescriptor.FieldDescriptor> fields) {
+ // Determine whether fields need to be added to doris table
+ addColumnDescriptors.clear();
+ Collection<ColumnDescriptor> dorisTableColumns =
dorisTable.getColumns();
+ Set<String> dorisTableColumnNames =
+ dorisTableColumns.stream()
+ .map(ColumnDescriptor::getColumnName)
+ .collect(Collectors.toSet());
+ Set<Map.Entry<String, RecordDescriptor.FieldDescriptor>> fieldsEntries
= fields.entrySet();
+ for (Map.Entry<String, RecordDescriptor.FieldDescriptor> fieldEntry :
fieldsEntries) {
+ String fieldName = fieldEntry.getKey();
+ if (!dorisTableColumnNames.contains(fieldName)) {
+ RecordDescriptor.FieldDescriptor fieldDescriptor =
fieldEntry.getValue();
+ ColumnDescriptor columnDescriptor =
+ new ColumnDescriptor.Builder()
+ .columnName(fieldDescriptor.getName())
+ .typeName(fieldDescriptor.getSchemaTypeName())
+
.defaultValue(fieldDescriptor.getDefaultValue())
+ .comment(fieldDescriptor.getComment())
+ .build();
+ addColumnDescriptors.add(columnDescriptor);
+ }
+ }
+ }
+
+ public static List<String> generateDDLSql(String database, String table) {
+ ddlSchemas.clear();
+ List<String> ddlList = Lists.newArrayList();
+ for (ColumnDescriptor columnDescriptor : addColumnDescriptors) {
+ ddlList.add(buildAddColumnDDL(database, table, columnDescriptor));
+ ddlSchemas.add(new DDLSchema(columnDescriptor.getColumnName(),
false));
+ }
+ return ddlList;
+ }
+
+ public static List<DDLSchema> getDdlSchemas() {
+ return ddlSchemas;
+ }
+
+    private static String buildDropColumnDDL(
+            String database, String tableName, String columnName) {
+        return String.format(
+                DROP_DDL,
+                identifier(database) + "." + identifier(tableName),
+                identifier(columnName));
+ }
+
+ private static String buildAddColumnDDL(
+ String database, String tableName, ColumnDescriptor
columnDescriptor) {
+ String columnName = columnDescriptor.getColumnName();
+ String columnType = columnDescriptor.getTypeName();
+ String defaultValue = columnDescriptor.getDefaultValue();
+ String comment = columnDescriptor.getComment();
+ String addDDL =
+ String.format(
+ ADD_DDL,
+ identifier(database) + "." + identifier(tableName),
+ identifier(columnName),
+ columnType);
+ if (defaultValue != null) {
+ addDDL = addDDL + " DEFAULT " + quoteDefaultValue(defaultValue);
+ }
+ if (StringUtils.isNotEmpty(comment)) {
+ addDDL = addDDL + " COMMENT '" + quoteComment(comment) + "'";
+ }
+ return addDDL;
+ }
+
+ private static String identifier(String name) {
+ return "`" + name + "`";
+ }
+
+ private static String quoteDefaultValue(String defaultValue) {
+        // DEFAULT current_timestamp does not need to be quoted
+ if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+ return defaultValue;
+ }
+ return "'" + defaultValue + "'";
+ }
+
+ private static String quoteComment(String comment) {
+ return comment.replaceAll("'", "\\\\'");
+ }
+
+ public static class DDLSchema {
+ private final String columnName;
+ private final boolean isDropColumn;
+
+ public DDLSchema(String columnName, boolean isDropColumn) {
+ this.columnName = columnName;
+ this.isDropColumn = isDropColumn;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public boolean isDropColumn() {
+ return isDropColumn;
+ }
+ }
+}
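
As a reference for how this helper is driven (a minimal sketch; 'dorisTable' stands
for a TableDescriptor read back from Doris and 'record' for the incoming
RecordDescriptor, with values mirroring the new test below):

    // Diff the Doris schema against the upstream fields, then render one
    // ADD COLUMN statement per field that is missing from the Doris table.
    SchemaChangeHelper.compareSchema(dorisTable, record.getFields());
    List<String> ddlList = SchemaChangeHelper.generateDDLSql("test_db", "normal_time");
    // ddlList.get(0) ->
    // ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test` STRING DEFAULT '12:00'
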
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java
b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java
new file mode 100644
index 0000000..1ee9c1e
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer.schema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.utils.HttpGetWithEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SchemaChangeManager implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaChangeManager.class);
+ private static final String CHECK_SCHEMA_CHANGE_API =
+ "http://%s/api/enable_light_schema_change/%s/%s";
+ private static final String SCHEMA_CHANGE_API =
"http://%s/api/query/default_cluster/%s";
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private final DorisOptions dorisOptions;
+
+ public SchemaChangeManager(DorisOptions dorisOptions) {
+ this.dorisOptions = dorisOptions;
+ }
+
+ public static Map<String, Object> buildRequestParam(boolean dropColumn,
String columnName) {
+ Map<String, Object> params = new HashMap<>();
+ params.put("isDropColumn", dropColumn);
+ params.put("columnName", columnName);
+ return params;
+ }
+
+    /** Check whether this DDL can be applied as a light schema change. */
+ public boolean checkSchemaChange(String database, String table,
Map<String, Object> params)
+ throws IllegalArgumentException, IOException {
+ if (params.isEmpty()) {
+ return false;
+ }
+ String requestUrl =
+ String.format(CHECK_SCHEMA_CHANGE_API,
dorisOptions.getHttpUrl(), database, table);
+ HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
+ httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+ httpGet.setEntity(new
StringEntity(objectMapper.writeValueAsString(params)));
+        Map<String, Object> responseMap = handleResponse(httpGet);
+        return handleSchemaChange(responseMap);
+ }
+
+    private boolean handleSchemaChange(Map<String, Object> responseMap) {
+        String code = responseMap.getOrDefault("code", "-1").toString();
+        if (code.equals("0")) {
+            return true;
+        } else {
+            throw new SchemaChangeException("Failed to do schema change, response: " + responseMap);
+        }
+    }
+
+    /** Execute SQL in Doris. */
+ public boolean execute(String ddl, String database)
+ throws IOException, IllegalArgumentException {
+ if (StringUtils.isEmpty(ddl)) {
+ return false;
+ }
+ LOG.info("Execute SQL: {}", ddl);
+ HttpPost httpPost = buildHttpPost(ddl, database);
+        Map<String, Object> responseMap = handleResponse(httpPost);
+        return handleSchemaChange(responseMap);
+ }
+
+ public HttpPost buildHttpPost(String ddl, String database)
+ throws IllegalArgumentException, IOException {
+ Map<String, String> param = new HashMap<>();
+ param.put("stmt", ddl);
+ String requestUrl = String.format(SCHEMA_CHANGE_API,
dorisOptions.getHttpUrl(), database);
+ HttpPost httpPost = new HttpPost(requestUrl);
+ httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+ httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+ httpPost.setEntity(new
StringEntity(objectMapper.writeValueAsString(param)));
+ return httpPost;
+ }
+
+    private Map<String, Object> handleResponse(HttpUriRequest request) {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            CloseableHttpResponse response = httpclient.execute(request);
+            final int statusCode = response.getStatusLine().getStatusCode();
+            final String reasonPhrase = response.getStatusLine().getReasonPhrase();
+            if (statusCode == 200 && response.getEntity() != null) {
+                String responseEntity = EntityUtils.toString(response.getEntity());
+                return objectMapper.readValue(responseEntity, Map.class);
+ } else {
+ throw new SchemaChangeException(
+                        "Failed to do schema change, status: "
+ + statusCode
+ + ", reason: "
+ + reasonPhrase);
+ }
+ } catch (Exception e) {
+            LOG.error("Schema change request error: ", e);
+            throw new SchemaChangeException("Schema change request error: " + e.getMessage());
+ }
+ }
+
+ private String authHeader() {
+ return "Basic "
+ + new String(
+ Base64.encodeBase64(
+ (dorisOptions.getUser() + ":" +
dorisOptions.getPassword())
+ .getBytes(StandardCharsets.UTF_8)));
+ }
+}
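
A minimal usage sketch (host, database, and column names are illustrative, and
'dorisOptions' is assumed to be a configured DorisOptions): the light-schema-change
check is sent as a GET with a JSON entity to /api/enable_light_schema_change/{db}/{table},
and the DDL itself is posted as {"stmt": ...} to /api/query/default_cluster/{db}, both
with HTTP Basic auth built from doris.user/doris.password; a JSON response with
code == "0" means success.

    // Check that the column can be added as a light schema change, then run the DDL.
    // Both calls may throw IOException; error responses surface as SchemaChangeException.
    SchemaChangeManager manager = new SchemaChangeManager(dorisOptions);
    Map<String, Object> param = SchemaChangeManager.buildRequestParam(false, "time_test");
    if (manager.checkSchemaChange("test_db", "normal_time", param)) {
        manager.execute(
                "ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test` STRING DEFAULT '12:00'",
                "test_db");
    }
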
diff --git
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index f1970bd..3cff4fa 100644
---
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -75,14 +75,14 @@ public class TestRecordService {
String noDeleteValue =
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
[...]
String expectedNoDeleteValue =
- "{\"id\":8,\"name\":\"Jfohn
Doe\",\"age\":430,\"email\":\"[email protected]\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"'10:30:00'\",\"text_column\":\"Lorem
ipsum dolor sit amet, consectetur adipiscing
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_D
[...]
+ "{\"id\":8,\"name\":\"Jfohn
Doe\",\"age\":430,\"email\":\"[email protected]\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"10:30:00\",\"text_column\":\"Lorem
ipsum dolor sit amet, consectetur adipiscing
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_DEL
[...]
buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue);
// delete value
String deleteValue =
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
[...]
String expectedDeleteValue =
- "{\"id\":8,\"name\":\"Jfohn
Doe\",\"age\":430,\"email\":\"[email protected]\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"'10:30:00'\",\"text_column\":\"Lorem
ipsum dolor sit amet, consectetur adipiscing
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_D
[...]
+ "{\"id\":8,\"name\":\"Jfohn
Doe\",\"age\":430,\"email\":\"[email protected]\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"10:30:00\",\"text_column\":\"Lorem
ipsum dolor sit amet, consectetur adipiscing
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_DEL
[...]
buildProcessStructRecord(topic, deleteValue, expectedDeleteValue);
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java
b/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java
new file mode 100644
index 0000000..c95af44
--- /dev/null
+++
b/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
+import org.apache.doris.kafka.connector.model.doris.Schema;
+import org.apache.doris.kafka.connector.service.DorisSystemService;
+import org.apache.doris.kafka.connector.service.RestService;
+import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange;
+import org.apache.doris.kafka.connector.writer.schema.SchemaChangeManager;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+public class TestDebeziumSchemaChange {
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private final JsonConverter jsonConverter = new JsonConverter();
+ private final HashSet<String> sinkTableSet = new HashSet<>();
+ private DebeziumSchemaChange debeziumSchemaChange;
+ private DorisOptions dorisOptions;
+ private String topic;
+ private MockedStatic<RestService> mockRestService;
+
+ @Before
+ public void init() throws IOException {
+ InputStream stream =
+ this.getClass()
+ .getClassLoader()
+
.getResourceAsStream("doris-connector-sink.properties");
+ Properties props = new Properties();
+ props.load(stream);
+ props.put("task_id", "1");
+ props.put("name", "sink-connector-test");
+ topic = "normal";
+ dorisOptions = new DorisOptions((Map) props);
+ DorisConnectMonitor dorisConnectMonitor =
mock(DorisConnectMonitor.class);
+ DorisSystemService mockDorisSystemService =
mock(DorisSystemService.class);
+ jsonConverter.configure(new HashMap<>(), false);
+ mockRestService = mockStatic(RestService.class);
+ SchemaChangeManager mockSchemaChangeManager =
Mockito.mock(SchemaChangeManager.class);
+ Mockito.when(
+ mockSchemaChangeManager.checkSchemaChange(
+ Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(true);
+ debeziumSchemaChange =
+ new DebeziumSchemaChange(
+ topic,
+ 0,
+ dorisOptions,
+ new JdbcConnectionProvider(dorisOptions),
+ dorisConnectMonitor);
+ when(mockDorisSystemService.tableExists(anyString(),
anyString())).thenReturn(true);
+
+ sinkTableSet.add("normal_time");
+ debeziumSchemaChange.setSchemaChangeManager(mockSchemaChangeManager);
+ debeziumSchemaChange.setSinkTableSet(sinkTableSet);
+ debeziumSchemaChange.setDorisSystemService(mockDorisSystemService);
+ }
+
+ @Test
+ public void testAlterSchemaChange() {
+ String alterTopicMsg =
+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},
[...]
+ SchemaAndValue schemaAndValue =
+ jsonConverter.toConnectData(topic,
alterTopicMsg.getBytes(StandardCharsets.UTF_8));
+ SinkRecord sinkRecord =
+ TestRecordBuffer.newSinkRecord(schemaAndValue.value(), 8,
schemaAndValue.schema());
+ String normalTimeSchemaStr =
+
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"timestamp_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"datetime_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"date_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV2\"}],\"status\":200}";
+ Schema normalTimeSchema = null;
+ try {
+ normalTimeSchema = objectMapper.readValue(normalTimeSchemaStr,
Schema.class);
+ } catch (JsonProcessingException e) {
+ throw new DorisException(e);
+ }
+ mockRestService
+ .when(() -> RestService.getSchema(any(), any(), any(), any()))
+ .thenReturn(normalTimeSchema);
+
+ debeziumSchemaChange.insert(sinkRecord);
+ List<String> ddlSqlList = debeziumSchemaChange.getDdlSqlList();
+        Assert.assertEquals(
+                "ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test` STRING DEFAULT '12:00'",
+                ddlSqlList.get(0));
+ }
+
+ @After
+ public void close() {
+ mockRestService.close();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]