This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3589528 [Feature]Support kafka type converter (#12)
3589528 is described below
commit 3589528b5052f159aaa27b9efaf55fc28884bd87
Author: wudongliang <[email protected]>
AuthorDate: Mon Apr 15 10:06:56 2024 +0800
[Feature]Support kafka type converter (#12)
---
pom.xml | 6 +
.../doris/kafka/connector/cfg/DorisOptions.java | 9 +
.../connector/cfg/DorisSinkConnectorConfig.java | 2 +
.../connector/converter/RecordDescriptor.java | 225 +++++++++++++++++++++
.../kafka/connector/converter/RecordService.java | 107 +++++++---
.../connector/converter/RecordTypeRegister.java | 113 +++++++++++
.../connector/converter/type/AbstractDateType.java | 22 ++
.../converter/type/AbstractTemporalType.java | 51 +++++
.../connector/converter/type/AbstractTimeType.java | 22 ++
.../converter/type/AbstractTimestampType.java | 22 ++
.../connector/converter/type/AbstractType.java | 43 ++++
.../doris/kafka/connector/converter/type/Type.java | 46 +++++
.../type/connect/AbstractConnectMapType.java | 45 +++++
.../type/connect/AbstractConnectSchemaType.java | 32 +++
.../converter/type/connect/ConnectBooleanType.java | 29 +++
.../converter/type/connect/ConnectBytesType.java | 60 ++++++
.../converter/type/connect/ConnectDateType.java | 48 +++++
.../converter/type/connect/ConnectDecimalType.java | 41 ++++
.../converter/type/connect/ConnectFloat32Type.java | 34 ++++
.../converter/type/connect/ConnectFloat64Type.java | 34 ++++
.../converter/type/connect/ConnectInt16Type.java | 34 ++++
.../converter/type/connect/ConnectInt32Type.java | 34 ++++
.../converter/type/connect/ConnectInt64Type.java | 34 ++++
.../converter/type/connect/ConnectInt8Type.java | 34 ++++
.../connect/ConnectMapToConnectStringType.java | 35 ++++
.../converter/type/connect/ConnectStringType.java | 33 +++
.../converter/type/connect/ConnectTimeType.java | 56 +++++
.../type/connect/ConnectTimestampType.java | 49 +++++
.../type/debezium/AbstractDebeziumTimeType.java | 44 ++++
.../debezium/AbstractDebeziumTimestampType.java | 42 ++++
.../converter/type/debezium/DateType.java | 49 +++++
.../converter/type/debezium/MicroTimeType.java | 38 ++++
.../type/debezium/MicroTimestampType.java | 38 ++++
.../converter/type/debezium/NanoTimeType.java | 38 ++++
.../converter/type/debezium/NanoTimestampType.java | 43 ++++
.../converter/type/debezium/TimeType.java | 38 ++++
.../converter/type/debezium/TimestampType.java | 39 ++++
.../type/debezium/VariableScaleDecimalType.java | 58 ++++++
.../converter/type/debezium/ZonedTimeType.java | 55 +++++
.../type/debezium/ZonedTimestampType.java | 52 +++++
.../connector/converter/utils/DateTimeUtils.java | 115 +++++++++++
.../connector/converter/TestRecordService.java | 85 ++++----
42 files changed, 1973 insertions(+), 61 deletions(-)
diff --git a/pom.xml b/pom.xml
index 6f8dbab..2ceea3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -250,6 +250,12 @@
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${confluent.version}</version>
</dependency>
+ <!-- https://mvnrepository.com/artifact/io.debezium/debezium-core -->
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ <version>1.9.8.Final</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index c47ea1f..6183869 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -58,6 +58,7 @@ public class DorisOptions {
private final Properties streamLoadProp = new Properties();
private String labelPrefix;
+ private String databaseTimeZone;
private LoadModel loadModel;
private DeliveryGuarantee deliveryGuarantee;
private ConverterMode converterMode;
@@ -71,6 +72,10 @@ public class DorisOptions {
this.password = config.get(DorisSinkConnectorConfig.DORIS_PASSWORD);
this.database = config.get(DorisSinkConnectorConfig.DORIS_DATABASE);
this.taskId = Integer.parseInt(config.get(ConfigCheckUtils.TASK_ID));
+ this.databaseTimeZone =
DorisSinkConnectorConfig.DATABASE_TIME_ZONE_DEFAULT;
+ if (config.containsKey(DorisSinkConnectorConfig.DATABASE_TIME_ZONE)) {
+ this.databaseTimeZone =
config.get(DorisSinkConnectorConfig.DATABASE_TIME_ZONE);
+ }
this.loadModel =
LoadModel.of(
config.getOrDefault(
@@ -275,6 +280,10 @@ public class DorisOptions {
return autoRedirect;
}
+ /**
+ * Returns the database time zone: the value configured under
+ * {@code DorisSinkConnectorConfig.DATABASE_TIME_ZONE}, or
+ * {@code DorisSinkConnectorConfig.DATABASE_TIME_ZONE_DEFAULT} when that key is absent.
+ */
+ public String getDatabaseTimeZone() {
+ return databaseTimeZone;
+ }
+
public boolean isEnableDelete() {
return enableDelete;
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 5ca55ef..58b6a62 100644
---
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -69,6 +69,8 @@ public class DorisSinkConnectorConfig {
public static final String REQUEST_CONNECT_TIMEOUT_MS =
"request.connect.timeout.ms";
public static final Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 *
1000;
public static final Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30
* 1000;
+ public static final String DATABASE_TIME_ZONE = "database.time_zone";
+ public static final String DATABASE_TIME_ZONE_DEFAULT = "UTC";
public static final String LOAD_MODEL = "load.model";
public static final String LOAD_MODEL_DEFAULT =
LoadModel.STREAM_LOAD.name();
public static final String AUTO_REDIRECT = "auto.redirect";
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
new file mode 100644
index 0000000..66a7c21
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+/**
+ * Describes a single {@link SinkRecord}: its topic, the names of its key and non-key fields, a
+ * {@link FieldDescriptor} per field, and whether the value is a flattened row or a Debezium
+ * change envelope.
+ *
+ * <p>Instances are created through {@link #builder()}.
+ */
+public class RecordDescriptor {
+ private final SinkRecord record;
+ private final String topicName;
+ private final List<String> keyFieldNames;
+ private final List<String> nonKeyFieldNames;
+ private final Map<String, FieldDescriptor> fields;
+ private final boolean flattened;
+
+ private RecordDescriptor(
+ SinkRecord record,
+ String topicName,
+ List<String> keyFieldNames,
+ List<String> nonKeyFieldNames,
+ Map<String, FieldDescriptor> fields,
+ boolean flattened) {
+ this.record = record;
+ this.topicName = topicName;
+ this.keyFieldNames = keyFieldNames;
+ this.nonKeyFieldNames = nonKeyFieldNames;
+ this.fields = fields;
+ this.flattened = flattened;
+ }
+
+ /** Returns the topic of the underlying sink record. */
+ public String getTopicName() {
+ return topicName;
+ }
+
+ /** Returns the Kafka partition reported by the underlying sink record. */
+ public Integer getPartition() {
+ return record.kafkaPartition();
+ }
+
+ /** Returns the Kafka offset reported by the underlying sink record. */
+ public long getOffset() {
+ return record.kafkaOffset();
+ }
+
+ /** Returns the key field names (the list held by this descriptor, not a copy). */
+ public List<String> getKeyFieldNames() {
+ return keyFieldNames;
+ }
+
+ /** Returns the non-key field names (the list held by this descriptor, not a copy). */
+ public List<String> getNonKeyFieldNames() {
+ return nonKeyFieldNames;
+ }
+
+ /** Returns the field descriptors keyed by field name (the map held by this descriptor). */
+ public Map<String, FieldDescriptor> getFields() {
+ return fields;
+ }
+
+ /**
+ * Returns {@code true} when the record was not classified as flattened. Tombstones are never
+ * classified as flattened by the builder, so they also report {@code true} here.
+ */
+ public boolean isDebeziumSinkRecord() {
+ return !flattened;
+ }
+
+ /** Returns {@code true} when both the record value and its value schema are null. */
+ public boolean isTombstone() {
+ // NOTE
+ // A Debezium tombstone has both value and valueSchema set to null. In contrast,
+ // the ExtractNewRecordState SMT with delete.handling.mode=none produces a record
+ // in which only the value is null; that case is treated as a flattened delete.
+ // See isDelete method.
+ return record.value() == null && record.valueSchema() == null;
+ }
+
+ /**
+ * Returns whether the record represents a delete: for a flattened record, when its value is
+ * null; for a Debezium envelope with a non-null value, when its "op" field equals "d".
+ */
+ public boolean isDelete() {
+ if (!isDebeziumSinkRecord()) {
+ return record.value() == null;
+ } else if (record.value() != null) {
+ final Struct value = (Struct) record.value();
+ return "d".equals(value.getString("op"));
+ }
+ return false;
+ }
+
+ /**
+ * Returns the "after" struct of a Debezium envelope, or the record value itself (cast to
+ * {@link Struct}) for a flattened record. The Debezium branch dereferences the value, so
+ * callers must not invoke it on a record whose value is null.
+ */
+ public Struct getAfterStruct() {
+ if (isDebeziumSinkRecord()) {
+ return ((Struct) record.value()).getStruct("after");
+ } else {
+ return ((Struct) record.value());
+ }
+ }
+
+ /**
+ * Returns the "before" struct of a Debezium envelope, or the record value itself (cast to
+ * {@link Struct}) for a flattened record. The Debezium branch dereferences the value, so
+ * callers must not invoke it on a record whose value is null.
+ */
+ public Struct getBeforeStruct() {
+ if (isDebeziumSinkRecord()) {
+ return ((Struct) record.value()).getStruct("before");
+ } else {
+ return ((Struct) record.value());
+ }
+ }
+
+ /** Creates a new, empty {@link Builder}. */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Schema information for one field: the field schema, the field name, the name of the
+ * schema's Connect type, and the schema's own (logical) name, which may be null.
+ */
+ public static class FieldDescriptor {
+ private final Schema schema;
+ private final String name;
+ private final String schemaTypeName;
+ private final String schemaName;
+
+ public FieldDescriptor(
+ Schema schema, String name, String schemaTypeName, String
schemaName) {
+ this.schema = schema;
+ this.name = name;
+ this.schemaTypeName = schemaTypeName;
+ this.schemaName = schemaName;
+ }
+
+ /** Returns the field name. */
+ public String getName() {
+ return name;
+ }
+
+ /** Returns the name of the field schema; the builder fills it from {@code Schema#name()}. */
+ public String getSchemaName() {
+ return schemaName;
+ }
+
+ /** Returns the field schema. */
+ public Schema getSchema() {
+ return schema;
+ }
+
+ /** Returns the Connect type name; the builder fills it from {@code schema.type().name()}. */
+ public String getSchemaTypeName() {
+ return schemaTypeName;
+ }
+ }
+
+ /** Builds a {@link RecordDescriptor} from a {@link SinkRecord}. */
+ public static class Builder {
+
+ private SinkRecord sinkRecord;
+
+ // Internal build state
+ // NOTE(review): nothing in this class adds to keyFieldNames, so as written every
+ // field of the value schema ends up in nonKeyFieldNames.
+ private final List<String> keyFieldNames = new ArrayList<>();
+ private final List<String> nonKeyFieldNames = new ArrayList<>();
+ private final Map<String, FieldDescriptor> allFields = new
LinkedHashMap<>();
+
+ /** Sets the sink record to describe and returns this builder. */
+ public Builder withSinkRecord(SinkRecord record) {
+ this.sinkRecord = record;
+ return this;
+ }
+
+ /**
+ * Classifies the record as flattened or not, collects its non-key fields and creates the
+ * descriptor.
+ *
+ * @throws NullPointerException if no sink record was provided
+ * @throws ConnectException if a non-flattened record has a value schema without an
+ * "after" field
+ */
+ public RecordDescriptor build() {
+ Objects.requireNonNull(sinkRecord, "The sink record must be
provided.");
+
+ // A tombstone has no value schema to inspect, so it is never marked as flattened.
+ final boolean flattened = !isTombstone(sinkRecord) &&
isFlattened(sinkRecord);
+ readSinkRecordNonKeyData(sinkRecord, flattened);
+
+ return new RecordDescriptor(
+ sinkRecord,
+ sinkRecord.topic(),
+ keyFieldNames,
+ nonKeyFieldNames,
+ allFields,
+ flattened);
+ }
+
+ /**
+ * A record counts as flattened when its value schema has no name or the name does not
+ * contain "Envelope". Dereferences the value schema; build() skips this call only when
+ * both value and value schema are null.
+ */
+ private boolean isFlattened(SinkRecord record) {
+ return record.valueSchema().name() == null
+ || !record.valueSchema().name().contains("Envelope");
+ }
+
+ /** Returns {@code true} when both the record value and its value schema are null. */
+ private boolean isTombstone(SinkRecord record) {
+
+ return record.value() == null && record.valueSchema() == null;
+ }
+
+ /**
+ * Registers the non-key fields: from the value schema itself for a flattened record, or
+ * from the schema of the "after" field otherwise. Does nothing when there is no value
+ * schema.
+ */
+ private void readSinkRecordNonKeyData(SinkRecord record, boolean
flattened) {
+ final Schema valueSchema = record.valueSchema();
+ if (valueSchema != null) {
+ if (flattened) {
+ // In a flattened event it is safe to read the field names directly
+ // from the value schema, as this is not a Debezium envelope.
+ applyNonKeyFields(valueSchema);
+ } else {
+ final Field after = valueSchema.field("after");
+ if (after == null) {
+ throw new ConnectException(
+ "Received an unexpected message type that does
not have an 'after' Debezium block");
+ }
+ applyNonKeyFields(after.schema());
+ }
+ }
+ }
+
+ /** Registers every field of {@code schema} whose name is not listed as a key field. */
+ private void applyNonKeyFields(Schema schema) {
+ for (Field field : schema.fields()) {
+ if (!keyFieldNames.contains(field.name())) {
+ applyNonKeyField(field.name(), field.schema());
+ }
+ }
+ }
+
+ /** Creates the descriptor for one field and records it as a non-key field. */
+ private void applyNonKeyField(String name, Schema schema) {
+ FieldDescriptor fieldDescriptor =
+ new FieldDescriptor(schema, name, schema.type().name(),
schema.name());
+ nonKeyFieldNames.add(fieldDescriptor.getName());
+ allFields.put(fieldDescriptor.getName(), fieldDescriptor);
+ }
+ }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index bd55297..1390761 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -20,19 +20,22 @@
package org.apache.doris.kafka.connector.converter;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.StringJoiner;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.type.Type;
import org.apache.doris.kafka.connector.exception.DataFormatException;
import org.apache.doris.kafka.connector.writer.LoadConstants;
import org.apache.doris.kafka.connector.writer.RecordBuffer;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
@@ -43,6 +46,7 @@ public class RecordService {
private static final ObjectMapper MAPPER = new ObjectMapper();
private final JsonConverter converter;
private DorisOptions dorisOptions;
+ private RecordTypeRegister recordTypeRegister;
public RecordService() {
this.converter = new JsonConverter();
@@ -54,6 +58,7 @@ public class RecordService {
public RecordService(DorisOptions dorisOptions) {
this();
this.dorisOptions = dorisOptions;
+ this.recordTypeRegister = new RecordTypeRegister(dorisOptions);
}
/**
@@ -61,27 +66,34 @@ public class RecordService {
* "optional": false, "name": "" }, "payload": { "name": "doris",
"__deleted": "true" } }
*/
public String processStructRecord(SinkRecord record) {
- byte[] bytes =
- converter.fromConnectData(record.topic(),
record.valueSchema(), record.value());
- String recordValue = new String(bytes, StandardCharsets.UTF_8);
+ String processedRecord;
if (ConverterMode.DEBEZIUM_INGESTION ==
dorisOptions.getConverterMode()) {
- try {
- Map<String, Object> recordMap =
- MAPPER.readValue(recordValue, new
TypeReference<Map<String, Object>>() {});
- // delete sign sync
- if ("d".equals(recordMap.get("op"))) {
- Map<String, Object> beforeValue = (Map<String, Object>)
recordMap.get("before");
- beforeValue.put(LoadConstants.DORIS_DELETE_SIGN,
LoadConstants.DORIS_DEL_TRUE);
- return MAPPER.writeValueAsString(beforeValue);
- }
- Map<String, Object> afterValue = (Map<String, Object>)
recordMap.get("after");
- afterValue.put(LoadConstants.DORIS_DELETE_SIGN,
LoadConstants.DORIS_DEL_FALSE);
- return MAPPER.writeValueAsString(afterValue);
- } catch (JsonProcessingException e) {
- LOG.error("parse record failed, cause by parse json error:
{}", recordValue);
+ RecordDescriptor recordDescriptor = buildRecordDescriptor(record);
+ if (recordDescriptor.isTombstone()) {
+ return null;
}
+ List<String> nonKeyFieldNames =
recordDescriptor.getNonKeyFieldNames();
+ if (recordDescriptor.isDelete()) {
+ processedRecord =
+ parseFieldValues(
+ recordDescriptor,
+ recordDescriptor.getBeforeStruct(),
+ nonKeyFieldNames,
+ true);
+ } else {
+ processedRecord =
+ parseFieldValues(
+ recordDescriptor,
+ recordDescriptor.getAfterStruct(),
+ nonKeyFieldNames,
+ false);
+ }
+ } else {
+ byte[] bytes =
+ converter.fromConnectData(record.topic(),
record.valueSchema(), record.value());
+ processedRecord = new String(bytes, StandardCharsets.UTF_8);
}
- return recordValue;
+ return processedRecord;
}
/** process list record from kafka [{"name":"doris1"},{"name":"doris2"}] */
@@ -114,6 +126,43 @@ public class RecordService {
return record.value().toString();
}
+    /**
+     * Converts the selected fields of a row image into the JSON document sent to Doris.
+     *
+     * <p>Each field is converted by the {@link Type} registered under its schema name (the
+     * logical type). When the field has no schema name, or no type is registered under it, the
+     * type registered under the Connect schema type name is used instead. Non-null values of
+     * non-numeric types are written as strings. The Doris delete sign is appended last.
+     *
+     * @param record descriptor holding the per-field schema information
+     * @param source struct the values are read from (the "before" or "after" row image)
+     * @param fields names of the fields to read, in output order
+     * @param isDelete selects the "true" or the "false" delete-sign constant
+     * @return the JSON string, or {@code null} if JSON serialization fails (the failure is logged)
+     * @throws ConnectException if {@code source} is null or a field has no registered type
+     */
+    private String parseFieldValues(
+            RecordDescriptor record, Struct source, List<String> fields, boolean isDelete) {
+        if (source == null) {
+            // Fail with a descriptive error instead of a bare NullPointerException below.
+            throw new ConnectException(
+                    "Record from topic "
+                            + record.getTopicName()
+                            + " has no "
+                            + (isDelete ? "'before'" : "'after'")
+                            + " row image to convert");
+        }
+        final Map<String, Type> typeRegistry = recordTypeRegister.getTypeRegistry();
+        final Map<String, Object> fieldMapping = new LinkedHashMap<>();
+        for (String fieldName : fields) {
+            final RecordDescriptor.FieldDescriptor field = record.getFields().get(fieldName);
+            final String fieldSchemaName = field.getSchemaName();
+            final String fieldSchemaTypeName = field.getSchemaTypeName();
+
+            // Prefer the logical (schema-name) type; fall back to the physical Connect type so
+            // that named schemas without a dedicated converter are still handled.
+            Type type = Objects.nonNull(fieldSchemaName) ? typeRegistry.get(fieldSchemaName) : null;
+            if (type == null) {
+                type = typeRegistry.get(fieldSchemaTypeName);
+            }
+            if (type == null) {
+                throw new ConnectException(
+                        "No type converter registered for field '"
+                                + fieldName
+                                + "' (schema name: "
+                                + fieldSchemaName
+                                + ", schema type: "
+                                + fieldSchemaTypeName
+                                + ")");
+            }
+
+            // For optional fields read the raw value so a schema default does not mask a null.
+            final Object value =
+                    field.getSchema().isOptional()
+                            ? source.getWithoutDefault(fieldName)
+                            : source.get(fieldName);
+            final Object convertValue = type.getValue(value);
+            if (Objects.nonNull(convertValue) && !type.isNumber()) {
+                fieldMapping.put(fieldName, convertValue.toString());
+            } else {
+                fieldMapping.put(fieldName, convertValue);
+            }
+        }
+        fieldMapping.put(
+                LoadConstants.DORIS_DELETE_SIGN,
+                isDelete ? LoadConstants.DORIS_DEL_TRUE : LoadConstants.DORIS_DEL_FALSE);
+        try {
+            return MAPPER.writeValueAsString(fieldMapping);
+        } catch (JsonProcessingException e) {
+            // Trailing throwable argument makes SLF4J log the cause as well.
+            LOG.error("parse record failed, cause by parse json error: {}", fieldMapping, e);
+            return null;
+        }
+    }
+
/**
* Given a single Record from put API, process it and convert it into a
Json String.
*
@@ -121,14 +170,26 @@ public class RecordService {
* @return Json String
*/
public String getProcessedRecord(SinkRecord record) {
+ String processedRecord;
if (record.value() instanceof Struct) {
- return processStructRecord(record);
+ processedRecord = processStructRecord(record);
} else if (record.value() instanceof List) {
- return processListRecord(record);
+ processedRecord = processListRecord(record);
} else if (record.value() instanceof Map) {
- return processMapRecord(record);
+ processedRecord = processMapRecord(record);
} else {
- return processStringRecord(record);
+ processedRecord = record.value().toString();
+ }
+ return processedRecord;
+ }
+
+ private RecordDescriptor buildRecordDescriptor(SinkRecord record) {
+ RecordDescriptor recordDescriptor;
+ try {
+ recordDescriptor =
RecordDescriptor.builder().withSinkRecord(record).build();
+ } catch (Exception e) {
+ throw new ConnectException("Failed to process a sink record", e);
}
+ return recordDescriptor;
}
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java
new file mode 100644
index 0000000..e78909d
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.type.Type;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectBooleanType;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectBytesType;
+import org.apache.doris.kafka.connector.converter.type.connect.ConnectDateType;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectDecimalType;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectFloat32Type;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectFloat64Type;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectInt16Type;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectInt32Type;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectInt64Type;
+import org.apache.doris.kafka.connector.converter.type.connect.ConnectInt8Type;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectMapToConnectStringType;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectStringType;
+import org.apache.doris.kafka.connector.converter.type.connect.ConnectTimeType;
+import
org.apache.doris.kafka.connector.converter.type.connect.ConnectTimestampType;
+import org.apache.doris.kafka.connector.converter.type.debezium.DateType;
+import org.apache.doris.kafka.connector.converter.type.debezium.MicroTimeType;
+import
org.apache.doris.kafka.connector.converter.type.debezium.MicroTimestampType;
+import org.apache.doris.kafka.connector.converter.type.debezium.NanoTimeType;
+import
org.apache.doris.kafka.connector.converter.type.debezium.NanoTimestampType;
+import org.apache.doris.kafka.connector.converter.type.debezium.TimeType;
+import org.apache.doris.kafka.connector.converter.type.debezium.TimestampType;
+import
org.apache.doris.kafka.connector.converter.type.debezium.VariableScaleDecimalType;
+import org.apache.doris.kafka.connector.converter.type.debezium.ZonedTimeType;
+import
org.apache.doris.kafka.connector.converter.type.debezium.ZonedTimestampType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registry of {@link Type} converters, keyed by the registration keys each type reports.
+ *
+ * <p>Every type is configured with the connector's {@link DorisOptions} before it is stored. When
+ * two types report the same key, the one registered later replaces the earlier one.
+ */
+public class RecordTypeRegister {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RecordTypeRegister.class);
+
+    private final Map<String, Type> typeRegistry = new HashMap<>();
+    private final DorisOptions dorisOptions;
+
+    /** Creates the register and immediately registers all supported types. */
+    public RecordTypeRegister(DorisOptions dorisOptions) {
+        this.dorisOptions = dorisOptions;
+        registerTypes();
+    }
+
+    /** Registers the Debezium types first, then the Kafka Connect types. */
+    protected void registerTypes() {
+        // Debezium data types
+        registerType(DateType.INSTANCE);
+        registerType(TimeType.INSTANCE);
+        registerType(MicroTimeType.INSTANCE);
+        registerType(TimestampType.INSTANCE);
+        registerType(MicroTimestampType.INSTANCE);
+        registerType(NanoTimeType.INSTANCE);
+        registerType(NanoTimestampType.INSTANCE);
+        registerType(ZonedTimeType.INSTANCE);
+        registerType(ZonedTimestampType.INSTANCE);
+        registerType(VariableScaleDecimalType.INSTANCE);
+
+        // Kafka Connect data types
+        registerType(ConnectBooleanType.INSTANCE);
+        registerType(ConnectBytesType.INSTANCE);
+        registerType(ConnectDateType.INSTANCE);
+        registerType(ConnectDecimalType.INSTANCE);
+        registerType(ConnectFloat32Type.INSTANCE);
+        registerType(ConnectFloat64Type.INSTANCE);
+        registerType(ConnectInt8Type.INSTANCE);
+        registerType(ConnectInt16Type.INSTANCE);
+        registerType(ConnectInt32Type.INSTANCE);
+        registerType(ConnectInt64Type.INSTANCE);
+        registerType(ConnectStringType.INSTANCE);
+        registerType(ConnectTimestampType.INSTANCE);
+        registerType(ConnectTimeType.INSTANCE);
+        registerType(ConnectMapToConnectStringType.INSTANCE);
+    }
+
+    /**
+     * Configures {@code type} with the connector options and stores it under each of its
+     * registration keys, logging at debug level whether a key was new or replaced.
+     */
+    protected void registerType(Type type) {
+        type.configure(dorisOptions);
+        for (String registrationKey : type.getRegistrationKeys()) {
+            final Type previous = typeRegistry.put(registrationKey, type);
+            if (previous == null) {
+                LOG.debug("Type registered [{}]: {}", registrationKey, type.getClass().getName());
+                continue;
+            }
+            LOG.debug(
+                    "Type replaced [{}]: {} -> {}",
+                    registrationKey,
+                    previous.getClass().getName(),
+                    type.getClass().getName());
+        }
+    }
+
+    /** Returns the registry map itself (not a copy). */
+    public Map<String, Type> getTypeRegistry() {
+        return typeRegistry;
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
new file mode 100644
index 0000000..de32cd6
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+/** An abstract base class for all temporal date implementations of {@link Type}. */
+public abstract class AbstractDateType extends AbstractTemporalType {}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTemporalType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTemporalType.java
new file mode 100644
index 0000000..4b50a79
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTemporalType.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+import java.time.ZoneId;
+import java.util.TimeZone;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for the temporal implementations of {@link Type}. It resolves the database time
+ * zone from the connector options and exposes it to subclasses.
+ */
+public abstract class AbstractTemporalType extends AbstractType {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTemporalType.class);
+
+    private TimeZone databaseTimeZone;
+
+    /**
+     * Resolves the time zone named by {@link DorisOptions#getDatabaseTimeZone()}. A value that
+     * cannot be resolved is logged and the original exception is rethrown.
+     */
+    @Override
+    public void configure(DorisOptions dorisOptions) {
+        final String zoneName = dorisOptions.getDatabaseTimeZone();
+        try {
+            final ZoneId zoneId = ZoneId.of(zoneName);
+            this.databaseTimeZone = TimeZone.getTimeZone(zoneId);
+        } catch (Exception e) {
+            LOGGER.error(
+                    "Failed to resolve time zone '{}', please specify a correct time zone value",
+                    zoneName,
+                    e);
+            throw e;
+        }
+    }
+
+    /** Returns the time zone resolved by {@code configure}, or null if it was never called. */
+    protected TimeZone getDatabaseTimeZone() {
+        return databaseTimeZone;
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
new file mode 100644
index 0000000..533f1e1
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+/** An abstract temporal implementation of {@link Type} for {@code TIME} based columns. */
+public abstract class AbstractTimeType extends AbstractTemporalType {}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
new file mode 100644
index 0000000..3d50376
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+/** An abstract temporal implementation of {@link Type} for {@code TIMESTAMP} based columns. */
+public abstract class AbstractTimestampType extends AbstractTemporalType {}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
new file mode 100644
index 0000000..d915a89
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+
+/**
+ * Skeletal {@link Type} that every concrete type is expected to extend.
+ *
+ * <p>It supplies pass-through defaults so that subclasses only override what they need; {@link
+ * #getRegistrationKeys()} is deliberately left for subclasses to implement.
+ */
+public abstract class AbstractType implements Type {
+
+    /** Does nothing by default; this base class reads nothing from {@code dorisOptions}. */
+    @Override
+    public void configure(DorisOptions dorisOptions) {
+        // intentionally empty
+    }
+
+    /**
+     * Reports this type as non-numeric.
+     *
+     * @return always {@code false} unless a subclass overrides it
+     */
+    @Override
+    public boolean isNumber() {
+        return false;
+    }
+
+    /**
+     * Hands the incoming value back untouched.
+     *
+     * @param sourceValue the value read from the record, possibly {@code null}
+     * @return the very same reference that was passed in
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        return sourceValue;
+    }
+
+    /** Uses the simple name of the runtime class as the textual form of the type. */
+    @Override
+    public String toString() {
+        return getClass().getSimpleName();
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
new file mode 100644
index 0000000..c284f0e
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+
+/**
+ * A type indicates the type of each column of a Kafka record, including the various column types
+ * of Debezium and Connect.
+ */
+public interface Type {
+
+ /** Allows a type to perform initialization/configuration tasks based on user configs. */
+ void configure(DorisOptions dorisOptions);
+
+ /**
+ * Returns the names that this type will be mapped as.
+ *
+ * <p>For example, when creating a custom mapping for {@code io.debezium.data.Bits}, a type
+ * could be registered using the {@code LOGICAL_NAME} of the schema if the type is to be used
+ * when a schema name is identified; otherwise it could be registered as the raw column type
+ * when column type propagation is enabled.
+ */
+ String[] getRegistrationKeys();
+
+ /** Get the actual converted value based on the column type. */
+ Object getValue(Object sourceValue);
+
+ /**
+ * Reports whether this type represents a numeric column.
+ *
+ * <p>NOTE(review): how callers use this flag is not visible from this interface -- confirm
+ * against the call sites before relying on it for more than a numeric/non-numeric split.
+ */
+ boolean isNumber();
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectMapType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectMapType.java
new file mode 100644
index 0000000..53c64d1
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectMapType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Base {@link org.apache.doris.kafka.connector.converter.type.Type} for {@code MAP} schema types.
+ *
+ * <p>Registers under the {@code MAP} key and offers subclasses a helper that renders a value as a
+ * JSON string.
+ */
+public abstract class AbstractConnectMapType extends AbstractConnectSchemaType {
+
+    /** JSON writer created once and shared by all map types. */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@code "MAP"}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"MAP"};
+    }
+
+    /**
+     * Serializes {@code value} to its JSON text with Jackson.
+     *
+     * @param value the object to write out as JSON
+     * @return the JSON representation of {@code value}
+     * @throws ConnectException if Jackson cannot serialize {@code value}; the Jackson failure is
+     *     preserved as the cause
+     */
+    protected String mapToJsonString(Object value) {
+        try {
+            return MAPPER.writeValueAsString(value);
+        } catch (JsonProcessingException e) {
+            // Turning an object into JSON text is serialization, so the message says so.
+            throw new ConnectException("Failed to serialize MAP data to JSON", e);
+        }
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectSchemaType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectSchemaType.java
new file mode 100644
index 0000000..d544f75
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectSchemaType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import org.apache.doris.kafka.connector.converter.type.AbstractType;
+
+/**
+ * An abstract implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} from
+ * which all Kafka Connect based schema types should be derived.
+ *
+ * <p>This abstract implementation is used as a marker object to designate types that operate on
+ * the raw {@link org.apache.kafka.connect.data.Schema.Type} values rather than on custom schema
+ * types that are contributed by Kafka Connect, such as Date or Time, or by other third parties
+ * such as Debezium.
+ */
+public abstract class AbstractConnectSchemaType extends AbstractType {}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
new file mode 100644
index 0000000..18c5af3
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered for the {@code BOOLEAN}
+ * key.
+ *
+ * <p>Overrides nothing besides the registration key.
+ */
+public class ConnectBooleanType extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectBooleanType INSTANCE = new ConnectBooleanType();
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@code "BOOLEAN"}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"BOOLEAN"};
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
new file mode 100644
index 0000000..6c2701c
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import java.nio.ByteBuffer;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered for the {@code BYTES}
+ * key that renders binary values as upper-case hexadecimal text.
+ */
+public class ConnectBytesType extends AbstractConnectSchemaType {
+
+    /** Hex digits indexed by nibble value; upper case, matching the {@code %02X} format. */
+    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();
+
+    /** Shared instance of this type. */
+    public static final ConnectBytesType INSTANCE = new ConnectBytesType();
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@code "BYTES"}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"BYTES"};
+    }
+
+    /**
+     * Converts a binary value into its hexadecimal string form.
+     *
+     * @param sourceValue a {@link ByteBuffer}, a {@code byte[]}, or {@code null}
+     * @return two upper-case hex digits per byte, or {@code null} when {@code sourceValue} is
+     *     {@code null}
+     * @throws org.apache.kafka.connect.errors.ConnectException if {@code sourceValue} is neither
+     *     a {@link ByteBuffer} nor a {@code byte[]}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        return bytesToHexString(getByteArrayFromValue(sourceValue));
+    }
+
+    /** Extracts the raw bytes from a supported value, rejecting every other runtime type. */
+    private byte[] getByteArrayFromValue(Object value) {
+        if (value instanceof byte[]) {
+            return (byte[]) value;
+        }
+        if (value instanceof ByteBuffer) {
+            // Read through a slice so the caller's buffer position and limit stay untouched.
+            final ByteBuffer buffer = ((ByteBuffer) value).slice();
+            final byte[] byteArray = new byte[buffer.remaining()];
+            buffer.get(byteArray);
+            return byteArray;
+        }
+        // Fail with a descriptive error instead of passing null on to the hex conversion.
+        throw new org.apache.kafka.connect.errors.ConnectException(
+                String.format(
+                        "Unexpected %s value '%s' with type '%s'",
+                        getClass().getSimpleName(), value, value.getClass().getName()));
+    }
+
+    /** Converts a byte array to an upper-case hexadecimal string, two digits per byte. */
+    private String bytesToHexString(byte[] bytes) {
+        final char[] hex = new char[bytes.length * 2];
+        for (int i = 0; i < bytes.length; i++) {
+            // Mask to an unsigned value so negative bytes map to 80..FF.
+            final int unsigned = bytes[i] & 0xFF;
+            hex[2 * i] = HEX_DIGITS[unsigned >>> 4];
+            hex[2 * i + 1] = HEX_DIGITS[unsigned & 0x0F];
+        }
+        return new String(hex);
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
new file mode 100644
index 0000000..4106f8d
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import org.apache.doris.kafka.connector.converter.type.AbstractDateType;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered under the Kafka Connect
+ * {@link Date#LOGICAL_NAME} logical type.
+ */
+public class ConnectDateType extends AbstractDateType {
+
+    /** Shared instance of this type. */
+    public static final ConnectDateType INSTANCE = new ConnectDateType();
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@link Date#LOGICAL_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {Date.LOGICAL_NAME};
+    }
+
+    /**
+     * Converts a {@link java.util.Date} through {@code DateTimeUtils.toLocalDateFromDate}.
+     *
+     * @param sourceValue a {@link java.util.Date} or {@code null}
+     * @return the converted value, or {@code null} when {@code sourceValue} is {@code null}
+     * @throws ConnectException if {@code sourceValue} is of any other runtime type
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        final boolean isDate = sourceValue instanceof java.util.Date;
+        if (!isDate) {
+            final String message =
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName());
+            throw new ConnectException(message);
+        }
+        return DateTimeUtils.toLocalDateFromDate((java.util.Date) sourceValue);
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
new file mode 100644
index 0000000..8625883
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import org.apache.doris.kafka.connector.converter.type.AbstractType;
+import org.apache.kafka.connect.data.Decimal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered under the Kafka Connect
+ * {@link Decimal#LOGICAL_NAME} logical type and reported as numeric.
+ *
+ * <p>{@code getValue} is not overridden here, so the inherited implementation applies.
+ */
+public class ConnectDecimalType extends AbstractType {
+
+    // NOTE(review): not referenced anywhere inside this class.
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectDecimalType.class);
+
+    /** Shared instance of this type. */
+    public static final ConnectDecimalType INSTANCE = new ConnectDecimalType();
+
+    /**
+     * Marks this type as numeric.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@link Decimal#LOGICAL_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {Decimal.LOGICAL_NAME};
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
new file mode 100644
index 0000000..98b6936
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered for the {@code FLOAT32}
+ * key and reported as numeric.
+ */
+public class ConnectFloat32Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectFloat32Type INSTANCE = new ConnectFloat32Type();
+
+    /**
+     * Marks this type as numeric.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@code "FLOAT32"}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"FLOAT32"};
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
new file mode 100644
index 0000000..f050c15
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered for the {@code FLOAT64}
+ * key and reported as numeric.
+ */
+public class ConnectFloat64Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectFloat64Type INSTANCE = new ConnectFloat64Type();
+
+    /**
+     * Marks this type as numeric.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@code "FLOAT64"}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"FLOAT64"};
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
new file mode 100644
index 0000000..573813b
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered for the {@code INT16}
+ * key and reported as numeric.
+ */
+public class ConnectInt16Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectInt16Type INSTANCE = new ConnectInt16Type();
+
+    /**
+     * Marks this type as numeric.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@code "INT16"}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"INT16"};
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
new file mode 100644
index 0000000..50dd6c7
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered for the {@code INT32}
+ * key and reported as numeric.
+ */
+public class ConnectInt32Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectInt32Type INSTANCE = new ConnectInt32Type();
+
+    /**
+     * Marks this type as numeric.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@code "INT32"}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"INT32"};
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
new file mode 100644
index 0000000..c08abb6
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered for the {@code INT64}
+ * key and reported as numeric.
+ */
+public class ConnectInt64Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectInt64Type INSTANCE = new ConnectInt64Type();
+
+    /**
+     * Marks this type as numeric.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@code "INT64"}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"INT64"};
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
new file mode 100644
index 0000000..55c82cf
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered for the {@code INT8}
+ * key and reported as numeric.
+ */
+public class ConnectInt8Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectInt8Type INSTANCE = new ConnectInt8Type();
+
+    /**
+     * Marks this type as numeric.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@code "INT8"}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"INT8"};
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
new file mode 100644
index 0000000..cac2624
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import java.util.Map;
+
+/**
+ * {@code MAP} type that turns {@link Map} values into JSON text and then delegates to {@link
+ * ConnectStringType}.
+ */
+public class ConnectMapToConnectStringType extends AbstractConnectMapType {
+
+    /** Shared instance of this type. */
+    public static final ConnectMapToConnectStringType INSTANCE =
+            new ConnectMapToConnectStringType();
+
+    /**
+     * Converts the value for a string column.
+     *
+     * @param sourceValue the value read from the record; only {@link Map} instances are
+     *     rewritten, anything else (including {@code null}) is forwarded unchanged
+     * @return whatever {@code ConnectStringType.INSTANCE.getValue} returns for the forwarded
+     *     value
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        final Object forwarded =
+                (sourceValue instanceof Map) ? mapToJsonString(sourceValue) : sourceValue;
+        return ConnectStringType.INSTANCE.getValue(forwarded);
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
new file mode 100644
index 0000000..0353020
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} implementation backing the {@code
+ * STRING} connect schema type.
+ *
+ * <p>Overrides nothing besides the registration key.
+ */
+public class ConnectStringType extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectStringType INSTANCE = new ConnectStringType();
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@code "STRING"}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"STRING"};
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
new file mode 100644
index 0000000..de3be44
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Date;
+import org.apache.doris.kafka.connector.converter.type.AbstractTimeType;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * {@link org.apache.doris.kafka.connector.converter.type.Type} registered under the Kafka Connect
+ * {@link Time#LOGICAL_NAME} logical type.
+ */
+public class ConnectTimeType extends AbstractTimeType {
+
+    /** Shared instance of this type. */
+    public static final ConnectTimeType INSTANCE = new ConnectTimeType();
+
+    /**
+     * Returns the key this type is registered under.
+     *
+     * @return a one-element array containing {@link Time#LOGICAL_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {Time.LOGICAL_NAME};
+    }
+
+    /**
+     * Converts a {@link Date} into a {@link LocalTime} through {@code
+     * DateTimeUtils.toLocalTimeFromUtcDate}.
+     *
+     * @param sourceValue a {@link Date} or {@code null}
+     * @return the converted {@link LocalTime}, or {@code null} when {@code sourceValue} is
+     *     {@code null}
+     * @throws ConnectException if {@code sourceValue} is of any other runtime type
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (sourceValue instanceof Date) {
+            // Returned as-is: attaching a calendar date and stripping it again would yield the
+            // same LocalTime while needlessly reading the system clock.
+            final LocalTime localTime = DateTimeUtils.toLocalTimeFromUtcDate((Date) sourceValue);
+            return localTime;
+        }
+
+        throw new ConnectException(
+                String.format(
+                        "Unexpected %s value '%s' with type '%s'",
+                        getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName()));
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
new file mode 100644
index 0000000..8af71b9
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Converter for Kafka Connect {@link Timestamp} logical values. It registers itself under {@link
+ * Timestamp#LOGICAL_NAME}.
+ */
+public class ConnectTimestampType extends AbstractTimestampType {
+
+    public static final ConnectTimestampType INSTANCE = new ConnectTimestampType();
+
+    /** Returns the single registration key, {@link Timestamp#LOGICAL_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {Timestamp.LOGICAL_NAME};
+        return keys;
+    }
+
+    /**
+     * Converts a Connect timestamp into the value produced by {@link
+     * DateTimeUtils#toLocalDateTimeFromDate(java.util.Date)}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the converted date-time
+     * @throws ConnectException if the value is neither {@code null} nor a {@link java.util.Date}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof java.util.Date)) {
+            final String message =
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName());
+            throw new ConnectException(message);
+        }
+        final java.util.Date date = (java.util.Date) sourceValue;
+        return DateTimeUtils.toLocalDateTimeFromDate(date);
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
new file mode 100644
index 0000000..c0140b8
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import java.time.LocalDate;
+import java.time.LocalTime;
+import org.apache.doris.kafka.connector.converter.type.AbstractTimeType;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Base class for Debezium time-of-day types whose raw value is a {@link Number}. Subclasses
+ * decide how that number is decoded by implementing {@link #getLocalTime(Number)}.
+ */
+public abstract class AbstractDebeziumTimeType extends AbstractTimeType {
+
+    /**
+     * Decodes a numeric time value and combines it with the date returned by {@link
+     * LocalDate#now()}, yielding a {@link java.time.LocalDateTime}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the decoded time placed on the
+     *     current system date
+     * @throws ConnectException if the value is neither {@code null} nor a {@link Number}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof Number)) {
+            final String message =
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName());
+            throw new ConnectException(message);
+        }
+        final LocalTime timeOfDay = getLocalTime((Number) sourceValue);
+        // NOTE(review): the date part comes from the system clock, so the same input yields a
+        // different result on different days -- confirm this is what the sink expects.
+        return timeOfDay.atDate(LocalDate.now());
+    }
+
+    /**
+     * Decodes the raw numeric value into a time-of-day.
+     *
+     * @param value the non-null numeric value taken from the record
+     * @return the decoded time
+     */
+    protected abstract LocalTime getLocalTime(Number value);
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimestampType.java
new file mode 100644
index 0000000..0d17dd1
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimestampType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import java.time.LocalDateTime;
+import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Base class for Debezium timestamp types whose raw value is a {@link Number}. The number is
+ * narrowed to a {@code long} and handed to {@link #getLocalDateTime(long)}.
+ */
+public abstract class AbstractDebeziumTimestampType extends AbstractTimestampType {
+
+    /**
+     * Converts a numeric timestamp into a {@link LocalDateTime}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the result of {@link
+     *     #getLocalDateTime(long)}
+     * @throws ConnectException if the value is neither {@code null} nor a {@link Number}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof Number)) {
+            final String message =
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName());
+            throw new ConnectException(message);
+        }
+        final long rawValue = ((Number) sourceValue).longValue();
+        return getLocalDateTime(rawValue);
+    }
+
+    /**
+     * Decodes the raw epoch-based value into a date-time.
+     *
+     * @param value the numeric value taken from the record; its unit is defined by the subclass
+     * @return the decoded date-time
+     */
+    protected abstract LocalDateTime getLocalDateTime(long value);
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
new file mode 100644
index 0000000..912f0a4
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.Date;
+import org.apache.doris.kafka.connector.converter.type.AbstractDateType;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Converter for Debezium {@link Date} values. It registers itself under {@link
+ * Date#SCHEMA_NAME}.
+ */
+public class DateType extends AbstractDateType {
+
+    public static final DateType INSTANCE = new DateType();
+
+    /** Returns the single registration key, {@link Date#SCHEMA_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {Date.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Converts a numeric day count into the value produced by {@link
+     * DateTimeUtils#toLocalDateOfEpochDays(long)}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the converted date
+     * @throws ConnectException if the value is neither {@code null} nor a {@link Number}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof Number)) {
+            final String message =
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName());
+            throw new ConnectException(message);
+        }
+        final long epochDays = ((Number) sourceValue).longValue();
+        return DateTimeUtils.toLocalDateOfEpochDays(epochDays);
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
new file mode 100644
index 0000000..36eeceb
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.MicroTime;
+import java.time.LocalTime;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+
+/**
+ * Converter for Debezium {@link MicroTime} values. It registers itself under {@link
+ * MicroTime#SCHEMA_NAME}.
+ */
+public class MicroTimeType extends AbstractDebeziumTimeType {
+
+    public static final MicroTimeType INSTANCE = new MicroTimeType();
+
+    /** Returns the single registration key, {@link MicroTime#SCHEMA_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {MicroTime.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Decodes the raw number as a microsecond duration via {@link
+     * DateTimeUtils#toLocalTimeFromDurationMicroseconds(long)}.
+     *
+     * @param value the non-null numeric value taken from the record
+     * @return the decoded time-of-day
+     */
+    @Override
+    protected LocalTime getLocalTime(Number value) {
+        final long micros = value.longValue();
+        return DateTimeUtils.toLocalTimeFromDurationMicroseconds(micros);
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
new file mode 100644
index 0000000..b8c71a2
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.MicroTimestamp;
+import java.time.LocalDateTime;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+
+/**
+ * Converter for Debezium {@link MicroTimestamp} values. It registers itself under {@link
+ * MicroTimestamp#SCHEMA_NAME}.
+ */
+public class MicroTimestampType extends AbstractDebeziumTimestampType {
+
+    public static final MicroTimestampType INSTANCE = new MicroTimestampType();
+
+    /** Returns the single registration key, {@link MicroTimestamp#SCHEMA_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {MicroTimestamp.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Decodes the raw value as epoch microseconds via {@link
+     * DateTimeUtils#toLocalDateTimeFromInstantEpochMicros(long)}.
+     *
+     * @param value the numeric value taken from the record
+     * @return the decoded date-time
+     */
+    @Override
+    protected LocalDateTime getLocalDateTime(long value) {
+        final LocalDateTime decoded = DateTimeUtils.toLocalDateTimeFromInstantEpochMicros(value);
+        return decoded;
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
new file mode 100644
index 0000000..9519e64
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.NanoTime;
+import java.time.LocalTime;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+
+/**
+ * Converter for Debezium {@link NanoTime} values. It registers itself under {@link
+ * NanoTime#SCHEMA_NAME}.
+ */
+public class NanoTimeType extends AbstractDebeziumTimeType {
+
+    public static final NanoTimeType INSTANCE = new NanoTimeType();
+
+    /** Returns the single registration key, {@link NanoTime#SCHEMA_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {NanoTime.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Decodes the raw number as a nanosecond duration via {@link
+     * DateTimeUtils#toLocalTimeFromDurationNanoseconds(long)}.
+     *
+     * @param value the non-null numeric value taken from the record
+     * @return the decoded time-of-day
+     */
+    @Override
+    protected LocalTime getLocalTime(Number value) {
+        final long nanos = value.longValue();
+        return DateTimeUtils.toLocalTimeFromDurationNanoseconds(nanos);
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
new file mode 100644
index 0000000..eec06c8
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTimestamp;
+import java.time.LocalDateTime;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} for {@link
+ * NanoTimestamp} values, decoded as epoch nanoseconds. The microsecond counterpart, {@link
+ * MicroTimestamp}, is handled by {@link MicroTimestampType}.
+ */
+public class NanoTimestampType extends AbstractDebeziumTimestampType {
+
+    public static final NanoTimestampType INSTANCE = new NanoTimestampType();
+
+    /** Returns the single registration key, {@link NanoTimestamp#SCHEMA_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {NanoTimestamp.SCHEMA_NAME};
+    }
+
+    /**
+     * Decodes the raw value as epoch nanoseconds via {@link
+     * DateTimeUtils#toLocalDateTimeFromInstantEpochNanos(long)}.
+     *
+     * @param value the numeric value taken from the record
+     * @return the decoded date-time
+     */
+    @Override
+    protected LocalDateTime getLocalDateTime(long value) {
+        return DateTimeUtils.toLocalDateTimeFromInstantEpochNanos(value);
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
new file mode 100644
index 0000000..83e95d9
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.Time;
+import java.time.LocalTime;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+
+/**
+ * Converter for Debezium {@link Time} values. It registers itself under {@link
+ * Time#SCHEMA_NAME}.
+ */
+public class TimeType extends AbstractDebeziumTimeType {
+
+    public static final TimeType INSTANCE = new TimeType();
+
+    /** Returns the single registration key, {@link Time#SCHEMA_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {Time.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Decodes the raw number as a millisecond duration via {@link
+     * DateTimeUtils#toLocalTimeFromDurationMilliseconds(long)}.
+     *
+     * @param value the non-null numeric value taken from the record
+     * @return the decoded time-of-day
+     */
+    @Override
+    protected LocalTime getLocalTime(Number value) {
+        final long millis = value.longValue();
+        return DateTimeUtils.toLocalTimeFromDurationMilliseconds(millis);
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimestampType.java
new file mode 100644
index 0000000..e5547b1
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimestampType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Converter for Debezium {@link Timestamp} values. It registers itself under {@link
+ * Timestamp#SCHEMA_NAME}.
+ */
+public class TimestampType extends AbstractDebeziumTimestampType {
+
+    public static final TimestampType INSTANCE = new TimestampType();
+
+    /** Returns the single registration key, {@link Timestamp#SCHEMA_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {Timestamp.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Interprets the raw value as epoch milliseconds and renders it as a UTC date-time.
+     *
+     * @param value milliseconds since 1970-01-01T00:00:00Z
+     * @return the corresponding {@link LocalDateTime} at {@link ZoneOffset#UTC}
+     */
+    @Override
+    protected LocalDateTime getLocalDateTime(long value) {
+        final Instant instant = Instant.ofEpochMilli(value);
+        return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
new file mode 100644
index 0000000..e38fe41
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.data.VariableScaleDecimal;
+import java.math.BigDecimal;
+import java.util.Optional;
+import org.apache.doris.kafka.connector.converter.type.AbstractType;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Converter for Debezium {@link VariableScaleDecimal} structs. It registers itself under {@link
+ * VariableScaleDecimal#LOGICAL_NAME} and reports itself as a numeric type.
+ */
+public class VariableScaleDecimalType extends AbstractType {
+
+    public static final VariableScaleDecimalType INSTANCE = new VariableScaleDecimalType();
+
+    /** Returns the single registration key, {@link VariableScaleDecimal#LOGICAL_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {VariableScaleDecimal.LOGICAL_NAME};
+    }
+
+    /**
+     * Unwraps a variable-scale decimal struct into a {@link BigDecimal}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the decoded {@link BigDecimal}
+     * @throws ConnectException if the value is not a {@link Struct}, or if the decoded value
+     *     carries no decimal representation
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (sourceValue instanceof Struct) {
+            final Optional<BigDecimal> bigDecimalValue =
+                    VariableScaleDecimal.toLogical((Struct) sourceValue).getDecimalValue();
+            // An empty Optional is reported as a ConnectException, like every other
+            // unconvertible value here, rather than as a bare NoSuchElementException.
+            // NOTE(review): presumably only special (non-finite) values are empty -- confirm
+            // against the Debezium SpecialValueDecimal contract.
+            return bigDecimalValue.orElseThrow(
+                    () ->
+                            new ConnectException(
+                                    String.format(
+                                            "%s value '%s' has no decimal representation",
+                                            getClass().getSimpleName(), sourceValue)));
+        }
+
+        throw new ConnectException(
+                String.format(
+                        "Unexpected %s value '%s' with type '%s'",
+                        getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName()));
+    }
+
+    /** Always {@code true}: values of this type are numeric. */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimeType.java
new file mode 100644
index 0000000..c3528d9
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimeType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.ZonedTime;
+import java.time.LocalDate;
+import java.time.OffsetTime;
+import java.time.ZonedDateTime;
+import org.apache.doris.kafka.connector.converter.type.AbstractTimeType;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Converter for Debezium {@link ZonedTime} strings. It registers itself under {@link
+ * ZonedTime#SCHEMA_NAME}.
+ */
+public class ZonedTimeType extends AbstractTimeType {
+
+    public static final ZonedTimeType INSTANCE = new ZonedTimeType();
+
+    /** Returns the single registration key, {@link ZonedTime#SCHEMA_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {ZonedTime.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Parses a zoned time string with {@link ZonedTime#FORMATTER} and places it on the date
+     * returned by {@link LocalDate#now()}, yielding a {@link java.time.OffsetDateTime}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the parsed time on the current
+     *     system date, keeping the parsed offset
+     * @throws ConnectException if the value is neither {@code null} nor a {@link String}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof String)) {
+            final String message =
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName());
+            throw new ConnectException(message);
+        }
+        final OffsetTime parsed = OffsetTime.parse((String) sourceValue, ZonedTime.FORMATTER);
+        // NOTE(review): the date part comes from the system clock, so the same input yields a
+        // different result on different days -- confirm this is what the sink expects.
+        final ZonedDateTime zdt = parsed.atDate(LocalDate.now()).toZonedDateTime();
+        return zdt.toOffsetDateTime();
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimestampType.java
new file mode 100644
index 0000000..b03a400
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimestampType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.ZonedTimestamp;
+import java.time.ZonedDateTime;
+import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Converter for Debezium {@link ZonedTimestamp} strings. It registers itself under {@link
+ * ZonedTimestamp#SCHEMA_NAME}.
+ */
+public class ZonedTimestampType extends AbstractTimestampType {
+
+    public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
+
+    /** Returns the single registration key, {@link ZonedTimestamp#SCHEMA_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {ZonedTimestamp.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Parses a zoned timestamp string with {@link ZonedTimestamp#FORMATTER}, shifts it to the
+     * zone reported by {@code getDatabaseTimeZone()} while keeping the same instant, and returns
+     * it as a {@link java.time.OffsetDateTime}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the shifted date-time
+     * @throws ConnectException if the value is neither {@code null} nor a {@link String}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof String)) {
+            final String message =
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName());
+            throw new ConnectException(message);
+        }
+        final ZonedDateTime parsed =
+                ZonedDateTime.parse((String) sourceValue, ZonedTimestamp.FORMATTER);
+        // getDatabaseTimeZone() is inherited; its definition is not part of this class.
+        final ZonedDateTime shifted =
+                parsed.withZoneSameInstant(getDatabaseTimeZone().toZoneId());
+        return shifted.toOffsetDateTime();
+    }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
b/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
new file mode 100644
index 0000000..09ee9d0
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.utils;
+
+import io.debezium.time.Conversions;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Static helpers for turning epoch-based numbers and legacy {@link Date} instances into {@code
+ * java.time} values. Conversions that need a zone use UTC unless a zone is passed in.
+ */
+public class DateTimeUtils {
+
+    private DateTimeUtils() {}
+
+    /**
+     * Builds an {@link Instant} from nanoseconds since the epoch.
+     *
+     * @param epochNanos nanoseconds since 1970-01-01T00:00:00Z; may be negative
+     * @return the corresponding instant
+     */
+    public static Instant toInstantFromNanos(long epochNanos) {
+        final long epochSeconds = TimeUnit.NANOSECONDS.toSeconds(epochNanos);
+        // The remainder keeps the sign of epochNanos; Instant.ofEpochSecond accepts a negative
+        // nanosecond adjustment and normalizes it.
+        final long adjustment = epochNanos % TimeUnit.SECONDS.toNanos(1);
+        return Instant.ofEpochSecond(epochSeconds, adjustment);
+    }
+
+    /**
+     * Views a {@link Date} in the given time zone.
+     *
+     * @param date the instant to convert
+     * @param timeZone the zone to apply
+     * @return the zoned date-time
+     */
+    public static ZonedDateTime toZonedDateTimeFromDate(Date date, TimeZone timeZone) {
+        return toZonedDateTimeFromDate(date, timeZone.toZoneId());
+    }
+
+    /**
+     * Views a {@link Date} in the given zone.
+     *
+     * @param date the instant to convert
+     * @param zoneId the zone to apply
+     * @return the zoned date-time
+     */
+    public static ZonedDateTime toZonedDateTimeFromDate(Date date, ZoneId zoneId) {
+        return date.toInstant().atZone(zoneId);
+    }
+
+    /**
+     * Converts epoch microseconds into a UTC {@link ZonedDateTime}.
+     *
+     * @param epochMicros microseconds since the epoch
+     * @return the zoned date-time at {@link ZoneOffset#UTC}
+     */
+    public static ZonedDateTime toZonedDateTimeFromInstantEpochMicros(long epochMicros) {
+        return Conversions.toInstantFromMicros(epochMicros).atZone(ZoneOffset.UTC);
+    }
+
+    /**
+     * Converts epoch nanoseconds into a UTC {@link ZonedDateTime}.
+     *
+     * @param epochNanos nanoseconds since the epoch
+     * @return the zoned date-time at {@link ZoneOffset#UTC}
+     */
+    public static ZonedDateTime toZonedDateTimeFromInstantEpochNanos(long epochNanos) {
+        return ZonedDateTime.ofInstant(toInstantFromNanos(epochNanos), ZoneOffset.UTC);
+    }
+
+    /**
+     * Converts a day count since the epoch into a {@link LocalDate}.
+     *
+     * @param epochDays days since 1970-01-01
+     * @return the corresponding date
+     */
+    public static LocalDate toLocalDateOfEpochDays(long epochDays) {
+        return LocalDate.ofEpochDay(epochDays);
+    }
+
+    /**
+     * Converts a {@link Date} into the UTC calendar date it falls on.
+     *
+     * @param date the instant to convert
+     * @return the corresponding date
+     */
+    public static LocalDate toLocalDateFromDate(Date date) {
+        return toLocalDateFromInstantEpochMillis(date.getTime());
+    }
+
+    /**
+     * Converts epoch milliseconds into the UTC calendar date they fall on.
+     *
+     * @param epochMillis milliseconds since 1970-01-01T00:00:00Z; may be negative
+     * @return the corresponding date
+     */
+    public static LocalDate toLocalDateFromInstantEpochMillis(long epochMillis) {
+        // Floor division is required for pre-epoch instants: Duration.toDays() truncates toward
+        // zero, which would map e.g. 1969-12-31T23:59:59.999Z (-1 ms) to 1970-01-01.
+        final long epochDays = Math.floorDiv(epochMillis, TimeUnit.DAYS.toMillis(1));
+        return LocalDate.ofEpochDay(epochDays);
+    }
+
+    /**
+     * Converts a millisecond offset from midnight into a {@link LocalTime}.
+     *
+     * @param durationMillis milliseconds since midnight
+     * @return the corresponding time-of-day
+     * @throws java.time.DateTimeException if the value is negative or not less than one day
+     */
+    public static LocalTime toLocalTimeFromDurationMilliseconds(long durationMillis) {
+        return LocalTime.ofNanoOfDay(Duration.of(durationMillis, ChronoUnit.MILLIS).toNanos());
+    }
+
+    /**
+     * Converts a microsecond offset from midnight into a {@link LocalTime}.
+     *
+     * @param durationMicros microseconds since midnight
+     * @return the corresponding time-of-day
+     * @throws java.time.DateTimeException if the value is negative or not less than one day
+     */
+    public static LocalTime toLocalTimeFromDurationMicroseconds(long durationMicros) {
+        return LocalTime.ofNanoOfDay(Duration.of(durationMicros, ChronoUnit.MICROS).toNanos());
+    }
+
+    /**
+     * Converts a nanosecond offset from midnight into a {@link LocalTime}.
+     *
+     * @param durationNanos nanoseconds since midnight
+     * @return the corresponding time-of-day
+     * @throws java.time.DateTimeException if the value is negative or not less than one day
+     */
+    public static LocalTime toLocalTimeFromDurationNanoseconds(long durationNanos) {
+        return LocalTime.ofNanoOfDay(Duration.of(durationNanos, ChronoUnit.NANOS).toNanos());
+    }
+
+    /**
+     * Extracts the UTC time-of-day from a {@link Date}.
+     *
+     * @param date the instant to convert
+     * @return the time-of-day at {@link ZoneOffset#UTC}
+     */
+    public static LocalTime toLocalTimeFromUtcDate(Date date) {
+        return date.toInstant().atOffset(ZoneOffset.UTC).toLocalTime();
+    }
+
+    /**
+     * Converts a {@link Date} into a UTC {@link LocalDateTime}.
+     *
+     * @param date the instant to convert
+     * @return the date-time at {@link ZoneOffset#UTC}
+     */
+    public static LocalDateTime toLocalDateTimeFromDate(Date date) {
+        return toLocalDateTimeFromInstantEpochMillis(date.getTime());
+    }
+
+    /**
+     * Converts epoch milliseconds into a UTC {@link LocalDateTime}.
+     *
+     * @param epochMillis milliseconds since the epoch
+     * @return the date-time at {@link ZoneOffset#UTC}
+     */
+    public static LocalDateTime toLocalDateTimeFromInstantEpochMillis(long epochMillis) {
+        return LocalDateTime.ofInstant(
+                Conversions.toInstantFromMillis(epochMillis), ZoneOffset.UTC);
+    }
+
+    /**
+     * Converts epoch microseconds into a UTC {@link LocalDateTime}.
+     *
+     * @param epochMicros microseconds since the epoch
+     * @return the date-time at {@link ZoneOffset#UTC}
+     */
+    public static LocalDateTime toLocalDateTimeFromInstantEpochMicros(long epochMicros) {
+        return LocalDateTime.ofInstant(
+                Conversions.toInstantFromMicros(epochMicros), ZoneOffset.UTC);
+    }
+
+    /**
+     * Converts epoch nanoseconds into a UTC {@link LocalDateTime}.
+     *
+     * @param epochNanos nanoseconds since the epoch
+     * @return the date-time at {@link ZoneOffset#UTC}
+     */
+    public static LocalDateTime toLocalDateTimeFromInstantEpochNanos(long epochNanos) {
+        return LocalDateTime.ofInstant(toInstantFromNanos(epochNanos), ZoneOffset.UTC);
+    }
+
+    /**
+     * Converts epoch milliseconds into a {@link Timestamp}.
+     *
+     * @param epochMilliseconds milliseconds since the epoch
+     * @return a timestamp whose nanosecond field is set from the same instant
+     */
+    public static Timestamp toTimestampFromMillis(long epochMilliseconds) {
+        final Instant instant = Conversions.toInstantFromMillis(epochMilliseconds);
+        final Timestamp ts = new Timestamp(instant.toEpochMilli());
+        ts.setNanos(instant.getNano());
+        return ts;
+    }
+}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 0804980..498feb0 100644
---
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -59,6 +59,44 @@ public class TestRecordService {
jsonConverter.configure(config, false);
}
+ /**
+ * The MySQL table schema is as follows.
+ *
+ * <p>CREATE TABLE example_table ( id INT AUTO_INCREMENT PRIMARY KEY, name
VARCHAR(50), age INT,
+ * email VARCHAR(100), birth_date DATE, integer_column INT, float_column
FLOAT, decimal_column
+ * DECIMAL(10,2), datetime_column DATETIME, date_column DATE, time_column
TIME, text_column
+ * TEXT, varchar_column VARCHAR(255), binary_column BINARY(10),
blob_column BLOB, is_active
+ * TINYINT(1) );
+ */
+ @Test
+ public void processMysqlDebeziumStructRecord() throws IOException {
+ String topic = "normal.wdl_test.example_table";
+ // no delete value
+ String noDeleteValue =
+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
[...]
+ String expectedNoDeleteValue =
+ "{\"id\":8,\"name\":\"Jfohn
Doe\",\"age\":430,\"email\":\"[email protected]\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"2024-04-12T10:30\",\"text_column\":\"Lorem
ipsum dolor sit amet, consectetur adipiscing
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__D
[...]
+ buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue);
+
+ // delete value
+ String deleteValue =
+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
[...]
+ String expectedDeleteValue =
+ "{\"id\":8,\"name\":\"Jfohn
Doe\",\"age\":430,\"email\":\"[email protected]\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"2024-04-12T10:30\",\"text_column\":\"Lorem
ipsum dolor sit amet, consectetur adipiscing
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__D
[...]
+ buildProcessStructRecord(topic, deleteValue, expectedDeleteValue);
+ }
+
+ private void buildProcessStructRecord(String topic, String sourceValue,
String target)
+ throws IOException {
+ SchemaAndValue noDeleteSchemaValue =
+ jsonConverter.toConnectData(topic,
sourceValue.getBytes(StandardCharsets.UTF_8));
+ SinkRecord noDeleteSinkRecord =
+ TestRecordBuffer.newSinkRecord(
+ noDeleteSchemaValue.value(), 8,
noDeleteSchemaValue.schema());
+ String processResult =
recordService.processStructRecord(noDeleteSinkRecord);
+ Assert.assertEquals(target, processResult);
+ }
+
@Test
public void processStructRecord() throws IOException {
props.remove("converter.mode");
@@ -68,56 +106,27 @@ public class TestRecordService {
// no delete value
String noDeleteValue =
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd
[...]
- SchemaAndValue noDeleteSchemaValue =
- jsonConverter.toConnectData(topic,
noDeleteValue.getBytes(StandardCharsets.UTF_8));
- SinkRecord noDeleteSinkRecord =
- TestRecordBuffer.newSinkRecord(
- noDeleteSchemaValue.value(), 8,
noDeleteSchemaValue.schema());
- String noDeleteResult =
recordService.processStructRecord(noDeleteSinkRecord);
- Assert.assertEquals(
-
"{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712543697000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5320,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"c\",\"ts_ms\":1712543697062,\"transaction\":null}",
- noDeleteResult);
-
- // delete value
- String deleteValue =
-
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd
[...]
- SchemaAndValue deleteSchemaValue =
- jsonConverter.toConnectData(topic,
deleteValue.getBytes(StandardCharsets.UTF_8));
- SinkRecord record2 =
- TestRecordBuffer.newSinkRecord(
- deleteSchemaValue.value(), 1,
deleteSchemaValue.schema());
- String s2 = recordService.processStructRecord(record2);
- Assert.assertEquals(
-
"{\"before\":{\"id\":24,\"name\":\"bb\"},\"after\":null,\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712545844000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5627,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"d\",\"ts_ms\":1712545844948,\"transaction\":null}",
- s2);
+ String expectedNoDeleteValue =
+
"{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712543697000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5320,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"c\",\"ts_ms\":1712543697062,\"transaction\":null}";
+ buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue);
}
@Test
- public void processStructRecordWithDebeziumSchema() {
+ public void processStructRecordWithDebeziumSchema() throws IOException {
String topic = "normal.wdl_test.test_sink_normal";
// no delete value
String noDeleteValue =
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd
[...]
- SchemaAndValue noDeleteSchemaValue =
- jsonConverter.toConnectData(topic,
noDeleteValue.getBytes(StandardCharsets.UTF_8));
- SinkRecord noDeleteSinkRecord =
- TestRecordBuffer.newSinkRecord(
- noDeleteSchemaValue.value(), 8,
noDeleteSchemaValue.schema());
- String noDeleteResult =
recordService.processStructRecord(noDeleteSinkRecord);
- Assert.assertEquals(
-
"{\"id\":19,\"name\":\"fff\",\"__DORIS_DELETE_SIGN__\":\"0\"}", noDeleteResult);
+ String expectedNoDeleteValue =
+ "{\"id\":19,\"name\":\"fff\",\"__DORIS_DELETE_SIGN__\":\"0\"}";
+ buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue);
// delete value
String deleteValue =
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd
[...]
- SchemaAndValue deleteSchemaValue =
- jsonConverter.toConnectData(topic,
deleteValue.getBytes(StandardCharsets.UTF_8));
- SinkRecord record2 =
- TestRecordBuffer.newSinkRecord(
- deleteSchemaValue.value(), 1,
deleteSchemaValue.schema());
- String s2 = recordService.processStructRecord(record2);
-
Assert.assertEquals("{\"id\":24,\"name\":\"bb\",\"__DORIS_DELETE_SIGN__\":\"1\"}",
s2);
+ String expectedDeleteValue =
"{\"id\":24,\"name\":\"bb\",\"__DORIS_DELETE_SIGN__\":\"1\"}";
+ buildProcessStructRecord(topic, deleteValue, expectedDeleteValue);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]