This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c4a1f6f  [feature]Support debezium table schema changes (#14)
c4a1f6f is described below

commit c4a1f6f24c67275a1ff2b97f440e4e815755dec9
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed May 8 16:47:16 2024 +0800

    [feature]Support debezium table schema changes (#14)
---
 pom.xml                                            |   8 +-
 .../doris/kafka/connector/cfg/DorisOptions.java    |  11 +-
 .../connector/cfg/DorisSinkConnectorConfig.java    |   1 +
 .../connector/converter/RecordDescriptor.java      |  56 ++++-
 .../type/debezium/AbstractDebeziumTimeType.java    |   2 +-
 .../kafka/connector/dialect/DialectProperties.java |  28 +++
 .../doris/kafka/connector/dialect/DorisType.java   |  47 ++++
 .../kafka/connector/dialect/mysql/MysqlType.java   | 213 +++++++++++++++++
 .../connector/exception/SchemaChangeException.java |  46 ++++
 .../kafka/connector/model/ColumnDescriptor.java    |  91 ++++++++
 .../kafka/connector/model/TableDescriptor.java     | 100 ++++++++
 .../doris/kafka/connector/model/doris/Field.java   | 148 ++++++++++++
 .../doris/kafka/connector/model/doris/Schema.java  | 110 +++++++++
 .../connector/service/DorisDefaultSinkService.java |  26 ++-
 .../connector/service/DorisSystemService.java      |  98 ++++++++
 .../doris/kafka/connector/service/RestService.java |  59 +++++
 .../kafka/connector/utils/ConfigCheckUtils.java    |  29 +++
 .../kafka/connector/utils/HttpGetWithEntity.java   |  37 +++
 .../kafka/connector/writer/CopyIntoWriter.java     |   7 +
 .../doris/kafka/connector/writer/DorisWriter.java  |   7 +-
 .../kafka/connector/writer/StreamLoadWriter.java   |   7 +
 .../writer/schema/DebeziumSchemaChange.java        | 252 +++++++++++++++++++++
 .../writer/schema/SchemaChangeHelper.java          | 159 +++++++++++++
 .../writer/schema/SchemaChangeManager.java         | 142 ++++++++++++
 .../connector/converter/TestRecordService.java     |   4 +-
 .../connector/writer/TestDebeziumSchemaChange.java | 133 +++++++++++
 26 files changed, 1806 insertions(+), 15 deletions(-)

diff --git a/pom.xml b/pom.xml
index d062676..282650f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@
         <metrics.version>4.2.25</metrics.version>
         <spotless.version>2.4.2</spotless.version>
         <debezium.version>1.9.8.Final</debezium.version>
-        <mockito.version>2.27.0</mockito.version>
+        <mockito.version>4.2.0</mockito.version>
         <junit.version>4.13.1</junit.version>
         <slf4j.version>1.7.25</slf4j.version>
         <mysql-connector.version>8.0.30</mysql-connector.version>
@@ -181,6 +181,12 @@
             <version>${mockito.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
         <dependency>
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 6183869..e9f1297 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -44,7 +44,7 @@ public class DorisOptions {
     private final String password;
     private final String database;
     private final Map<String, String> topicMap;
-
+    private final String schemaTopic;
     private final int fileSize;
     private final int recordNum;
     private long flushTime;
@@ -105,6 +105,7 @@ public class DorisOptions {
             this.flushTime = DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN;
         }
         this.topicMap = getTopicToTableMap(config);
+        this.schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC);
 
         enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT;
         if (config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) {
@@ -196,6 +197,10 @@ public class DorisOptions {
         return topicMap.get(topic);
     }
 
+    public Map<String, String> getTopicMap() {
+        return topicMap;
+    }
+
     public String getQueryUrl() {
         List<String> queryUrls = getQueryUrls();
         return queryUrls.get(0);
@@ -288,6 +293,10 @@ public class DorisOptions {
         return enableDelete;
     }
 
+    public String getSchemaTopic() {
+        return schemaTopic;
+    }
+
     /**
      * parse topic to table map
      *
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 58b6a62..94ea08e 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -78,6 +78,7 @@ public class DorisSinkConnectorConfig {
     public static final String DELIVERY_GUARANTEE_DEFAULT = DeliveryGuarantee.AT_LEAST_ONCE.name();
     public static final String CONVERT_MODE = "converter.mode";
     public static final String CONVERT_MODE_DEFAULT = ConverterMode.NORMAL.getName();
+    public static final String SCHEMA_TOPIC = "schema.topic";
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
 
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
index 188f103..11d097f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.doris.kafka.connector.dialect.mysql.MysqlType;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
@@ -120,6 +121,8 @@ public class RecordDescriptor {
         private final String name;
         private final String schemaTypeName;
         private final String schemaName;
+        private String comment;
+        private String defaultValue;
 
         public FieldDescriptor(
                 Schema schema, String name, String schemaTypeName, String schemaName) {
@@ -129,6 +132,18 @@ public class RecordDescriptor {
             this.schemaName = schemaName;
         }
 
+        public FieldDescriptor(
+                Schema schema,
+                String name,
+                String schemaTypeName,
+                String schemaName,
+                String comment,
+                String defaultValue) {
+            this(schema, name, schemaTypeName, schemaName);
+            this.comment = comment;
+            this.defaultValue = defaultValue;
+        }
+
         public String getName() {
             return name;
         }
@@ -144,11 +159,20 @@ public class RecordDescriptor {
         public String getSchemaTypeName() {
             return schemaTypeName;
         }
+
+        public String getComment() {
+            return comment;
+        }
+
+        public String getDefaultValue() {
+            return defaultValue;
+        }
     }
 
     public static class Builder {
 
         private SinkRecord sinkRecord;
+        private Struct tableChange;
 
         // Internal build state
         private final List<String> keyFieldNames = new ArrayList<>();
@@ -160,11 +184,20 @@ public class RecordDescriptor {
             return this;
         }
 
+        public Builder withTableChange(Struct tableChange) {
+            this.tableChange = tableChange;
+            return this;
+        }
+
         public RecordDescriptor build() {
             Objects.requireNonNull(sinkRecord, "The sink record must be provided.");
 
             final boolean flattened = !isTombstone(sinkRecord) && isFlattened(sinkRecord);
-            readSinkRecordNonKeyData(sinkRecord, flattened);
+            if (Objects.nonNull(tableChange)) {
+                readTableChangeData(tableChange);
+            } else {
+                readSinkRecordNonKeyData(sinkRecord, flattened);
+            }
 
             return new RecordDescriptor(
                     sinkRecord,
@@ -175,6 +208,27 @@ public class RecordDescriptor {
                     flattened);
         }
 
+        private void readTableChangeData(final Struct tableChange) {
+            Struct tableChangeTable = tableChange.getStruct("table");
+            List<Object> tableChangeColumns = tableChangeTable.getArray("columns");
+            for (Object column : tableChangeColumns) {
+                Struct columnStruct = (Struct) column;
+                Schema schema = columnStruct.schema();
+                String name = columnStruct.getString("name");
+                String typeName = columnStruct.getString("typeName");
+                Integer length = columnStruct.getInt32("length");
+                Integer scale = columnStruct.getInt32("scale");
+                String dorisType = MysqlType.toDorisType(typeName, length, scale);
+                String comment = columnStruct.getString("comment");
+                String defaultValue = columnStruct.getString("defaultValueExpression");
+                nonKeyFieldNames.add(name);
+                allFields.put(
+                        name,
+                        new FieldDescriptor(
+                                schema, name, dorisType, schema.name(), comment, defaultValue));
+            }
+        }
+
         private boolean isFlattened(SinkRecord record) {
             return record.valueSchema().name() == null
                     || !record.valueSchema().name().contains("Envelope");
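
For context, a minimal sketch of how the extended builder could be driven for a schema-change record; builder() and withSinkRecord(...) are assumed to exist on RecordDescriptor (only withTableChange(...) is visible in this hunk), and tableChange stands in for one element of a Debezium "tableChanges" array:

    // Hypothetical helper: build a RecordDescriptor for a schema-change event
    // so that field descriptors come from the Debezium column metadata
    // (name/typeName/length/scale) rather than the record's value schema.
    static RecordDescriptor describeSchemaChange(SinkRecord record, Struct tableChange) {
        return RecordDescriptor.builder()          // assumed factory method
                .withSinkRecord(record)            // assumed setter
                .withTableChange(tableChange)      // added in this commit
                .build();
    }
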
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
index 9cde668..c673249 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
@@ -32,7 +32,7 @@ public abstract class AbstractDebeziumTimeType extends AbstractTimeType {
         }
         if (sourceValue instanceof Number) {
             final LocalTime localTime = getLocalTime((Number) sourceValue);
-            return String.format("'%s'", 
DateTimeFormatter.ISO_TIME.format(localTime));
+            return String.format("%s", 
DateTimeFormatter.ISO_TIME.format(localTime));
         }
         throw new ConnectException(
                 String.format(
diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java b/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java
new file mode 100644
index 0000000..62e4cab
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.dialect;
+
+public class DialectProperties {
+
+    /* Max precision of datetime type of Doris. */
+    public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
+
+    public static final int TIMESTAMP_TYPE_MAX_PRECISION = 9;
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java b/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java
new file mode 100644
index 0000000..89b416a
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.dialect;
+
+public class DorisType {
+    public static final String BOOLEAN = "BOOLEAN";
+    public static final String TINYINT = "TINYINT";
+    public static final String SMALLINT = "SMALLINT";
+    public static final String INT = "INT";
+    public static final String BIGINT = "BIGINT";
+    public static final String LARGEINT = "LARGEINT";
+    public static final String FLOAT = "FLOAT";
+    public static final String DOUBLE = "DOUBLE";
+    public static final String DECIMAL = "DECIMAL";
+    public static final String DECIMAL_V3 = "DECIMALV3";
+    public static final String DATE = "DATE";
+    public static final String DATE_V2 = "DATEV2";
+    public static final String DATETIME = "DATETIME";
+    public static final String DATETIME_V2 = "DATETIMEV2";
+    public static final String CHAR = "CHAR";
+    public static final String VARCHAR = "VARCHAR";
+    public static final String STRING = "STRING";
+    public static final String HLL = "HLL";
+    public static final String BITMAP = "BITMAP";
+    public static final String ARRAY = "ARRAY";
+    public static final String JSONB = "JSONB";
+    public static final String JSON = "JSON";
+    public static final String MAP = "MAP";
+    public static final String STRUCT = "STRUCT";
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java b/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java
new file mode 100644
index 0000000..663aadf
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.dialect.mysql;
+
+import static org.apache.doris.kafka.connector.dialect.DialectProperties.MAX_SUPPORTED_DATE_TIME_PRECISION;
+import static org.apache.doris.kafka.connector.dialect.DialectProperties.TIMESTAMP_TYPE_MAX_PRECISION;
+
+import com.google.common.base.Preconditions;
+import org.apache.doris.kafka.connector.dialect.DorisType;
+
+public class MysqlType {
+
+    // MySQL driver returns width of timestamp types instead of precision.
+    // 19 characters are used for zero-precision timestamps while others
+    // require 19 + precision + 1 characters with the additional character
+    // required for the decimal separator.
+    private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
+    private static final String BIT = "BIT";
+    private static final String BOOLEAN = "BOOLEAN";
+    private static final String BOOL = "BOOL";
+    private static final String TINYINT = "TINYINT";
+    private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL";
+    private static final String SMALLINT = "SMALLINT";
+    private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT UNSIGNED ZEROFILL";
+    private static final String MEDIUMINT = "MEDIUMINT";
+    private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT UNSIGNED ZEROFILL";
+    private static final String INT = "INT";
+    private static final String INT_UNSIGNED = "INT UNSIGNED";
+    private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL";
+    private static final String BIGINT = "BIGINT";
+    private static final String SERIAL = "SERIAL";
+    private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED ZEROFILL";
+    private static final String REAL = "REAL";
+    private static final String REAL_UNSIGNED = "REAL UNSIGNED";
+    private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED ZEROFILL";
+    private static final String FLOAT = "FLOAT";
+    private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED ZEROFILL";
+    private static final String DOUBLE = "DOUBLE";
+    private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+    private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL";
+    private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION UNSIGNED";
+    private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL =
+            "DOUBLE PRECISION UNSIGNED ZEROFILL";
+    private static final String NUMERIC = "NUMERIC";
+    private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
+    private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL";
+    private static final String FIXED = "FIXED";
+    private static final String FIXED_UNSIGNED = "FIXED UNSIGNED";
+    private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED ZEROFILL";
+    private static final String DECIMAL = "DECIMAL";
+    private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED ZEROFILL";
+    private static final String CHAR = "CHAR";
+    private static final String VARCHAR = "VARCHAR";
+    private static final String TINYTEXT = "TINYTEXT";
+    private static final String MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String TEXT = "TEXT";
+    private static final String LONGTEXT = "LONGTEXT";
+    private static final String DATE = "DATE";
+    private static final String TIME = "TIME";
+    private static final String DATETIME = "DATETIME";
+    private static final String TIMESTAMP = "TIMESTAMP";
+    private static final String YEAR = "YEAR";
+    private static final String BINARY = "BINARY";
+    private static final String VARBINARY = "VARBINARY";
+    private static final String TINYBLOB = "TINYBLOB";
+    private static final String MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String BLOB = "BLOB";
+    private static final String LONGBLOB = "LONGBLOB";
+    private static final String JSON = "JSON";
+    private static final String ENUM = "ENUM";
+    private static final String SET = "SET";
+
+    public static String toDorisType(String type, Integer length, Integer scale) {
+        switch (type.toUpperCase()) {
+            case BIT:
+            case BOOLEAN:
+            case BOOL:
+                return DorisType.BOOLEAN;
+            case TINYINT:
+                return DorisType.TINYINT;
+            case TINYINT_UNSIGNED:
+            case TINYINT_UNSIGNED_ZEROFILL:
+            case SMALLINT:
+                return DorisType.SMALLINT;
+            case SMALLINT_UNSIGNED:
+            case SMALLINT_UNSIGNED_ZEROFILL:
+            case INT:
+            case MEDIUMINT:
+            case YEAR:
+                return DorisType.INT;
+            case INT_UNSIGNED:
+            case INT_UNSIGNED_ZEROFILL:
+            case MEDIUMINT_UNSIGNED:
+            case MEDIUMINT_UNSIGNED_ZEROFILL:
+            case BIGINT:
+                return DorisType.BIGINT;
+            case BIGINT_UNSIGNED:
+            case BIGINT_UNSIGNED_ZEROFILL:
+                return DorisType.LARGEINT;
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+            case FLOAT_UNSIGNED_ZEROFILL:
+                return DorisType.FLOAT;
+            case REAL:
+            case REAL_UNSIGNED:
+            case REAL_UNSIGNED_ZEROFILL:
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+            case DOUBLE_UNSIGNED_ZEROFILL:
+            case DOUBLE_PRECISION:
+            case DOUBLE_PRECISION_UNSIGNED:
+            case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
+                return DorisType.DOUBLE;
+            case NUMERIC:
+            case NUMERIC_UNSIGNED:
+            case NUMERIC_UNSIGNED_ZEROFILL:
+            case FIXED:
+            case FIXED_UNSIGNED:
+            case FIXED_UNSIGNED_ZEROFILL:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+            case DECIMAL_UNSIGNED_ZEROFILL:
+                return length != null && length <= 38
+                        ? String.format(
+                                "%s(%s,%s)",
+                                DorisType.DECIMAL_V3,
+                                length,
+                                scale != null && scale >= 0 ? scale : 0)
+                        : DorisType.STRING;
+            case DATE:
+                return DorisType.DATE_V2;
+            case DATETIME:
+            case TIMESTAMP:
+                // default precision is 0
+                // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
+                if (length == null
+                        || length <= 0
+                        || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
+                    return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
+                } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
+                    // Timestamp with a fraction of seconds.
+                    // For example, 2024-01-01 01:01:01.1
+                    // The decimal point will occupy 1 character.
+                    // Thus, the length of the timestamp is 21.
+                    return String.format(
+                            "%s(%s)",
+                            DorisType.DATETIME_V2,
+                            Math.min(
+                                    length - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
+                                    MAX_SUPPORTED_DATE_TIME_PRECISION));
+                } else if (length <= TIMESTAMP_TYPE_MAX_PRECISION) {
+                    // For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9.
+                    return String.format(
+                            "%s(%s)",
+                            DorisType.DATETIME_V2,
+                            Math.min(length, MAX_SUPPORTED_DATE_TIME_PRECISION));
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported length: "
+                                    + length
+                                    + " for MySQL TIMESTAMP/DATETIME types");
+                }
+            case CHAR:
+            case VARCHAR:
+                Preconditions.checkNotNull(length);
+                return length * 3 > 65533
+                        ? DorisType.STRING
+                        : String.format("%s(%s)", DorisType.VARCHAR, length * 3);
+            case TINYTEXT:
+            case TEXT:
+            case MEDIUMTEXT:
+            case LONGTEXT:
+            case ENUM:
+            case TIME:
+            case TINYBLOB:
+            case BLOB:
+            case MEDIUMBLOB:
+            case LONGBLOB:
+            case BINARY:
+            case VARBINARY:
+            case SET:
+                return DorisType.STRING;
+            case JSON:
+                return DorisType.JSONB;
+            default:
+                throw new UnsupportedOperationException("Unsupported MySQL Type: " + type);
+        }
+    }
+}
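
As a quick reference, a sketch of the mapping above with hypothetical column metadata; for DATETIME/TIMESTAMP the length argument is the Debezium-reported column width (19 + precision + 1), not the precision itself:

    // Hypothetical examples of MysqlType.toDorisType(type, length, scale):
    String a = MysqlType.toDorisType("DATETIME", 23, null); // width 23 -> "DATETIMEV2(3)"
    String b = MysqlType.toDorisType("DECIMAL", 10, 2);     // -> "DECIMALV3(10,2)"
    String c = MysqlType.toDorisType("VARCHAR", 255, null); // 255 * 3 <= 65533 -> "VARCHAR(765)"
    String d = MysqlType.toDorisType("JSON", null, null);   // -> "JSONB"
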
diff --git a/src/main/java/org/apache/doris/kafka/connector/exception/SchemaChangeException.java b/src/main/java/org/apache/doris/kafka/connector/exception/SchemaChangeException.java
new file mode 100644
index 0000000..4c0ce46
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/exception/SchemaChangeException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.exception;
+
+/** Doris Schema Change run exception. */
+public class SchemaChangeException extends RuntimeException {
+    public SchemaChangeException() {
+        super();
+    }
+
+    public SchemaChangeException(String message) {
+        super(message);
+    }
+
+    public SchemaChangeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SchemaChangeException(Throwable cause) {
+        super(cause);
+    }
+
+    protected SchemaChangeException(
+            String message,
+            Throwable cause,
+            boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/model/ColumnDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/model/ColumnDescriptor.java
new file mode 100644
index 0000000..f34c9fa
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/model/ColumnDescriptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.model;
+
+import java.util.Objects;
+
+public class ColumnDescriptor {
+    private final String columnName;
+    private final String typeName;
+    private final String comment;
+    private final String defaultValue;
+
+    private ColumnDescriptor(
+            String columnName, String typeName, String comment, String defaultValue) {
+        this.columnName = columnName;
+        this.typeName = typeName;
+        this.comment = comment;
+        this.defaultValue = defaultValue;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public String getTypeName() {
+        return typeName;
+    }
+
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private String columnName;
+        private String typeName;
+        private String comment;
+        private String defaultValue;
+
+        public Builder columnName(String columnName) {
+            this.columnName = columnName;
+            return this;
+        }
+
+        public Builder typeName(String typeName) {
+            this.typeName = typeName;
+            return this;
+        }
+
+        public Builder comment(String comment) {
+            this.comment = comment;
+            return this;
+        }
+
+        public Builder defaultValue(String defaultValue) {
+            this.defaultValue = defaultValue;
+            return this;
+        }
+
+        public ColumnDescriptor build() {
+            Objects.requireNonNull(columnName, "A column name is required");
+            Objects.requireNonNull(typeName, "A type name is required");
+
+            return new ColumnDescriptor(columnName, typeName, comment, defaultValue);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/model/TableDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/model/TableDescriptor.java
new file mode 100644
index 0000000..4cf14f4
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/model/TableDescriptor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.model;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TableDescriptor {
+    private final String tableName;
+    private final String tableType;
+    private final Map<String, ColumnDescriptor> columns = new LinkedHashMap<>();
+
+    private TableDescriptor(String tableName, String tableType, List<ColumnDescriptor> columns) {
+        this.tableName = tableName;
+        this.tableType = tableType;
+        columns.forEach(c -> this.columns.put(c.getColumnName(), c));
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getTableType() {
+        return tableType;
+    }
+
+    public Collection<ColumnDescriptor> getColumns() {
+        return columns.values();
+    }
+
+    public ColumnDescriptor getColumnByName(String columnName) {
+        return columns.get(columnName);
+    }
+
+    public boolean hasColumn(String columnName) {
+        return columns.containsKey(columnName);
+    }
+
+    public static class Builder {
+        private String schemaName;
+        private String tableName;
+        private String tableType;
+        private final List<ColumnDescriptor> columns = new ArrayList<>();
+
+        private Builder() {}
+
+        public Builder schemaName(String schemaName) {
+            this.schemaName = schemaName;
+            return this;
+        }
+
+        public Builder tableName(String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public Builder type(String tableType) {
+            this.tableType = tableType;
+            return this;
+        }
+
+        public Builder column(ColumnDescriptor column) {
+            this.columns.add(column);
+            return this;
+        }
+
+        public Builder columns(List<ColumnDescriptor> columns) {
+            this.columns.addAll(columns);
+            return this;
+        }
+
+        public TableDescriptor build() {
+            return new TableDescriptor(tableName, tableType, columns);
+        }
+    }
+}
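
Together with ColumnDescriptor above, a small sketch of how the new descriptor model is assembled; the table name, column metadata, and the table type string are hypothetical:

    // Hypothetical assembly of the descriptor model.
    ColumnDescriptor idColumn =
            ColumnDescriptor.builder()
                    .columnName("id")
                    .typeName("INT")
                    .comment("primary key") // comment and defaultValue are optional
                    .build();
    TableDescriptor table =
            TableDescriptor.builder()
                    .tableName("orders")    // hypothetical table name
                    .type("UNIQUE_KEYS")    // hypothetical table type
                    .column(idColumn)
                    .build();
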
diff --git a/src/main/java/org/apache/doris/kafka/connector/model/doris/Field.java b/src/main/java/org/apache/doris/kafka/connector/model/doris/Field.java
new file mode 100644
index 0000000..881a4a9
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/model/doris/Field.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.model.doris;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Objects;
+
+public class Field {
+    @JsonProperty(value = "name")
+    private String name;
+
+    @JsonProperty(value = "type")
+    private String type;
+
+    @JsonProperty(value = "comment")
+    private String comment;
+
+    @JsonProperty(value = "precision")
+    private int precision;
+
+    @JsonProperty(value = "scale")
+    private int scale;
+
+    @JsonProperty(value = "aggregation_type")
+    private String aggregationType;
+
+    public Field() {}
+
+    public Field(
+            String name,
+            String type,
+            String comment,
+            int precision,
+            int scale,
+            String aggregationType) {
+        this.name = name;
+        this.type = type;
+        this.comment = comment;
+        this.precision = precision;
+        this.scale = scale;
+        this.aggregationType = aggregationType;
+    }
+
+    public String getAggregationType() {
+        return aggregationType;
+    }
+
+    public void setAggregationType(String aggregationType) {
+        this.aggregationType = aggregationType;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+
+    public void setComment(String comment) {
+        this.comment = comment;
+    }
+
+    public int getPrecision() {
+        return precision;
+    }
+
+    public void setPrecision(int precision) {
+        this.precision = precision;
+    }
+
+    public int getScale() {
+        return scale;
+    }
+
+    public void setScale(int scale) {
+        this.scale = scale;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Field field = (Field) o;
+        return precision == field.precision
+                && scale == field.scale
+                && Objects.equals(name, field.name)
+                && Objects.equals(type, field.type)
+                && Objects.equals(comment, field.comment);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, comment, precision, scale);
+    }
+
+    @Override
+    public String toString() {
+        return "Field{"
+                + "name='"
+                + name
+                + '\''
+                + ", type='"
+                + type
+                + '\''
+                + ", comment='"
+                + comment
+                + '\''
+                + ", precision="
+                + precision
+                + ", scale="
+                + scale
+                + '}';
+    }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/model/doris/Schema.java b/src/main/java/org/apache/doris/kafka/connector/model/doris/Schema.java
new file mode 100644
index 0000000..be29b3e
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/model/doris/Schema.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.model.doris;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class Schema {
+    private int status = 0;
+    private String keysType;
+    private List<Field> properties;
+
+    public Schema() {
+        properties = new ArrayList<>();
+    }
+
+    public Schema(int fieldCount) {
+        properties = new ArrayList<>(fieldCount);
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    public String getKeysType() {
+        return keysType;
+    }
+
+    public void setKeysType(String keysType) {
+        this.keysType = keysType;
+    }
+
+    public List<Field> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(List<Field> properties) {
+        this.properties = properties;
+    }
+
+    public void put(
+            String name,
+            String type,
+            String comment,
+            int precision,
+            int scale,
+            String aggregationType) {
+        properties.add(new Field(name, type, comment, precision, scale, aggregationType));
+    }
+
+    public void put(Field f) {
+        properties.add(f);
+    }
+
+    public Field get(int index) {
+        if (index >= properties.size()) {
+            throw new IndexOutOfBoundsException(
+                    "Index: " + index + ", Fields sizeļ¼š" + properties.size());
+        }
+        return properties.get(index);
+    }
+
+    public int size() {
+        return properties.size();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Schema schema = (Schema) o;
+        return status == schema.status && Objects.equals(properties, schema.properties);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(status, properties);
+    }
+
+    @Override
+    public String toString() {
+        return "Schema{" + "status=" + status + ", properties=" + properties + 
'}';
+    }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index f25e4b9..29810e4 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -23,6 +23,7 @@ import com.codahale.metrics.MetricRegistry;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.doris.kafka.connector.DorisSinkTask;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.connection.ConnectionProvider;
@@ -34,6 +35,7 @@ import org.apache.doris.kafka.connector.writer.CopyIntoWriter;
 import org.apache.doris.kafka.connector.writer.DorisWriter;
 import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
+import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -86,15 +88,23 @@ public class DorisDefaultSinkService implements DorisSinkService {
         if (writer.containsKey(nameIndex)) {
             LOG.info("already start task");
         } else {
-            LoadModel loadModel = dorisOptions.getLoadModel();
+            DorisWriter dorisWriter;
             String topic = topicPartition.topic();
             int partition = topicPartition.partition();
-            DorisWriter dorisWriter =
-                    LoadModel.COPY_INTO.equals(loadModel)
-                            ? new CopyIntoWriter(
-                                    topic, partition, dorisOptions, conn, connectMonitor)
-                            : new StreamLoadWriter(
-                                    topic, partition, dorisOptions, conn, connectMonitor);
+            String schemaChangeTopic = dorisOptions.getSchemaTopic();
+            if (Objects.nonNull(schemaChangeTopic) && schemaChangeTopic.equals(topic)) {
+                dorisWriter =
+                        new DebeziumSchemaChange(
+                                topic, partition, dorisOptions, conn, connectMonitor);
+            } else {
+                LoadModel loadModel = dorisOptions.getLoadModel();
+                dorisWriter =
+                        LoadModel.COPY_INTO.equals(loadModel)
+                                ? new CopyIntoWriter(
+                                        topic, partition, dorisOptions, conn, connectMonitor)
+                                : new StreamLoadWriter(
+                                        topic, partition, dorisOptions, conn, connectMonitor);
+            }
             writer.put(nameIndex, dorisWriter);
             metricsJmxReporter.start();
         }
@@ -119,7 +129,7 @@ public class DorisDefaultSinkService implements DorisSinkService {
         // check all sink writer to see if they need to be flushed
         for (DorisWriter writer : writer.values()) {
             // Time based flushing
-            if (writer.shouldFlush()) {
+            if (!(writer instanceof DebeziumSchemaChange) && writer.shouldFlush()) {
                 writer.flushBuffer();
             }
         }
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java
new file mode 100644
index 0000000..2365aa4
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.service;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Predicate;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DorisSystemService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DorisSystemService.class);
+    private final JdbcConnectionProvider jdbcConnectionProvider;
+
+    public DorisSystemService(DorisOptions dorisOptions) {
+        this.jdbcConnectionProvider = new JdbcConnectionProvider(dorisOptions);
+    }
+
+    private static final List<String> builtinDatabases =
+            Collections.singletonList("information_schema");
+
+    public boolean tableExists(String database, String table) {
+        return databaseExists(database) && listTables(database).contains(table);
+    }
+
+    public boolean databaseExists(String database) {
+        return listDatabases().contains(database);
+    }
+
+    public List<String> listDatabases() {
+        return extractColumnValuesBySQL(
+                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
+                1,
+                dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    public List<String> listTables(String databaseName) {
+        if (!databaseExists(databaseName)) {
+            throw new DorisException("database" + databaseName + " is not 
exists");
+        }
+        return extractColumnValuesBySQL(
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE 
TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    public List<String> extractColumnValuesBySQL(
+            String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {
+
+        List<String> columnValues = Lists.newArrayList();
+        try (PreparedStatement ps =
+                jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            LOG.error("The following SQL query could not be executed: {}", sql, e);
+            throw new DorisException(
+                    String.format("The following SQL query could not be executed: %s", sql), e);
+        }
+    }
+}
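
A brief usage sketch of the new service, assuming an already-configured DorisOptions instance and hypothetical database/table names:

    // Existence checks are answered from INFORMATION_SCHEMA over JDBC.
    DorisSystemService dorisSystem = new DorisSystemService(dorisOptions);
    boolean hasDb = dorisSystem.databaseExists("test_db");
    boolean hasTable = dorisSystem.tableExists("test_db", "test_table");
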
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
index e6b5c33..e7e4a4f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
@@ -20,6 +20,7 @@
 package org.apache.doris.kafka.connector.service;
 
 import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
@@ -39,19 +40,29 @@ import org.apache.commons.io.IOUtils;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.exception.ConnectedFailedException;
 import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
 import org.apache.doris.kafka.connector.model.BackendV2;
 import org.apache.doris.kafka.connector.model.LoadOperation;
+import org.apache.doris.kafka.connector.model.doris.Schema;
 import org.apache.doris.kafka.connector.utils.BackoffAndRetryUtils;
+import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 
 public class RestService {
 
     private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
+    private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     /**
      * get Doris BE nodes to request.
@@ -285,4 +296,52 @@ public class RestService {
         logger.debug("Parsing schema result is '{}'.", backendRows);
         return backendRows;
     }
+
+    /** Get table schema from Doris. */
+    public static Schema getSchema(
+            DorisOptions dorisOptions, String db, String table, Logger logger) {
+        logger.trace("start get " + db + "." + table + " schema from doris.");
+        Object responseData = null;
+        try {
+            String tableSchemaUri =
+                    String.format(TABLE_SCHEMA_API, dorisOptions.getHttpUrl(), db, table);
+            HttpGet httpGet = new HttpGet(tableSchemaUri);
+            httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader(dorisOptions));
+            Map<String, Object> responseMap = handleResponse(httpGet, logger);
+            responseData = responseMap.get("data");
+            String schemaStr = OBJECT_MAPPER.writeValueAsString(responseData);
+            return OBJECT_MAPPER.readValue(schemaStr, Schema.class);
+        } catch (JsonProcessingException | IllegalArgumentException e) {
+            throw new SchemaChangeException("can not parse response schema " + 
responseData, e);
+        }
+    }
+
+    private static String authHeader(DorisOptions dorisOptions) {
+        return "Basic "
+                + new String(
+                        org.apache.commons.codec.binary.Base64.encodeBase64(
+                                (dorisOptions.getUser() + ":" + dorisOptions.getPassword())
+                                        .getBytes(StandardCharsets.UTF_8)));
+    }
+
+    private static Map handleResponse(HttpUriRequest request, Logger logger) {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            CloseableHttpResponse response = httpclient.execute(request);
+            final int statusCode = response.getStatusLine().getStatusCode();
+            final String reasonPhrase = response.getStatusLine().getReasonPhrase();
+            if (statusCode == 200 && response.getEntity() != null) {
+                String responseEntity = EntityUtils.toString(response.getEntity());
+                return OBJECT_MAPPER.readValue(responseEntity, Map.class);
+            } else {
+                throw new SchemaChangeException(
+                        "Failed to schemaChange, status: "
+                                + statusCode
+                                + ", reason: "
+                                + reasonPhrase);
+            }
+        } catch (Exception e) {
+            logger.trace("SchemaChange request error", e);
+            throw new SchemaChangeException("SchemaChange request error with " + e.getMessage());
+        }
+    }
 }
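
A sketch of how a caller might use the new getSchema(...) helper; dorisOptions, the slf4j LOG, and the database/table names are assumptions:

    // Fetch the table schema via the FE HTTP endpoint /api/{db}/{table}/_schema.
    Schema schema = RestService.getSchema(dorisOptions, "test_db", "test_table", LOG);
    for (Field field : schema.getProperties()) {
        LOG.info("column={}, dorisType={}", field.getName(), field.getType());
    }
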
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 2356f7d..84d3f90 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -19,9 +19,13 @@
 
 package org.apache.doris.kafka.connector.utils;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.exception.ArgumentsException;
 import org.apache.doris.kafka.connector.exception.DorisException;
@@ -73,6 +77,31 @@ public class ConfigCheckUtils {
             configIsValid = false;
         }
 
+        String schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC);
+        if (StringUtils.isNotEmpty(schemaTopic)) {
+            schemaTopic = schemaTopic.trim();
+            if (!topics.isEmpty()) {
+                List<String> topicList =
+                        Arrays.stream(topics.split(",")).collect(Collectors.toList());
+                if (!topicList.contains(schemaTopic)) {
+                    LOG.error(
+                            "schema.topic is not included in topics list, 
please add! "
+                                    + " schema.topic={}, topics={}",
+                            schemaTopic,
+                            topics);
+                    configIsValid = false;
+                }
+            }
+            if (!topicsRegex.isEmpty() && !topicsRegex.equals(schemaTopic)) {
+                LOG.error(
+                        "topics.regex must equals schema.topic. please check 
again!"
+                                + " topics.regex={}, schema.topic={}",
+                        topicsRegex,
+                        schemaTopic);
+                configIsValid = false;
+            }
+        }
+
         if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)
                 && 
parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP))
                         == null) {
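
To illustrate the validation above, a hedged sketch of a configuration it accepts (topic names are hypothetical): schema.topic must appear in the comma-separated topics list, or equal topics.regex when that form is used.

    // Hypothetical sink config fragment that passes the schema.topic checks.
    Map<String, String> config = new HashMap<>();
    config.put("topics", "mysql.inventory.orders,schema-changes.inventory");
    config.put("schema.topic", "schema-changes.inventory"); // contained in topics
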
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java b/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java
new file mode 100644
index 0000000..b36a1d5
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.utils;
+
+import java.net.URI;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+
+public class HttpGetWithEntity extends HttpEntityEnclosingRequestBase {
+    private static final String METHOD_NAME = "GET";
+
+    @Override
+    public String getMethod() {
+        return METHOD_NAME;
+    }
+
+    public HttpGetWithEntity(final String uri) {
+        super();
+        setURI(URI.create(uri));
+    }
+}
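
The light-schema-change check in SchemaChangeManager below sends its parameters as a
JSON body on a GET request, which the standard HttpGet cannot carry; hence this
GET-with-entity subclass. A minimal usage sketch (host, credentials, and parameter
values are illustrative):

    import org.apache.http.HttpHeaders;
    import org.apache.http.entity.StringEntity;

    public class HttpGetWithEntitySketch {
        public static HttpGetWithEntity buildCheckRequest() throws Exception {
            HttpGetWithEntity httpGet =
                    new HttpGetWithEntity(
                            "http://127.0.0.1:8030/api/enable_light_schema_change/db/tbl");
            // Illustrative Basic credentials ("user:pass"); the real code derives
            // them from DorisOptions.
            httpGet.setHeader(HttpHeaders.AUTHORIZATION, "Basic dXNlcjpwYXNz");
            httpGet.setEntity(new StringEntity("{\"isDropColumn\":false,\"columnName\":\"c1\"}"));
            return httpGet;
        }
    }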
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
index a81520c..5018b27 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
@@ -33,6 +33,7 @@ import org.apache.doris.kafka.connector.exception.CopyLoadException;
 import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
 import org.apache.doris.kafka.connector.utils.FileNameUtils;
 import org.apache.doris.kafka.connector.writer.load.CopyLoad;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,6 +104,12 @@ public class CopyIntoWriter extends DorisWriter {
         return loadFileList;
     }
 
+    @Override
+    public void insert(SinkRecord record) {
+        initRecord(record);
+        insertRecord(record);
+    }
+
     @Override
     public long getOffset() {
         if (fileNames.isEmpty()) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 29cf1ec..0ace5a6 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -58,6 +58,7 @@ public abstract class DorisWriter {
     protected RecordService recordService;
     protected int taskId;
     protected final DorisConnectMonitor connectMonitor;
+    protected boolean schemaChange;
 
     public DorisWriter(
             String topic,
@@ -100,7 +101,9 @@ public abstract class DorisWriter {
     /** read offset from doris */
     public abstract void fetchOffset();
 
-    public void insert(final SinkRecord record) {
+    public void insert(final SinkRecord record) {}
+
+    protected void initRecord(final SinkRecord record) {
         // init offset
         if (!hasInitialized
                 && DeliveryGuarantee.EXACTLY_ONCE.equals(dorisOptions.getDeliveryGuarantee())) {
@@ -113,7 +116,9 @@ public abstract class DorisWriter {
             fetchOffset();
             this.hasInitialized = true;
         }
+    }
 
+    protected void insertRecord(final SinkRecord record) {
         // discard the record if its offset is smaller than or equal to the server-side offset
         if (record.kafkaOffset() > this.offsetPersistedInDoris.get()
                 && record.kafkaOffset() > processedOffset.get()) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
index 0d76b32..a8fc00a 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
@@ -39,6 +39,7 @@ import org.apache.doris.kafka.connector.utils.FileNameUtils;
 import org.apache.doris.kafka.connector.writer.commit.DorisCommittable;
 import org.apache.doris.kafka.connector.writer.commit.DorisCommitter;
 import org.apache.doris.kafka.connector.writer.load.DorisStreamLoad;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,6 +140,12 @@ public class StreamLoadWriter extends DorisWriter {
         return label2Status;
     }
 
+    @Override
+    public void insert(SinkRecord record) {
+        initRecord(record);
+        insertRecord(record);
+    }
+
     @Override
     public long getOffset() {
         if (committableList.isEmpty()) {
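
Both load writers now share the same two-phase shape inherited from DorisWriter:
initRecord performs the one-time offset initialization (exactly-once only), and
insertRecord buffers the record unless its offset has already been processed. A
minimal sketch of a conforming subclass (the class name is hypothetical, and other
abstract members of DorisWriter are stubbed for brevity):

    import org.apache.doris.kafka.connector.cfg.DorisOptions;
    import org.apache.doris.kafka.connector.connection.ConnectionProvider;
    import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
    import org.apache.kafka.connect.sink.SinkRecord;

    public class ExampleWriter extends DorisWriter {
        public ExampleWriter(
                String topic,
                int partition,
                DorisOptions dorisOptions,
                ConnectionProvider connectionProvider,
                DorisConnectMonitor connectMonitor) {
            super(topic, partition, dorisOptions, connectionProvider, connectMonitor);
        }

        @Override
        public void insert(SinkRecord record) {
            initRecord(record);   // fetch the committed offset once under exactly-once
            insertRecord(record); // skip records at or below the persisted offset
        }

        @Override
        public void fetchOffset() {} // stubbed for the sketch

        @Override
        public void commit(int partition) {} // stubbed for the sketch

        @Override
        public long getOffset() {
            return processedOffset.get(); // expose the last processed offset
        }
    }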
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
new file mode 100644
index 0000000..fc0823b
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer.schema;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.debezium.data.Envelope;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.connection.ConnectionProvider;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
+import org.apache.doris.kafka.connector.model.ColumnDescriptor;
+import org.apache.doris.kafka.connector.model.TableDescriptor;
+import org.apache.doris.kafka.connector.model.doris.Schema;
+import org.apache.doris.kafka.connector.service.DorisSystemService;
+import org.apache.doris.kafka.connector.service.RestService;
+import org.apache.doris.kafka.connector.writer.DorisWriter;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebeziumSchemaChange extends DorisWriter {
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumSchemaChange.class);
+    private final Map<String, String> topic2TableMap;
+    private SchemaChangeManager schemaChangeManager;
+    private DorisSystemService dorisSystemService;
+    private Set<String> sinkTableSet;
+    private List<String> ddlSqlList;
+
+    public DebeziumSchemaChange(
+            String topic,
+            int partition,
+            DorisOptions dorisOptions,
+            ConnectionProvider connectionProvider,
+            DorisConnectMonitor connectMonitor) {
+        super(topic, partition, dorisOptions, connectionProvider, connectMonitor);
+        this.schemaChange = true;
+        this.sinkTableSet = new HashSet<>();
+        this.dorisSystemService = new DorisSystemService(dorisOptions);
+        this.topic2TableMap = dorisOptions.getTopicMap();
+        this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+        init();
+    }
+
+    @Override
+    public void fetchOffset() {
+        // do nothing
+    }
+
+    private void init() {
+        Set<Map.Entry<String, String>> entrySet = topic2TableMap.entrySet();
+        for (Map.Entry<String, String> entry : entrySet) {
+            sinkTableSet.add(entry.getValue());
+        }
+    }
+
+    @Override
+    public void insert(SinkRecord record) {
+        schemaChange(record);
+    }
+
+    @Override
+    public void commit(int partition) {
+        // do nothing
+    }
+
+    private void schemaChange(final SinkRecord record) {
+        String tableName = resolveTableName(record);
+        if (tableName == null) {
+            LOG.warn(
+                    "Ignoring record from topic '{}' partition '{}' offset '{}': no resolvable table name.",
+                    record.topic(),
+                    record.kafkaPartition(),
+                    record.kafkaOffset());
+            processedOffset.set(record.kafkaOffset());
+            return;
+        }
+        Struct recordStruct = (Struct) (record.value());
+        List<Object> tableChanges = recordStruct.getArray("tableChanges");
+        Struct tableChange = (Struct) tableChanges.get(0);
+        if ("DROP".equalsIgnoreCase(tableChange.getString("type"))
+                || "CREATE".equalsIgnoreCase(tableChange.getString("type"))) {
+            LOG.warn(
+                    "CREATE and DROP TABLE are currently not supported for table '{}'. Please create or drop it manually.",
+                    tableName);
+            processedOffset.set(record.kafkaOffset());
+            return;
+        }
+        RecordDescriptor recordDescriptor =
+                RecordDescriptor.builder()
+                        .withSinkRecord(record)
+                        .withTableChange(tableChange)
+                        .build();
+        tableChange(tableName, recordDescriptor);
+    }
+
+    private String resolveTableName(SinkRecord record) {
+        if (isTombstone(record)) {
+            LOG.warn(
+                    "Ignoring record because it appears to be a tombstone without a source field, so the table name cannot be resolved. topic '{}', partition '{}', offset '{}'",
+                    record.topic(),
+                    record.kafkaPartition(),
+                    record.kafkaOffset());
+            return null;
+        }
+        Struct source = ((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE);
+        return source.getString("table");
+    }
+
+    private void alterTableIfNeeded(String tableName, RecordDescriptor record) {
+        LOG.debug("Attempting to alter table '{}'.", tableName);
+        if (!hasTable(tableName)) {
+            LOG.error("Table '{}' does not exist and cannot be altered.", tableName);
+            throw new SchemaChangeException("Could not find table: " + tableName);
+        }
+        final TableDescriptor dorisTableDescriptor = obtainTableSchema(tableName);
+        SchemaChangeHelper.compareSchema(dorisTableDescriptor, record.getFields());
+        ddlSqlList = SchemaChangeHelper.generateDDLSql(dorisOptions.getDatabase(), tableName);
+        doSchemaChange(dorisOptions.getDatabase(), tableName);
+    }
+
+    /** Obtain the table schema from Doris. */
+    private TableDescriptor obtainTableSchema(String tableName) {
+        Schema schema = RestService.getSchema(dorisOptions, dbName, tableName, LOG);
+        List<ColumnDescriptor> columnDescriptors = new ArrayList<>();
+        schema.getProperties()
+                .forEach(
+                        column -> {
+                            ColumnDescriptor columnDescriptor =
+                                    ColumnDescriptor.builder()
+                                            .columnName(column.getName())
+                                            .typeName(column.getType())
+                                            .comment(column.getComment())
+                                            .build();
+                            columnDescriptors.add(columnDescriptor);
+                        });
+        return TableDescriptor.builder()
+                .tableName(tableName)
+                .type(schema.getKeysType())
+                .columns(columnDescriptors)
+                .build();
+    }
+
+    private boolean hasTable(String tableName) {
+        return dorisSystemService.tableExists(dbName, tableName);
+    }
+
+    private void tableChange(String tableName, RecordDescriptor recordDescriptor) {
+        if (!sinkTableSet.contains(tableName)) {
+            processedOffset.set(recordDescriptor.getOffset());
+            LOG.warn(
+                    "Table "
+                            + tableName
+                            + " is not configured for synchronization. If you need to synchronize its data, please configure it in 'doris.topic2table.map'.");
+            return;
+        }
+
+        if (!hasTable(tableName)) {
+            // TODO: the table does not exist; create it automatically.
+            LOG.error("Table {} does not exist, please create it manually.", tableName);
+        } else {
+            // The table exists; attempt to alter it if necessary.
+            alterTableIfNeeded(tableName, recordDescriptor);
+        }
+        processedOffset.set(recordDescriptor.getOffset());
+    }
+
+    private boolean doSchemaChange(String database, String tableName) {
+        boolean status = false;
+        if (ddlSqlList.isEmpty()) {
+            LOG.info("Schema change DDL is empty, no schema change is needed.");
+            return false;
+        }
+        try {
+            List<SchemaChangeHelper.DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
+            for (int i = 0; i < ddlSqlList.size(); i++) {
+                SchemaChangeHelper.DDLSchema ddlSchema = ddlSchemas.get(i);
+                String ddlSql = ddlSqlList.get(i);
+                boolean doSchemaChange = checkSchemaChange(database, tableName, ddlSchema);
+                status =
+                        doSchemaChange
+                                && schemaChangeManager.execute(ddlSql, dorisOptions.getDatabase());
+                LOG.info("schema change status: {}, ddl: {}", status, ddlSql);
+            }
+        } catch (Exception e) {
+            LOG.warn("schema change error: ", e);
+        }
+        return status;
+    }
+
+    private boolean checkSchemaChange(
+            String database, String table, SchemaChangeHelper.DDLSchema ddlSchema)
+            throws IllegalArgumentException, IOException {
+        Map<String, Object> param =
+                SchemaChangeManager.buildRequestParam(
+                        ddlSchema.isDropColumn(), ddlSchema.getColumnName());
+        return schemaChangeManager.checkSchemaChange(database, table, param);
+    }
+
+    @Override
+    public long getOffset() {
+        committedOffset.set(processedOffset.get());
+        return committedOffset.get();
+    }
+
+    private boolean isTombstone(SinkRecord record) {
+        return record.value() == null;
+    }
+
+    @VisibleForTesting
+    public void setSinkTableSet(Set<String> sinkTableSet) {
+        this.sinkTableSet = sinkTableSet;
+    }
+
+    @VisibleForTesting
+    public void setDorisSystemService(DorisSystemService dorisSystemService) {
+        this.dorisSystemService = dorisSystemService;
+    }
+
+    @VisibleForTesting
+    public List<String> getDdlSqlList() {
+        return ddlSqlList;
+    }
+
+    @VisibleForTesting
+    public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) {
+        this.schemaChangeManager = schemaChangeManager;
+    }
+}
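
For reference, the value of a Debezium schema-change event consumed by this writer
looks roughly like the following (heavily abbreviated; only the fields read above
are shown, with names taken from the test at the end of this commit):

    {
      "source": { "db": "test_db", "table": "normal_time" },
      "tableChanges": [
        {
          "type": "ALTER",
          "id": "\"test_db\".\"normal_time\"",
          "table": { "columns": [ ... ] }
        }
      ]
    }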
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
new file mode 100644
index 0000000..abf424c
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer.schema;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
+import org.apache.doris.kafka.connector.model.ColumnDescriptor;
+import org.apache.doris.kafka.connector.model.TableDescriptor;
+
+public class SchemaChangeHelper {
+    private static final List<ColumnDescriptor> addColumnDescriptors = Lists.newArrayList();
+    // Used to determine whether a column in the Doris table can undergo schema change
+    private static final List<DDLSchema> ddlSchemas = Lists.newArrayList();
+    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+
+    // TODO: support dropping columns.
+    // Dropping a column is dangerous and may cause accidental data loss. The current
+    // implementation also has a problem: every alter-column operation reads the table
+    // structure from Doris and compares that schema with the topic message. When the
+    // Doris table has more columns than the upstream table, those extra columns would
+    // be dropped regardless of whether they are still needed. Therefore, dropping a
+    // column is currently left for the user to do manually.
+    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+    /**
+     * Compare the upstream Kafka table structure with the Doris table structure. Any Kafka field
+     * that is not present in the Doris table needs to be added.
+     *
+     * @param dorisTable the table schema read from Doris.
+     * @param fields the table structure from the upstream Kafka data source.
+     */
+    public static void compareSchema(
+            TableDescriptor dorisTable, Map<String, RecordDescriptor.FieldDescriptor> fields) {
+        // Determine which fields need to be added to the Doris table
+        addColumnDescriptors.clear();
+        Collection<ColumnDescriptor> dorisTableColumns = dorisTable.getColumns();
+        Set<String> dorisTableColumnNames =
+                dorisTableColumns.stream()
+                        .map(ColumnDescriptor::getColumnName)
+                        .collect(Collectors.toSet());
+        Set<Map.Entry<String, RecordDescriptor.FieldDescriptor>> fieldsEntries = fields.entrySet();
+        for (Map.Entry<String, RecordDescriptor.FieldDescriptor> fieldEntry : fieldsEntries) {
+            String fieldName = fieldEntry.getKey();
+            if (!dorisTableColumnNames.contains(fieldName)) {
+                RecordDescriptor.FieldDescriptor fieldDescriptor = fieldEntry.getValue();
+                ColumnDescriptor columnDescriptor =
+                        new ColumnDescriptor.Builder()
+                                .columnName(fieldDescriptor.getName())
+                                .typeName(fieldDescriptor.getSchemaTypeName())
+                                .defaultValue(fieldDescriptor.getDefaultValue())
+                                .comment(fieldDescriptor.getComment())
+                                .build();
+                addColumnDescriptors.add(columnDescriptor);
+            }
+        }
+    }
+
+    public static List<String> generateDDLSql(String database, String table) {
+        ddlSchemas.clear();
+        List<String> ddlList = Lists.newArrayList();
+        for (ColumnDescriptor columnDescriptor : addColumnDescriptors) {
+            ddlList.add(buildAddColumnDDL(database, table, columnDescriptor));
+            ddlSchemas.add(new DDLSchema(columnDescriptor.getColumnName(), false));
+        }
+        return ddlList;
+    }
+
+    public static List<DDLSchema> getDdlSchemas() {
+        return ddlSchemas;
+    }
+
+    private static String buildDropColumnDDL(String database, String tableName, String columnName) {
+        return String.format(
+                DROP_DDL,
+                identifier(database) + "." + identifier(tableName),
+                identifier(columnName));
+    }
+
+    private static String buildAddColumnDDL(
+            String database, String tableName, ColumnDescriptor columnDescriptor) {
+        String columnName = columnDescriptor.getColumnName();
+        String columnType = columnDescriptor.getTypeName();
+        String defaultValue = columnDescriptor.getDefaultValue();
+        String comment = columnDescriptor.getComment();
+        String addDDL =
+                String.format(
+                        ADD_DDL,
+                        identifier(database) + "." + identifier(tableName),
+                        identifier(columnName),
+                        columnType);
+        if (defaultValue != null) {
+            addDDL = addDDL + " DEFAULT " + quoteDefaultValue(defaultValue);
+        }
+        if (StringUtils.isNotEmpty(comment)) {
+            addDDL = addDDL + " COMMENT '" + quoteComment(comment) + "'";
+        }
+        return addDDL;
+    }
+
+    private static String identifier(String name) {
+        return "`" + name + "`";
+    }
+
+    private static String quoteDefaultValue(String defaultValue) {
+        // DEFAULT current_timestamp does not need to be quoted
+        if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+            return defaultValue;
+        }
+        return "'" + defaultValue + "'";
+    }
+
+    private static String quoteComment(String comment) {
+        return comment.replaceAll("'", "\\\\'");
+    }
+
+    public static class DDLSchema {
+        private final String columnName;
+        private final boolean isDropColumn;
+
+        public DDLSchema(String columnName, boolean isDropColumn) {
+            this.columnName = columnName;
+            this.isDropColumn = isDropColumn;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public boolean isDropColumn() {
+            return isDropColumn;
+        }
+    }
+}
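
A minimal usage sketch of the helper (the database and table names mirror the
assertion in TestDebeziumSchemaChange below):

    import java.util.List;
    import java.util.Map;
    import org.apache.doris.kafka.connector.converter.RecordDescriptor;
    import org.apache.doris.kafka.connector.model.TableDescriptor;

    public class SchemaChangeHelperSketch {
        // dorisTable: the schema read from Doris; upstreamFields: fields parsed
        // from the topic message.
        static void printDdl(
                TableDescriptor dorisTable,
                Map<String, RecordDescriptor.FieldDescriptor> upstreamFields) {
            SchemaChangeHelper.compareSchema(dorisTable, upstreamFields);
            List<String> ddl = SchemaChangeHelper.generateDDLSql("test_db", "normal_time");
            // e.g. ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test` STRING DEFAULT '12:00'
            ddl.forEach(System.out::println);
        }
    }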
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java
new file mode 100644
index 0000000..1ee9c1e
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer.schema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.utils.HttpGetWithEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SchemaChangeManager implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
+    private static final String CHECK_SCHEMA_CHANGE_API =
+            "http://%s/api/enable_light_schema_change/%s/%s";
+    private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private final DorisOptions dorisOptions;
+
+    public SchemaChangeManager(DorisOptions dorisOptions) {
+        this.dorisOptions = dorisOptions;
+    }
+
+    public static Map<String, Object> buildRequestParam(boolean dropColumn, String columnName) {
+        Map<String, Object> params = new HashMap<>();
+        params.put("isDropColumn", dropColumn);
+        params.put("columnName", columnName);
+        return params;
+    }
+
+    /** Check whether the DDL can be executed as a light schema change. */
+    public boolean checkSchemaChange(String database, String table, Map<String, Object> params)
+            throws IllegalArgumentException, IOException {
+        if (params.isEmpty()) {
+            return false;
+        }
+        String requestUrl =
+                String.format(CHECK_SCHEMA_CHANGE_API, dorisOptions.getHttpUrl(), database, table);
+        HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
+        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params)));
+        Map<String, Object> responseMap = handleResponse(httpGet);
+        return handleSchemaChange(responseMap);
+    }
+
+    private boolean handleSchemaChange(Map<String, Object> responseMap) {
+        String code = responseMap.getOrDefault("code", "-1").toString();
+        if (code.equals("0")) {
+            return true;
+        } else {
+            throw new SchemaChangeException(
+                    "Failed to perform schema change, response: " + responseMap);
+        }
+    }
+
+    /** Execute the DDL statement in Doris. */
+    public boolean execute(String ddl, String database)
+            throws IOException, IllegalArgumentException {
+        if (StringUtils.isEmpty(ddl)) {
+            return false;
+        }
+        LOG.info("Execute SQL: {}", ddl);
+        HttpPost httpPost = buildHttpPost(ddl, database);
+        Map<String, Object> responseMap = handleResponse(httpPost);
+        return handleSchemaChange(responseMap);
+    }
+
+    public HttpPost buildHttpPost(String ddl, String database)
+            throws IllegalArgumentException, IOException {
+        Map<String, String> param = new HashMap<>();
+        param.put("stmt", ddl);
+        String requestUrl = String.format(SCHEMA_CHANGE_API, dorisOptions.getHttpUrl(), database);
+        HttpPost httpPost = new HttpPost(requestUrl);
+        httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+        httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
+        return httpPost;
+    }
+
+    private Map<String, Object> handleResponse(HttpUriRequest request) {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            CloseableHttpResponse response = httpclient.execute(request);
+            final int statusCode = response.getStatusLine().getStatusCode();
+            final String reasonPhrase = response.getStatusLine().getReasonPhrase();
+            if (statusCode == 200 && response.getEntity() != null) {
+                String responseEntity = EntityUtils.toString(response.getEntity());
+                return objectMapper.readValue(responseEntity, Map.class);
+            } else {
+                throw new SchemaChangeException(
+                        "Failed to perform schema change, status: "
+                                + statusCode
+                                + ", reason: "
+                                + reasonPhrase);
+            }
+        } catch (Exception e) {
+            LOG.error("SchemaChange request error", e);
+            throw new SchemaChangeException("SchemaChange request error: " + e.getMessage());
+        }
+    }
+
+    private String authHeader() {
+        return "Basic "
+                + new String(
+                        Base64.encodeBase64(
+                                (dorisOptions.getUser() + ":" + dorisOptions.getPassword())
+                                        .getBytes(StandardCharsets.UTF_8)));
+    }
+}
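
A minimal end-to-end sketch of the manager (the database, table, column, and DDL
values are illustrative):

    import java.util.Map;
    import org.apache.doris.kafka.connector.cfg.DorisOptions;

    public class SchemaChangeManagerSketch {
        static void addColumn(DorisOptions options) throws Exception {
            SchemaChangeManager manager = new SchemaChangeManager(options);
            // 1. Ask the FE whether the column change qualifies for light schema change.
            Map<String, Object> param = SchemaChangeManager.buildRequestParam(false, "time_test");
            if (manager.checkSchemaChange("test_db", "normal_time", param)) {
                // 2. Submit the DDL through the FE query endpoint.
                manager.execute(
                        "ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test` STRING",
                        "test_db");
            }
        }
    }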
diff --git a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index f1970bd..3cff4fa 100644
--- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -75,14 +75,14 @@ public class TestRecordService {
         String noDeleteValue =
                 
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
 [...]
         String expectedNoDeleteValue =
-                "{\"id\":8,\"name\":\"Jfohn 
Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"'10:30:00'\",\"text_column\":\"Lorem
 ipsum dolor sit amet, consectetur adipiscing 
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_D
 [...]
+                "{\"id\":8,\"name\":\"Jfohn 
Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"10:30:00\",\"text_column\":\"Lorem
 ipsum dolor sit amet, consectetur adipiscing 
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_DEL
 [...]
         buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue);
 
         // delete value
         String deleteValue =
                 
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
 [...]
         String expectedDeleteValue =
-                "{\"id\":8,\"name\":\"Jfohn 
Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"'10:30:00'\",\"text_column\":\"Lorem
 ipsum dolor sit amet, consectetur adipiscing 
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_D
 [...]
+                "{\"id\":8,\"name\":\"Jfohn 
Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"10:30:00\",\"text_column\":\"Lorem
 ipsum dolor sit amet, consectetur adipiscing 
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__DORIS_DEL
 [...]
         buildProcessStructRecord(topic, deleteValue, expectedDeleteValue);
     }
 
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java
new file mode 100644
index 0000000..c95af44
--- /dev/null
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
+import org.apache.doris.kafka.connector.model.doris.Schema;
+import org.apache.doris.kafka.connector.service.DorisSystemService;
+import org.apache.doris.kafka.connector.service.RestService;
+import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange;
+import org.apache.doris.kafka.connector.writer.schema.SchemaChangeManager;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+public class TestDebeziumSchemaChange {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private final JsonConverter jsonConverter = new JsonConverter();
+    private final HashSet<String> sinkTableSet = new HashSet<>();
+    private DebeziumSchemaChange debeziumSchemaChange;
+    private DorisOptions dorisOptions;
+    private String topic;
+    private MockedStatic<RestService> mockRestService;
+
+    @Before
+    public void init() throws IOException {
+        InputStream stream =
+                this.getClass()
+                        .getClassLoader()
+                        .getResourceAsStream("doris-connector-sink.properties");
+        Properties props = new Properties();
+        props.load(stream);
+        props.put("task_id", "1");
+        props.put("name", "sink-connector-test");
+        topic = "normal";
+        dorisOptions = new DorisOptions((Map) props);
+        DorisConnectMonitor dorisConnectMonitor = mock(DorisConnectMonitor.class);
+        DorisSystemService mockDorisSystemService = mock(DorisSystemService.class);
+        jsonConverter.configure(new HashMap<>(), false);
+        mockRestService = mockStatic(RestService.class);
+        SchemaChangeManager mockSchemaChangeManager = Mockito.mock(SchemaChangeManager.class);
+        Mockito.when(
+                        mockSchemaChangeManager.checkSchemaChange(
+                                Mockito.any(), Mockito.any(), Mockito.any()))
+                .thenReturn(true);
+        debeziumSchemaChange =
+                new DebeziumSchemaChange(
+                        topic,
+                        0,
+                        dorisOptions,
+                        new JdbcConnectionProvider(dorisOptions),
+                        dorisConnectMonitor);
+        when(mockDorisSystemService.tableExists(anyString(), anyString())).thenReturn(true);
+
+        sinkTableSet.add("normal_time");
+        debeziumSchemaChange.setSchemaChangeManager(mockSchemaChangeManager);
+        debeziumSchemaChange.setSinkTableSet(sinkTableSet);
+        debeziumSchemaChange.setDorisSystemService(mockDorisSystemService);
+    }
+
+    @Test
+    public void testAlterSchemaChange() {
+        String alterTopicMsg =
+                
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},
 [...]
+        SchemaAndValue schemaAndValue =
+                jsonConverter.toConnectData(topic, alterTopicMsg.getBytes(StandardCharsets.UTF_8));
+        SinkRecord sinkRecord =
+                TestRecordBuffer.newSinkRecord(schemaAndValue.value(), 8, schemaAndValue.schema());
+        String normalTimeSchemaStr =
+                "{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"timestamp_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"datetime_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"date_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV2\"}],\"status\":200}";
+        Schema normalTimeSchema = null;
+        try {
+            normalTimeSchema = objectMapper.readValue(normalTimeSchemaStr, Schema.class);
+        } catch (JsonProcessingException e) {
+            throw new DorisException(e);
+        }
+        mockRestService
+                .when(() -> RestService.getSchema(any(), any(), any(), any()))
+                .thenReturn(normalTimeSchema);
+
+        debeziumSchemaChange.insert(sinkRecord);
+        List<String> ddlSqlList = debeziumSchemaChange.getDdlSqlList();
+        Assert.assertEquals(
+                "ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test` STRING DEFAULT '12:00'",
+                ddlSqlList.get(0));
+    }
+
+    @After
+    public void close() {
+        mockRestService.close();
+    }
+}
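
Assuming a standard Maven Surefire setup (consistent with the pom.xml changes
above), the new test can be run in isolation with:

    mvn test -Dtest=TestDebeziumSchemaChange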


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

