This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 415f37a [Improve] The basis for table schema changes is changed to parsing data column fields (#17)
415f37a is described below
commit 415f37a7aa2f6bf27c969c8363499edd0399cfd0
Author: wudongliang <[email protected]>
AuthorDate: Thu May 16 14:05:32 2024 +0800
[Improve] The basis for table schema changes is changed to parsing data column fields (#17)
---
.../connector/jdbc/util}/DateTimeUtils.java | 6 +-
.../debezium/connector/jdbc/util/SchemaUtils.java | 58 +++++
.../doris/kafka/connector/cfg/DorisOptions.java | 17 +-
.../connector/cfg/DorisSinkConnectorConfig.java | 5 +-
.../connector/converter/RecordDescriptor.java | 67 ++---
.../kafka/connector/converter/RecordService.java | 148 ++++++++++-
.../schema/SchemaChangeManager.java | 84 ++++--
.../SchemaEvolutionMode.java} | 24 +-
.../connector/converter/type/AbstractDateType.java | 11 +-
.../connector/converter/type/AbstractTimeType.java | 28 +-
.../converter/type/AbstractTimestampType.java | 23 +-
.../connector/converter/type/AbstractType.java | 23 ++
.../doris/kafka/connector/converter/type/Type.java | 3 +
.../converter/type/connect/ConnectBooleanType.java | 8 +
.../converter/type/connect/ConnectBytesType.java | 7 +
.../converter/type/connect/ConnectDateType.java | 2 +-
.../converter/type/connect/ConnectDecimalType.java | 13 +
.../converter/type/connect/ConnectFloat32Type.java | 8 +
.../converter/type/connect/ConnectFloat64Type.java | 8 +
.../converter/type/connect/ConnectInt16Type.java | 8 +
.../converter/type/connect/ConnectInt32Type.java | 8 +
.../converter/type/connect/ConnectInt64Type.java | 8 +
.../converter/type/connect/ConnectInt8Type.java | 8 +
.../connect/ConnectMapToConnectStringType.java | 6 +
.../converter/type/connect/ConnectStringType.java | 19 ++
.../converter/type/connect/ConnectTimeType.java | 2 +-
.../type/connect/ConnectTimestampType.java | 2 +-
.../converter/type/debezium/DateType.java | 2 +-
.../converter/type/debezium/MicroTimeType.java | 2 +-
.../type/debezium/MicroTimestampType.java | 2 +-
.../converter/type/debezium/NanoTimeType.java | 2 +-
.../converter/type/debezium/NanoTimestampType.java | 2 +-
.../converter/type/debezium/TimeType.java | 2 +-
.../type/debezium/VariableScaleDecimalType.java | 12 +
.../type/doris}/DorisType.java | 5 +-
.../type/doris/DorisTypeProperties.java} | 8 +-
.../kafka/connector/dialect/mysql/MysqlType.java | 213 ---------------
.../connector/service/DorisDefaultSinkService.java | 26 +-
.../kafka/connector/utils/ConfigCheckUtils.java | 29 ---
.../writer/schema/DebeziumSchemaChange.java | 289 ---------------------
.../writer/schema/SchemaChangeHelper.java | 159 ------------
.../connector/converter/TestRecordService.java | 99 ++++++-
.../connector/writer/TestDebeziumSchemaChange.java | 133 ----------
.../kafka/connector/writer/TestRecordBuffer.java | 11 +-
44 files changed, 640 insertions(+), 960 deletions(-)
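
This change removes the schema.topic based schema-change handling and instead derives table schema
changes from the columns of the parsed data records, controlled by the new schema.evolution option.
A minimal, hypothetical sink configuration sketch (only the property defined in this diff is shown;
all other required connector settings are omitted, and the evolution path applies only when
converter.mode is set to the Debezium ingestion mode):

    # Add columns that appear in parsed records but are missing in the Doris table
    # via ALTER TABLE ... ADD COLUMN. The default is "none", which rejects such records.
    schema.evolution=basic
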
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
b/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
similarity index 95%
rename from
src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
rename to src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
index 09ee9d0..941254d 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
+++ b/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
@@ -15,8 +15,12 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
+ * Copied from
+ * https://github.com/debezium/debezium-connector-jdbc/blob/main/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
*/
-package org.apache.doris.kafka.connector.converter.utils;
+
+package io.debezium.connector.jdbc.util;
import io.debezium.time.Conversions;
import java.sql.Timestamp;
diff --git a/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java
b/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java
new file mode 100644
index 0000000..178507c
--- /dev/null
+++ b/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * Copied from
+ * https://github.com/debezium/debezium-connector-jdbc/blob/main/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java
+ */
+
+package io.debezium.connector.jdbc.util;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.connect.data.Schema;
+
+public class SchemaUtils {
+ private static final String SCHEMA_PARAMETER_COLUMN_TYPE = "__debezium.source.column.type";
+ private static final String SCHEMA_PARAMETER_COLUMN_LENGTH = "__debezium.source.column.length";
+ private static final String SCHEMA_PARAMETER_COLUMN_PRECISION = "__debezium.source.column.scale";
+ private static final String SCHEMA_PARAMETER_COLUMN_NAME = "__debezium.source.column.name";
+
+ public static Optional<String> getSourceColumnType(Schema schema) {
+ return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_TYPE);
+ }
+
+ public static Optional<String> getSourceColumnLength(Schema schema) {
+ return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_LENGTH);
+ }
+
+ public static Optional<String> getSourceColumnPrecision(Schema schema) {
+ return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_PRECISION);
+ }
+
+ public static Optional<String> getSourceColumnName(Schema schema) {
+ return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_NAME);
+ }
+
+ public static Optional<String> getSchemaParameter(Schema schema, String parameterName) {
+ if (!Objects.isNull(schema.parameters())) {
+ return Optional.ofNullable(schema.parameters().get(parameterName));
+ }
+ return Optional.empty();
+ }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index e9f1297..4596f69 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.doris.kafka.connector.converter.ConverterMode;
+import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -44,7 +45,6 @@ public class DorisOptions {
private final String password;
private final String database;
private final Map<String, String> topicMap;
- private final String schemaTopic;
private final int fileSize;
private final int recordNum;
private long flushTime;
@@ -62,6 +62,7 @@ public class DorisOptions {
private LoadModel loadModel;
private DeliveryGuarantee deliveryGuarantee;
private ConverterMode converterMode;
+ private SchemaEvolutionMode schemaEvolutionMode;
public DorisOptions(Map<String, String> config) {
this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -91,6 +92,11 @@ public class DorisOptions {
config.getOrDefault(
DorisSinkConnectorConfig.CONVERT_MODE,
DorisSinkConnectorConfig.CONVERT_MODE_DEFAULT));
+ this.schemaEvolutionMode =
+ SchemaEvolutionMode.of(
+ config.getOrDefault(
+ DorisSinkConnectorConfig.SCHEMA_EVOLUTION,
+ DorisSinkConnectorConfig.SCHEMA_EVOLUTION_DEFAULT));
this.fileSize =
Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES));
this.recordNum =
@@ -105,7 +111,6 @@ public class DorisOptions {
this.flushTime =
DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN;
}
this.topicMap = getTopicToTableMap(config);
- this.schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC);
enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT;
if (config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) {
@@ -281,6 +286,10 @@ public class DorisOptions {
return this.converterMode;
}
+ public SchemaEvolutionMode getSchemaEvolutionMode() {
+ return this.schemaEvolutionMode;
+ }
+
public boolean isAutoRedirect() {
return autoRedirect;
}
@@ -293,10 +302,6 @@ public class DorisOptions {
return enableDelete;
}
- public String getSchemaTopic() {
- return schemaTopic;
- }
-
/**
* parse topic to table map
*
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 94ea08e..5c33da4 100644
---
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -23,6 +23,7 @@ import java.time.Duration;
import java.util.Map;
import org.apache.doris.kafka.connector.DorisSinkConnector;
import org.apache.doris.kafka.connector.converter.ConverterMode;
+import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -78,9 +79,11 @@ public class DorisSinkConnectorConfig {
public static final String DELIVERY_GUARANTEE_DEFAULT =
DeliveryGuarantee.AT_LEAST_ONCE.name();
public static final String CONVERT_MODE = "converter.mode";
public static final String CONVERT_MODE_DEFAULT =
ConverterMode.NORMAL.getName();
- public static final String SCHEMA_TOPIC = "schema.topic";
+
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
+ public static final String SCHEMA_EVOLUTION = "schema.evolution";
+ public static final String SCHEMA_EVOLUTION_DEFAULT = SchemaEvolutionMode.NONE.getName();
// metrics
public static final String JMX_OPT = "jmx";
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
index 11d097f..c4f7243 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
@@ -23,7 +23,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import org.apache.doris.kafka.connector.dialect.mysql.MysqlType;
+import org.apache.doris.kafka.connector.converter.type.Type;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@@ -119,27 +119,34 @@ public class RecordDescriptor {
public static class FieldDescriptor {
private final Schema schema;
private final String name;
+ private final Map<String, Type> typeRegistry;
+ private final Type type;
+ private final String typeName;
private final String schemaTypeName;
private final String schemaName;
private String comment;
private String defaultValue;
- public FieldDescriptor(
- Schema schema, String name, String schemaTypeName, String schemaName) {
+ public FieldDescriptor(Schema schema, String name, Map<String, Type> typeRegistry) {
this.schema = schema;
this.name = name;
- this.schemaTypeName = schemaTypeName;
- this.schemaName = schemaName;
+ this.typeRegistry = typeRegistry;
+ this.schemaName = schema.name();
+ this.schemaTypeName = schema.type().name();
+ this.type =
+ Objects.nonNull(schema.name())
+ ? typeRegistry.get(schema.name())
+ : typeRegistry.get(schema.type().name());
+ this.typeName = type.getTypeName(schema);
}
public FieldDescriptor(
Schema schema,
String name,
- String schemaTypeName,
- String schemaName,
+ Map<String, Type> typeRegistry,
String comment,
String defaultValue) {
- this(schema, name, schemaTypeName, schemaName);
+ this(schema, name, typeRegistry);
this.comment = comment;
this.defaultValue = defaultValue;
}
@@ -148,6 +155,14 @@ public class RecordDescriptor {
return name;
}
+ public Type getType() {
+ return type;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+
public String getSchemaName() {
return schemaName;
}
@@ -172,7 +187,7 @@ public class RecordDescriptor {
public static class Builder {
private SinkRecord sinkRecord;
- private Struct tableChange;
+ private Map<String, Type> typeRegistry;
// Internal build state
private final List<String> keyFieldNames = new ArrayList<>();
@@ -184,8 +199,8 @@ public class RecordDescriptor {
return this;
}
- public Builder withTableChange(Struct tableChange) {
- this.tableChange = tableChange;
+ public Builder withTypeRegistry(Map<String, Type> typeRegistry) {
+ this.typeRegistry = typeRegistry;
return this;
}
@@ -193,11 +208,7 @@ public class RecordDescriptor {
Objects.requireNonNull(sinkRecord, "The sink record must be provided.");
final boolean flattened = !isTombstone(sinkRecord) && isFlattened(sinkRecord);
- if (Objects.nonNull(tableChange)) {
- readTableChangeData(tableChange);
- } else {
- readSinkRecordNonKeyData(sinkRecord, flattened);
- }
+ readSinkRecordNonKeyData(sinkRecord, flattened);
return new RecordDescriptor(
sinkRecord,
@@ -208,27 +219,6 @@ public class RecordDescriptor {
flattened);
}
- private void readTableChangeData(final Struct tableChange) {
- Struct tableChangeTable = tableChange.getStruct("table");
- List<Object> tableChangeColumns =
tableChangeTable.getArray("columns");
- for (Object column : tableChangeColumns) {
- Struct columnStruct = (Struct) column;
- Schema schema = columnStruct.schema();
- String name = columnStruct.getString("name");
- String typeName = columnStruct.getString("typeName");
- Integer length = columnStruct.getInt32("length");
- Integer scale = columnStruct.getInt32("scale");
- String dorisType = MysqlType.toDorisType(typeName, length,
scale);
- String comment = columnStruct.getString("comment");
- String defaultValue =
columnStruct.getString("defaultValueExpression");
- nonKeyFieldNames.add(name);
- allFields.put(
- name,
- new FieldDescriptor(
- schema, name, dorisType, schema.name(),
comment, defaultValue));
- }
- }
-
private boolean isFlattened(SinkRecord record) {
return record.valueSchema().name() == null
|| !record.valueSchema().name().contains("Envelope");
@@ -266,8 +256,7 @@ public class RecordDescriptor {
}
private void applyNonKeyField(String name, Schema schema) {
- FieldDescriptor fieldDescriptor =
- new FieldDescriptor(schema, name, schema.type().name(), schema.name());
+ FieldDescriptor fieldDescriptor = new FieldDescriptor(schema, name, typeRegistry);
nonKeyFieldNames.add(fieldDescriptor.getName());
allFields.put(fieldDescriptor.getName(), fieldDescriptor);
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index 1390761..8bc2480 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -21,17 +21,32 @@ package org.apache.doris.kafka.connector.converter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.debezium.util.Strings;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.StringJoiner;
+import java.util.stream.Collectors;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.schema.SchemaChangeManager;
+import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
import org.apache.doris.kafka.connector.converter.type.Type;
import org.apache.doris.kafka.connector.exception.DataFormatException;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.model.ColumnDescriptor;
+import org.apache.doris.kafka.connector.model.TableDescriptor;
+import org.apache.doris.kafka.connector.model.doris.Schema;
+import org.apache.doris.kafka.connector.service.DorisSystemService;
+import org.apache.doris.kafka.connector.service.RestService;
import org.apache.doris.kafka.connector.writer.LoadConstants;
import org.apache.doris.kafka.connector.writer.RecordBuffer;
import org.apache.kafka.connect.data.Struct;
@@ -43,10 +58,14 @@ import org.slf4j.LoggerFactory;
public class RecordService {
private static final Logger LOG =
LoggerFactory.getLogger(RecordService.class);
+ public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
private static final ObjectMapper MAPPER = new ObjectMapper();
private final JsonConverter converter;
+ private DorisSystemService dorisSystemService;
+ private SchemaChangeManager schemaChangeManager;
private DorisOptions dorisOptions;
private RecordTypeRegister recordTypeRegister;
+ private Set<RecordDescriptor.FieldDescriptor> missingFields;
public RecordService() {
this.converter = new JsonConverter();
@@ -59,6 +78,8 @@ public class RecordService {
this();
this.dorisOptions = dorisOptions;
this.recordTypeRegister = new RecordTypeRegister(dorisOptions);
+ this.dorisSystemService = new DorisSystemService(dorisOptions);
+ this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
}
/**
@@ -68,10 +89,14 @@ public class RecordService {
public String processStructRecord(SinkRecord record) {
String processedRecord;
if (ConverterMode.DEBEZIUM_INGESTION == dorisOptions.getConverterMode()) {
+ validate(record);
RecordDescriptor recordDescriptor = buildRecordDescriptor(record);
if (recordDescriptor.isTombstone()) {
return null;
}
+ String tableName = dorisOptions.getTopicMapTable(recordDescriptor.getTopicName());
+ checkAndApplyTableChangesIfNeeded(tableName, recordDescriptor);
+
List<String> nonKeyFieldNames =
recordDescriptor.getNonKeyFieldNames();
if (recordDescriptor.isDelete()) {
processedRecord =
@@ -96,6 +121,101 @@ public class RecordService {
return processedRecord;
}
+ private void validate(SinkRecord record) {
+ if (isSchemaChange(record)) {
+ LOG.warn(
+ "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.");
+ throw new DorisException(
+ "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.");
+ }
+ }
+
+ private static boolean isSchemaChange(SinkRecord record) {
+ return record.valueSchema() != null
+ && !Strings.isNullOrEmpty(record.valueSchema().name())
+ && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
+ }
+
+ private void checkAndApplyTableChangesIfNeeded(
+ String tableName, RecordDescriptor recordDescriptor) {
+ if (!hasTable(tableName)) {
+ // TODO Table does not exist, let's attempt to create it.
+ } else {
+ // Table exists, let's attempt to alter it if necessary.
+ alterTableIfNeeded(tableName, recordDescriptor);
+ }
+ }
+
+ private boolean hasTable(String tableName) {
+ return dorisSystemService.tableExists(dorisOptions.getDatabase(), tableName);
+ }
+
+ private void alterTableIfNeeded(String tableName, RecordDescriptor record) {
+ // Resolve table metadata from the database
+ final TableDescriptor table = obtainTableSchema(tableName);
+
+ missingFields = resolveMissingFields(record, table);
+ if (missingFields.isEmpty()) {
+ // There are no missing fields, simply return
+ // TODO should we check column type changes or default value changes?
+ return;
+ }
+
+ LOG.info(
+ "Found missing columns in table {}, attempting to add columns={}.",
+ tableName,
+ missingFields.stream()
+ .map(RecordDescriptor.FieldDescriptor::getName)
+ .collect(Collectors.toList()));
+ if (SchemaEvolutionMode.NONE.equals(dorisOptions.getSchemaEvolutionMode())) {
+ LOG.warn(
+ "Table '{}' cannot be altered because schema evolution is disabled.",
+ tableName);
+ throw new SchemaChangeException(
+ "Cannot alter table " + table + " because schema evolution is disabled");
+ }
+ for (RecordDescriptor.FieldDescriptor missingField : missingFields) {
+ schemaChangeManager.addColumnDDL(tableName, missingField);
+ }
+ }
+
+ private Set<RecordDescriptor.FieldDescriptor> resolveMissingFields(
+ RecordDescriptor record, TableDescriptor table) {
+ Set<RecordDescriptor.FieldDescriptor> missingFields = new HashSet<>();
+ for (Map.Entry<String, RecordDescriptor.FieldDescriptor> entry :
+ record.getFields().entrySet()) {
+ String fieldName = entry.getKey();
+ if (!table.hasColumn(fieldName)) {
+ missingFields.add(entry.getValue());
+ }
+ }
+ return missingFields;
+ }
+
+ private TableDescriptor obtainTableSchema(String tableName) {
+ // TODO the table schema is currently fetched from Doris on every call; it should be
+ // cached after the first lookup.
+ Schema schema =
+ RestService.getSchema(dorisOptions, dorisOptions.getDatabase(), tableName, LOG);
+ List<ColumnDescriptor> columnDescriptors = new ArrayList<>();
+ schema.getProperties()
+ .forEach(
+ column -> {
+ ColumnDescriptor columnDescriptor =
+ ColumnDescriptor.builder()
+ .columnName(column.getName())
+ .typeName(column.getType())
+ .comment(column.getComment())
+ .build();
+ columnDescriptors.add(columnDescriptor);
+ });
+ return TableDescriptor.builder()
+ .tableName(tableName)
+ .type(schema.getKeysType())
+ .columns(columnDescriptors)
+ .build();
+ }
+
/** process list record from kafka [{"name":"doris1"},{"name":"doris2"}] */
public String processListRecord(SinkRecord record) {
try {
@@ -130,19 +250,13 @@ public class RecordService {
RecordDescriptor record, Struct source, List<String> fields,
boolean isDelete) {
Map<String, Object> filedMapping = new LinkedHashMap<>();
String filedResult = null;
- final Map<String, Type> typeRegistry =
recordTypeRegister.getTypeRegistry();
for (String fieldName : fields) {
final RecordDescriptor.FieldDescriptor field =
record.getFields().get(fieldName);
- String fieldSchemaName = field.getSchemaName();
- String fieldSchemaTypeName = field.getSchemaTypeName();
+ Type type = field.getType();
Object value =
field.getSchema().isOptional()
? source.getWithoutDefault(fieldName)
: source.get(fieldName);
- Type type =
- Objects.nonNull(fieldSchemaName)
- ? typeRegistry.get(fieldSchemaName)
- : typeRegistry.get(fieldSchemaTypeName);
Object convertValue = type.getValue(value);
if (Objects.nonNull(convertValue) && !type.isNumber()) {
filedMapping.put(fieldName, convertValue.toString());
@@ -186,10 +300,28 @@ public class RecordService {
private RecordDescriptor buildRecordDescriptor(SinkRecord record) {
RecordDescriptor recordDescriptor;
try {
- recordDescriptor = RecordDescriptor.builder().withSinkRecord(record).build();
+ recordDescriptor =
+ RecordDescriptor.builder()
+ .withSinkRecord(record)
+ .withTypeRegistry(recordTypeRegister.getTypeRegistry())
+ .build();
} catch (Exception e) {
throw new ConnectException("Failed to process a sink record", e);
}
return recordDescriptor;
}
+
+ public void setSchemaChangeManager(SchemaChangeManager
schemaChangeManager) {
+ this.schemaChangeManager = schemaChangeManager;
+ }
+
+ @VisibleForTesting
+ public void setDorisSystemService(DorisSystemService dorisSystemService) {
+ this.dorisSystemService = dorisSystemService;
+ }
+
+ @VisibleForTesting
+ public Set<RecordDescriptor.FieldDescriptor> getMissingFields() {
+ return missingFields;
+ }
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java
b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java
similarity index 69%
rename from
src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java
rename to
src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java
index 1ee9c1e..376edf9 100644
---
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.doris.kafka.connector.writer.schema;
+package org.apache.doris.kafka.connector.converter.schema;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
@@ -28,8 +28,8 @@ import java.util.Map;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
import org.apache.doris.kafka.connector.exception.SchemaChangeException;
-import org.apache.doris.kafka.connector.utils.HttpGetWithEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
@@ -44,8 +44,7 @@ import org.slf4j.LoggerFactory;
public class SchemaChangeManager implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(SchemaChangeManager.class);
- private static final String CHECK_SCHEMA_CHANGE_API =
- "http://%s/api/enable_light_schema_change/%s/%s";
+ private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
private static final String SCHEMA_CHANGE_API =
"http://%s/api/query/default_cluster/%s";
private final ObjectMapper objectMapper = new ObjectMapper();
private final DorisOptions dorisOptions;
@@ -54,29 +53,6 @@ public class SchemaChangeManager implements Serializable {
this.dorisOptions = dorisOptions;
}
- public static Map<String, Object> buildRequestParam(boolean dropColumn,
String columnName) {
- Map<String, Object> params = new HashMap<>();
- params.put("isDropColumn", dropColumn);
- params.put("columnName", columnName);
- return params;
- }
-
- /** check ddl can do light schema change. */
- public boolean checkSchemaChange(String database, String table,
Map<String, Object> params)
- throws IllegalArgumentException, IOException {
- if (params.isEmpty()) {
- return false;
- }
- String requestUrl =
- String.format(CHECK_SCHEMA_CHANGE_API,
dorisOptions.getHttpUrl(), database, table);
- HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
- httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
- httpGet.setEntity(new
StringEntity(objectMapper.writeValueAsString(params)));
- String responseEntity = "";
- Map<String, Object> responseMap = handleResponse(httpGet,
responseEntity);
- return handleSchemaChange(responseMap, responseEntity);
- }
-
private boolean handleSchemaChange(Map<String, Object> responseMap, String
responseEntity) {
String code = responseMap.getOrDefault("code", "-1").toString();
if (code.equals("0")) {
@@ -86,6 +62,60 @@ public class SchemaChangeManager implements Serializable {
}
}
+ public void addColumnDDL(String tableName, RecordDescriptor.FieldDescriptor field) {
+ try {
+ String addColumnDDL = buildAddColumnDDL(dorisOptions.getDatabase(), tableName, field);
+ boolean status = execute(addColumnDDL, dorisOptions.getDatabase());
+ LOG.info(
+ "Added missing column for table {}, ddl={}, status={}",
+ tableName,
+ addColumnDDL,
+ status);
+ } catch (Exception e) {
+ LOG.warn("Failed to add column for {}, caused by: ", tableName, e);
+ throw new SchemaChangeException(
+ "Failed to add column for " + tableName + ", caused by:", e);
+ }
+ }
+
+ public static String buildAddColumnDDL(
+ String database, String tableName, RecordDescriptor.FieldDescriptor field) {
+ String name = field.getName();
+ String typeName = field.getTypeName();
+ String comment = field.getComment();
+ String defaultValue = field.getDefaultValue();
+
+ String addDDL =
+ String.format(
+ ADD_DDL,
+ identifier(database) + "." + identifier(tableName),
+ identifier(name),
+ typeName);
+ if (defaultValue != null) {
+ addDDL = addDDL + " DEFAULT " + quoteDefaultValue(defaultValue);
+ }
+ if (StringUtils.isNotEmpty(comment)) {
+ addDDL = addDDL + " COMMENT '" + quoteComment(comment) + "'";
+ }
+ return addDDL;
+ }
+
+ private static String quoteComment(String comment) {
+ return comment.replaceAll("'", "\\\\'");
+ }
+
+ private static String identifier(String name) {
+ return "`" + name + "`";
+ }
+
+ private static String quoteDefaultValue(String defaultValue) {
+ // DEFAULT current_timestamp does not need quoting
+ if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+ return defaultValue;
+ }
+ return "'" + defaultValue + "'";
+ }
+
/** execute sql in doris. */
public boolean execute(String ddl, String database)
throws IOException, IllegalArgumentException {
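
For illustration, a hypothetical missing field named remark with resolved Doris type VARCHAR(96),
default value "unknown" and comment "user remark" would make buildAddColumnDDL emit (database and
table names are made up):

    ALTER TABLE `test_db`.`test_tbl` ADD COLUMN `remark` VARCHAR(96) DEFAULT 'unknown' COMMENT 'user remark'

A default of current_timestamp is left unquoted by quoteDefaultValue, and single quotes inside the
comment are escaped by quoteComment.
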
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
similarity index 67%
copy from
src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
copy to
src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
index 55c82cf..d9b6a9b 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
@@ -16,19 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.doris.kafka.connector.converter.type.connect;
-public class ConnectInt8Type extends AbstractConnectSchemaType {
+package org.apache.doris.kafka.connector.converter.schema;
- public static final ConnectInt8Type INSTANCE = new ConnectInt8Type();
+public enum SchemaEvolutionMode {
+ NONE("none"),
- @Override
- public String[] getRegistrationKeys() {
- return new String[] {"INT8"};
+ BASIC("basic");
+
+ private final String name;
+
+ SchemaEvolutionMode(String name) {
+ this.name = name;
+ }
+
+ public static SchemaEvolutionMode of(String name) {
+ return SchemaEvolutionMode.valueOf(name.toUpperCase());
}
- @Override
- public boolean isNumber() {
- return true;
+ public String getName() {
+ return name;
}
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
index de32cd6..ab25931 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
@@ -18,5 +18,14 @@
*/
package org.apache.doris.kafka.connector.converter.type;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
/** An abstract base class for all temporal date implementations of {@link
Type}. */
-public abstract class AbstractDateType extends AbstractTemporalType {}
+public abstract class AbstractDateType extends AbstractTemporalType {
+
+ @Override
+ public String getTypeName(Schema schema) {
+ return DorisType.DATE;
+ }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
index 533f1e1..79e0105 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
@@ -18,5 +18,31 @@
*/
package org.apache.doris.kafka.connector.converter.type;
+import java.util.Optional;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import
org.apache.doris.kafka.connector.converter.type.doris.DorisTypeProperties;
+import org.apache.kafka.connect.data.Schema;
+
/** An abstract temporal implementation of {@link Type} for {@code TIME} based
columns. */
-public abstract class AbstractTimeType extends AbstractTemporalType {}
+public abstract class AbstractTimeType extends AbstractTemporalType {
+
+ @Override
+ public String getTypeName(Schema schema) {
+ // NOTE:
+ // The MySQL connector does not use the __debezium.source.column.scale parameter to pass
+ // the time column's precision but instead uses the __debezium.source.column.length key
+ // which differs from all other connector implementations.
+ //
+ final int precision = getTimePrecision(schema);
+ return String.format(
+ "%s(%s)",
+ DorisType.DATETIME,
+ Math.min(precision, DorisTypeProperties.MAX_SUPPORTED_DATE_TIME_PRECISION));
+ }
+
+ protected int getTimePrecision(Schema schema) {
+ final String length = getSourceColumnLength(schema).orElse("0");
+ final Optional<String> scale = getSourceColumnPrecision(schema);
+ return scale.map(Integer::parseInt).orElseGet(() -> Integer.parseInt(length));
+ }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
index 3d50376..0b8d45d 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
@@ -18,5 +18,26 @@
*/
package org.apache.doris.kafka.connector.converter.type;
+import java.util.Optional;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import
org.apache.doris.kafka.connector.converter.type.doris.DorisTypeProperties;
+import org.apache.kafka.connect.data.Schema;
+
/** An abstract temporal implementation of {@link Type} for {@code TIMESTAMP}
based columns. */
-public abstract class AbstractTimestampType extends AbstractTemporalType {}
+public abstract class AbstractTimestampType extends AbstractTemporalType {
+
+ @Override
+ public String getTypeName(Schema schema) {
+ final int precision = getTimePrecision(schema);
+ return String.format(
+ "%s(%s)",
+ DorisType.DATETIME,
+ Math.min(precision, DorisTypeProperties.MAX_SUPPORTED_DATE_TIME_PRECISION));
+ }
+
+ protected int getTimePrecision(Schema schema) {
+ final String length = getSourceColumnLength(schema).orElse("0");
+ final Optional<String> scale = getSourceColumnPrecision(schema);
+ return scale.map(Integer::parseInt).orElseGet(() -> Integer.parseInt(length));
+ }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
index d915a89..650e792 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
@@ -18,7 +18,11 @@
*/
package org.apache.doris.kafka.connector.converter.type;
+import io.debezium.connector.jdbc.util.SchemaUtils;
+import java.util.Objects;
+import java.util.Optional;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.kafka.connect.data.Schema;
/** An abstract implementation of {@link Type}, which all types should extend.
*/
public abstract class AbstractType implements Type {
@@ -40,4 +44,23 @@ public abstract class AbstractType implements Type {
public String toString() {
return getClass().getSimpleName();
}
+
+ protected Optional<String> getSchemaParameter(Schema schema, String parameterName) {
+ if (!Objects.isNull(schema.parameters())) {
+ return Optional.ofNullable(schema.parameters().get(parameterName));
+ }
+ return Optional.empty();
+ }
+
+ protected Optional<String> getSourceColumnType(Schema schema) {
+ return SchemaUtils.getSourceColumnType(schema);
+ }
+
+ protected Optional<String> getSourceColumnLength(Schema schema) {
+ return SchemaUtils.getSourceColumnLength(schema);
+ }
+
+ protected Optional<String> getSourceColumnPrecision(Schema schema) {
+ return SchemaUtils.getSourceColumnPrecision(schema);
+ }
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
index c284f0e..698e838 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
@@ -19,6 +19,7 @@
package org.apache.doris.kafka.connector.converter.type;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.kafka.connect.data.Schema;
/**
* A type indicates the type of each column of kafka record, including various
column types of
@@ -42,5 +43,7 @@ public interface Type {
/** Get the actual converted value based on the column type. */
Object getValue(Object sourceValue);
+ String getTypeName(Schema schema);
+
boolean isNumber();
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
index 18c5af3..dfac2d5 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
@@ -18,6 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
public class ConnectBooleanType extends AbstractConnectSchemaType {
public static final ConnectBooleanType INSTANCE = new ConnectBooleanType();
@@ -26,4 +29,9 @@ public class ConnectBooleanType extends
AbstractConnectSchemaType {
public String[] getRegistrationKeys() {
return new String[] {"BOOLEAN"};
}
+
+ @Override
+ public String getTypeName(Schema schema) {
+ return DorisType.BOOLEAN;
+ }
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
index 6c2701c..fbd07a9 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
@@ -19,6 +19,8 @@
package org.apache.doris.kafka.connector.converter.type.connect;
import java.nio.ByteBuffer;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
public class ConnectBytesType extends AbstractConnectSchemaType {
@@ -37,6 +39,11 @@ public class ConnectBytesType extends
AbstractConnectSchemaType {
return bytesToHexString(getByteArrayFromValue(sourceValue));
}
+ @Override
+ public String getTypeName(Schema schema) {
+ return DorisType.STRING;
+ }
+
private byte[] getByteArrayFromValue(Object value) {
byte[] byteArray = null;
if (value instanceof ByteBuffer) {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
index 4106f8d..acac4af 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
@@ -18,8 +18,8 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import io.debezium.connector.jdbc.util.DateTimeUtils;
import org.apache.doris.kafka.connector.converter.type.AbstractDateType;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.errors.ConnectException;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
index 8625883..a0ab7c9 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
@@ -19,7 +19,9 @@
package org.apache.doris.kafka.connector.converter.type.connect;
import org.apache.doris.kafka.connector.converter.type.AbstractType;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +36,17 @@ public class ConnectDecimalType extends AbstractType {
return new String[] {Decimal.LOGICAL_NAME};
}
+ @Override
+ public String getTypeName(Schema schema) {
+ int scale = Integer.parseInt(getSchemaParameter(schema, "scale").orElse("0"));
+ int precision =
+ Integer.parseInt(getSchemaParameter(schema, "connect.decimal.precision").orElse("0"));
+ return precision <= 38
+ ? String.format("%s(%s,%s)", DorisType.DECIMAL, precision, Math.max(scale, 0))
+ : DorisType.STRING;
+ }
+
@Override
public boolean isNumber() {
return true;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
index 98b6936..fc75ba9 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
@@ -18,6 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
public class ConnectFloat32Type extends AbstractConnectSchemaType {
public static final ConnectFloat32Type INSTANCE = new ConnectFloat32Type();
@@ -27,6 +30,11 @@ public class ConnectFloat32Type extends
AbstractConnectSchemaType {
return new String[] {"FLOAT32"};
}
+ @Override
+ public String getTypeName(Schema schema) {
+ return DorisType.FLOAT;
+ }
+
@Override
public boolean isNumber() {
return true;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
index f050c15..3a74391 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
@@ -18,6 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
public class ConnectFloat64Type extends AbstractConnectSchemaType {
public static final ConnectFloat64Type INSTANCE = new ConnectFloat64Type();
@@ -27,6 +30,11 @@ public class ConnectFloat64Type extends
AbstractConnectSchemaType {
return new String[] {"FLOAT64"};
}
+ @Override
+ public String getTypeName(Schema schema) {
+ return DorisType.DOUBLE;
+ }
+
@Override
public boolean isNumber() {
return true;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
index 573813b..6a61c77 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
@@ -18,6 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
public class ConnectInt16Type extends AbstractConnectSchemaType {
public static final ConnectInt16Type INSTANCE = new ConnectInt16Type();
@@ -27,6 +30,11 @@ public class ConnectInt16Type extends
AbstractConnectSchemaType {
return new String[] {"INT16"};
}
+ @Override
+ public String getTypeName(Schema schema) {
+ return DorisType.SMALLINT;
+ }
+
@Override
public boolean isNumber() {
return true;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
index 50dd6c7..e11ad5f 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
@@ -18,6 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
public class ConnectInt32Type extends AbstractConnectSchemaType {
public static final ConnectInt32Type INSTANCE = new ConnectInt32Type();
@@ -27,6 +30,11 @@ public class ConnectInt32Type extends
AbstractConnectSchemaType {
return new String[] {"INT32"};
}
+ @Override
+ public String getTypeName(Schema schema) {
+ return DorisType.INT;
+ }
+
@Override
public boolean isNumber() {
return true;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
index c08abb6..a322da6 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
@@ -18,6 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
public class ConnectInt64Type extends AbstractConnectSchemaType {
public static final ConnectInt64Type INSTANCE = new ConnectInt64Type();
@@ -27,6 +30,11 @@ public class ConnectInt64Type extends
AbstractConnectSchemaType {
return new String[] {"INT64"};
}
+ @Override
+ public String getTypeName(Schema schema) {
+ return DorisType.BIGINT;
+ }
+
@Override
public boolean isNumber() {
return true;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
index 55c82cf..5c3fae6 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
@@ -18,6 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
public class ConnectInt8Type extends AbstractConnectSchemaType {
public static final ConnectInt8Type INSTANCE = new ConnectInt8Type();
@@ -27,6 +30,11 @@ public class ConnectInt8Type extends
AbstractConnectSchemaType {
return new String[] {"INT8"};
}
+ @Override
+ public String getTypeName(Schema schema) {
+ return DorisType.TINYINT;
+ }
+
@Override
public boolean isNumber() {
return true;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
index cac2624..707dd1c 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
@@ -19,12 +19,18 @@
package org.apache.doris.kafka.connector.converter.type.connect;
import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
public class ConnectMapToConnectStringType extends AbstractConnectMapType {
public static final ConnectMapToConnectStringType INSTANCE =
new ConnectMapToConnectStringType();
+ @Override
+ public String getTypeName(Schema schema) {
+ return ConnectStringType.INSTANCE.getTypeName(schema);
+ }
+
@Override
public Object getValue(Object sourceValue) {
if (sourceValue instanceof Map) {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
index 0353020..bda5478 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
@@ -18,6 +18,10 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import
org.apache.doris.kafka.connector.converter.type.doris.DorisTypeProperties;
+import org.apache.kafka.connect.data.Schema;
+
/**
* An implementation of {@link
org.apache.doris.kafka.connector.converter.type.Type} that supports
* {@code STRING} connect schema types.
@@ -26,8 +30,23 @@ public class ConnectStringType extends
AbstractConnectSchemaType {
public static final ConnectStringType INSTANCE = new ConnectStringType();
+ @Override
+ public String getTypeName(Schema schema) {
+ int columnLength = getColumnLength(schema);
+ if (columnLength > 0) {
+ return columnLength * 3 > DorisTypeProperties.MAX_VARCHAR_SIZE
+ ? DorisType.STRING
+ : String.format("%s(%s)", DorisType.VARCHAR, columnLength * 3);
+ }
+ return DorisType.STRING;
+ }
+
@Override
public String[] getRegistrationKeys() {
return new String[] {"STRING"};
}
+
+ private int getColumnLength(Schema schema) {
+ return Integer.parseInt(getSourceColumnLength(schema).orElse("0"));
+ }
}
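
A small usage sketch of the string mapping above, assuming the source column length arrives via the
__debezium.source.column.length schema parameter (the key read through getSourceColumnLength); the
upper bound comes from DorisTypeProperties.MAX_VARCHAR_SIZE:

    import org.apache.doris.kafka.connector.converter.type.connect.ConnectStringType;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    // Hypothetical example: a source VARCHAR(20) column maps to VARCHAR(60) in Doris.
    Schema schema =
            SchemaBuilder.string()
                    .parameter("__debezium.source.column.length", "20")
                    .build();
    String dorisType = ConnectStringType.INSTANCE.getTypeName(schema); // -> "VARCHAR(60)"
    // Without a length parameter, or when length * 3 exceeds MAX_VARCHAR_SIZE, STRING is used.
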
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
index de3be44..c2e1698 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
@@ -18,12 +18,12 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import io.debezium.connector.jdbc.util.DateTimeUtils;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
import org.apache.doris.kafka.connector.converter.type.AbstractTimeType;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.errors.ConnectException;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
index 8af71b9..2de8c42 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
@@ -18,8 +18,8 @@
*/
package org.apache.doris.kafka.connector.converter.type.connect;
+import io.debezium.connector.jdbc.util.DateTimeUtils;
import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
index 912f0a4..a5589f3 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
@@ -18,9 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.debezium;
+import io.debezium.connector.jdbc.util.DateTimeUtils;
import io.debezium.time.Date;
import org.apache.doris.kafka.connector.converter.type.AbstractDateType;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
import org.apache.kafka.connect.errors.ConnectException;
public class DateType extends AbstractDateType {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
index 36eeceb..b2a1381 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
@@ -18,9 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.debezium;
+import io.debezium.connector.jdbc.util.DateTimeUtils;
import io.debezium.time.MicroTime;
import java.time.LocalTime;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
public class MicroTimeType extends AbstractDebeziumTimeType {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
index b8c71a2..cb8e3c9 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
@@ -18,9 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.debezium;
+import io.debezium.connector.jdbc.util.DateTimeUtils;
import io.debezium.time.MicroTimestamp;
import java.time.LocalDateTime;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
public class MicroTimestampType extends AbstractDebeziumTimestampType {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
index 9519e64..abcc05e 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
@@ -18,9 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.debezium;
+import io.debezium.connector.jdbc.util.DateTimeUtils;
import io.debezium.time.NanoTime;
import java.time.LocalTime;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
public class NanoTimeType extends AbstractDebeziumTimeType {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
index eec06c8..a7c08d0 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
@@ -18,10 +18,10 @@
*/
package org.apache.doris.kafka.connector.converter.type.debezium;
+import io.debezium.connector.jdbc.util.DateTimeUtils;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTimestamp;
import java.time.LocalDateTime;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
/**
* An implementation of {@link
org.apache.doris.kafka.connector.converter.type.Type} for {@link
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
index 83e95d9..be1d329 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
@@ -18,9 +18,9 @@
*/
package org.apache.doris.kafka.connector.converter.type.debezium;
+import io.debezium.connector.jdbc.util.DateTimeUtils;
import io.debezium.time.Time;
import java.time.LocalTime;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
public class TimeType extends AbstractDebeziumTimeType {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
index e38fe41..1cbff47 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
@@ -22,6 +22,8 @@ import io.debezium.data.VariableScaleDecimal;
import java.math.BigDecimal;
import java.util.Optional;
import org.apache.doris.kafka.connector.converter.type.AbstractType;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
@@ -51,6 +53,16 @@ public class VariableScaleDecimalType extends AbstractType {
getClass().getSimpleName(), sourceValue,
sourceValue.getClass().getName()));
}
+ @Override
+ public String getTypeName(Schema schema) {
+ // The data passed by VariableScaleDecimal types does not provide adequate
+ // information to resolve the precision and scale for the data type, so
+ // instead we default to the maximum double-based data type for the
+ // dialect, using DOUBLE.
+ return DorisType.DOUBLE;
+ }
+
@Override
public boolean isNumber() {
return true;
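The new getTypeName override above maps Debezium's VariableScaleDecimal to Doris DOUBLE because the record carries no usable precision or scale. A minimal sketch of that fallback decision; dorisTypeFor is a hypothetical helper, not part of the connector, and the 38-digit cap follows Doris DECIMAL limits:

    public class VariableScaleDecimalFallbackSketch {

        // Hypothetical helper mirroring the fallback: without precision/scale
        // metadata there is no safe DECIMAL(p,s) to declare, so use DOUBLE.
        static String dorisTypeFor(Integer precision, Integer scale) {
            if (precision == null || scale == null) {
                return "DOUBLE";
            }
            // Doris DECIMAL supports at most 38 digits of precision.
            return precision <= 38
                    ? String.format("DECIMAL(%d,%d)", precision, Math.max(scale, 0))
                    : "STRING";
        }

        public static void main(String[] args) {
            System.out.println(dorisTypeFor(null, null)); // DOUBLE  (VariableScaleDecimal case)
            System.out.println(dorisTypeFor(10, 2));      // DECIMAL(10,2)
        }
    }
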
diff --git
a/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisType.java
similarity index 89%
rename from
src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java
rename to
src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisType.java
index 89b416a..c89be76 100644
--- a/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisType.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.doris.kafka.connector.dialect;
+package org.apache.doris.kafka.connector.converter.type.doris;
public class DorisType {
public static final String BOOLEAN = "BOOLEAN";
@@ -29,11 +29,8 @@ public class DorisType {
public static final String FLOAT = "FLOAT";
public static final String DOUBLE = "DOUBLE";
public static final String DECIMAL = "DECIMAL";
- public static final String DECIMAL_V3 = "DECIMALV3";
public static final String DATE = "DATE";
- public static final String DATE_V2 = "DATEV2";
public static final String DATETIME = "DATETIME";
- public static final String DATETIME_V2 = "DATETIMEV2";
public static final String CHAR = "CHAR";
public static final String VARCHAR = "VARCHAR";
public static final String STRING = "STRING";
diff --git
a/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java
b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisTypeProperties.java
similarity index 83%
rename from
src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java
rename to
src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisTypeProperties.java
index 62e4cab..b20ae8e 100644
---
a/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisTypeProperties.java
@@ -17,12 +17,16 @@
* under the License.
*/
-package org.apache.doris.kafka.connector.dialect;
+package org.apache.doris.kafka.connector.converter.type.doris;
-public class DialectProperties {
+public class DorisTypeProperties {
/* Max precision of datetime type of Doris. */
public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
public static final int TIMESTAMP_TYPE_MAX_PRECISION = 9;
+
+ public static final int MAX_VARCHAR_SIZE = 65533;
+
+ public static final int MAX_CHAR_SIZE = 255;
}
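DorisTypeProperties now also records the string-type limits the converter has to respect: VARCHAR tops out at 65533 bytes and CHAR at 255, alongside the 6-digit DATETIME precision cap. A small sketch of how a source column might be clamped against those limits, assuming the length*3 byte budget that the removed MysqlType mapping used; sizeVarchar and sizeDatetime are hypothetical helpers:

    public class DorisTypeLimitsSketch {

        // Limits as declared in DorisTypeProperties.
        static final int MAX_VARCHAR_SIZE = 65533;
        static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;

        // Hypothetical: pick a Doris string type for a source column of the given
        // character length, budgeting 3 bytes per character as the old mapping did.
        static String sizeVarchar(int sourceLength) {
            long bytes = (long) sourceLength * 3;
            return bytes > MAX_VARCHAR_SIZE ? "STRING" : "VARCHAR(" + bytes + ")";
        }

        // Hypothetical: clamp a requested DATETIME precision to what Doris supports.
        static String sizeDatetime(int precision) {
            return "DATETIME(" + Math.min(precision, MAX_SUPPORTED_DATE_TIME_PRECISION) + ")";
        }

        public static void main(String[] args) {
            System.out.println(sizeVarchar(255));   // VARCHAR(765)
            System.out.println(sizeVarchar(30000)); // STRING (90000 bytes > 65533)
            System.out.println(sizeDatetime(9));    // DATETIME(6)
        }
    }
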
diff --git
a/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java
b/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java
deleted file mode 100644
index 663aadf..0000000
---
a/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.doris.kafka.connector.dialect.mysql;
-
-import static
org.apache.doris.kafka.connector.dialect.DialectProperties.MAX_SUPPORTED_DATE_TIME_PRECISION;
-import static
org.apache.doris.kafka.connector.dialect.DialectProperties.TIMESTAMP_TYPE_MAX_PRECISION;
-
-import com.google.common.base.Preconditions;
-import org.apache.doris.kafka.connector.dialect.DorisType;
-
-public class MysqlType {
-
- // MySQL driver returns width of timestamp types instead of precision.
- // 19 characters are used for zero-precision timestamps while others
- // require 19 + precision + 1 characters with the additional character
- // required for the decimal separator.
- private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
- private static final String BIT = "BIT";
- private static final String BOOLEAN = "BOOLEAN";
- private static final String BOOL = "BOOL";
- private static final String TINYINT = "TINYINT";
- private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
- private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED
ZEROFILL";
- private static final String SMALLINT = "SMALLINT";
- private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
- private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT
UNSIGNED ZEROFILL";
- private static final String MEDIUMINT = "MEDIUMINT";
- private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
- private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT
UNSIGNED ZEROFILL";
- private static final String INT = "INT";
- private static final String INT_UNSIGNED = "INT UNSIGNED";
- private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED
ZEROFILL";
- private static final String BIGINT = "BIGINT";
- private static final String SERIAL = "SERIAL";
- private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
- private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED
ZEROFILL";
- private static final String REAL = "REAL";
- private static final String REAL_UNSIGNED = "REAL UNSIGNED";
- private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED
ZEROFILL";
- private static final String FLOAT = "FLOAT";
- private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED";
- private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED
ZEROFILL";
- private static final String DOUBLE = "DOUBLE";
- private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
- private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED
ZEROFILL";
- private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
- private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION
UNSIGNED";
- private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL =
- "DOUBLE PRECISION UNSIGNED ZEROFILL";
- private static final String NUMERIC = "NUMERIC";
- private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
- private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED
ZEROFILL";
- private static final String FIXED = "FIXED";
- private static final String FIXED_UNSIGNED = "FIXED UNSIGNED";
- private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED
ZEROFILL";
- private static final String DECIMAL = "DECIMAL";
- private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
- private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED
ZEROFILL";
- private static final String CHAR = "CHAR";
- private static final String VARCHAR = "VARCHAR";
- private static final String TINYTEXT = "TINYTEXT";
- private static final String MEDIUMTEXT = "MEDIUMTEXT";
- private static final String TEXT = "TEXT";
- private static final String LONGTEXT = "LONGTEXT";
- private static final String DATE = "DATE";
- private static final String TIME = "TIME";
- private static final String DATETIME = "DATETIME";
- private static final String TIMESTAMP = "TIMESTAMP";
- private static final String YEAR = "YEAR";
- private static final String BINARY = "BINARY";
- private static final String VARBINARY = "VARBINARY";
- private static final String TINYBLOB = "TINYBLOB";
- private static final String MEDIUMBLOB = "MEDIUMBLOB";
- private static final String BLOB = "BLOB";
- private static final String LONGBLOB = "LONGBLOB";
- private static final String JSON = "JSON";
- private static final String ENUM = "ENUM";
- private static final String SET = "SET";
-
- public static String toDorisType(String type, Integer length, Integer
scale) {
- switch (type.toUpperCase()) {
- case BIT:
- case BOOLEAN:
- case BOOL:
- return DorisType.BOOLEAN;
- case TINYINT:
- return DorisType.TINYINT;
- case TINYINT_UNSIGNED:
- case TINYINT_UNSIGNED_ZEROFILL:
- case SMALLINT:
- return DorisType.SMALLINT;
- case SMALLINT_UNSIGNED:
- case SMALLINT_UNSIGNED_ZEROFILL:
- case INT:
- case MEDIUMINT:
- case YEAR:
- return DorisType.INT;
- case INT_UNSIGNED:
- case INT_UNSIGNED_ZEROFILL:
- case MEDIUMINT_UNSIGNED:
- case MEDIUMINT_UNSIGNED_ZEROFILL:
- case BIGINT:
- return DorisType.BIGINT;
- case BIGINT_UNSIGNED:
- case BIGINT_UNSIGNED_ZEROFILL:
- return DorisType.LARGEINT;
- case FLOAT:
- case FLOAT_UNSIGNED:
- case FLOAT_UNSIGNED_ZEROFILL:
- return DorisType.FLOAT;
- case REAL:
- case REAL_UNSIGNED:
- case REAL_UNSIGNED_ZEROFILL:
- case DOUBLE:
- case DOUBLE_UNSIGNED:
- case DOUBLE_UNSIGNED_ZEROFILL:
- case DOUBLE_PRECISION:
- case DOUBLE_PRECISION_UNSIGNED:
- case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
- return DorisType.DOUBLE;
- case NUMERIC:
- case NUMERIC_UNSIGNED:
- case NUMERIC_UNSIGNED_ZEROFILL:
- case FIXED:
- case FIXED_UNSIGNED:
- case FIXED_UNSIGNED_ZEROFILL:
- case DECIMAL:
- case DECIMAL_UNSIGNED:
- case DECIMAL_UNSIGNED_ZEROFILL:
- return length != null && length <= 38
- ? String.format(
- "%s(%s,%s)",
- DorisType.DECIMAL_V3,
- length,
- scale != null && scale >= 0 ? scale : 0)
- : DorisType.STRING;
- case DATE:
- return DorisType.DATE_V2;
- case DATETIME:
- case TIMESTAMP:
- // default precision is 0
- // see
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
- if (length == null
- || length <= 0
- || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
- return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
- } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
- // Timestamp with a fraction of seconds.
- // For example, 2024-01-01 01:01:01.1
- // The decimal point will occupy 1 character.
- // Thus,the length of the timestamp is 21.
- return String.format(
- "%s(%s)",
- DorisType.DATETIME_V2,
- Math.min(
- length -
ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
- MAX_SUPPORTED_DATE_TIME_PRECISION));
- } else if (length <= TIMESTAMP_TYPE_MAX_PRECISION) {
- // For Debezium JSON data, the timestamp/datetime length
ranges from 0 to 9.
- return String.format(
- "%s(%s)",
- DorisType.DATETIME_V2,
- Math.min(length,
MAX_SUPPORTED_DATE_TIME_PRECISION));
- } else {
- throw new UnsupportedOperationException(
- "Unsupported length: "
- + length
- + " for MySQL TIMESTAMP/DATETIME types");
- }
- case CHAR:
- case VARCHAR:
- Preconditions.checkNotNull(length);
- return length * 3 > 65533
- ? DorisType.STRING
- : String.format("%s(%s)", DorisType.VARCHAR, length *
3);
- case TINYTEXT:
- case TEXT:
- case MEDIUMTEXT:
- case LONGTEXT:
- case ENUM:
- case TIME:
- case TINYBLOB:
- case BLOB:
- case MEDIUMBLOB:
- case LONGBLOB:
- case BINARY:
- case VARBINARY:
- case SET:
- return DorisType.STRING;
- case JSON:
- return DorisType.JSONB;
- default:
- throw new UnsupportedOperationException("Unsupported MySQL
Type: " + type);
- }
- }
-}
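The deleted MysqlType mapping derived DATETIME precision from the JDBC column width rather than from an explicit precision: a zero-precision timestamp is reported as 19 characters, and a fractional one as 19 plus one character for the decimal point plus the fraction digits, capped at the 6 digits Doris supports. A minimal sketch of just that arithmetic (the removed code also accepted a bare 0-9 precision coming from Debezium JSON, which this sketch omits):

    public class TimestampWidthSketch {

        private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19; // "2024-01-01 01:01:01"
        private static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;     // Doris DATETIME limit

        // Width-to-precision logic as in the removed MysqlType#toDorisType.
        static int precisionFromWidth(Integer width) {
            if (width == null || width <= ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
                return 0;
            }
            // One character is spent on the decimal separator,
            // e.g. width 21 -> "....01.1" -> precision 1.
            return Math.min(
                    width - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
                    MAX_SUPPORTED_DATE_TIME_PRECISION);
        }

        public static void main(String[] args) {
            System.out.println(precisionFromWidth(19)); // 0
            System.out.println(precisionFromWidth(21)); // 1
            System.out.println(precisionFromWidth(26)); // 6
            System.out.println(precisionFromWidth(29)); // 6 (capped)
        }
    }
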
diff --git
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 29810e4..1022344 100644
---
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -23,7 +23,6 @@ import com.codahale.metrics.MetricRegistry;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
import org.apache.doris.kafka.connector.DorisSinkTask;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.connection.ConnectionProvider;
@@ -35,7 +34,6 @@ import org.apache.doris.kafka.connector.writer.CopyIntoWriter;
import org.apache.doris.kafka.connector.writer.DorisWriter;
import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
import org.apache.doris.kafka.connector.writer.load.LoadModel;
-import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -88,23 +86,15 @@ public class DorisDefaultSinkService implements
DorisSinkService {
if (writer.containsKey(nameIndex)) {
LOG.info("already start task");
} else {
- DorisWriter dorisWriter;
String topic = topicPartition.topic();
int partition = topicPartition.partition();
- String schemaChangeTopic = dorisOptions.getSchemaTopic();
- if (Objects.nonNull(schemaChangeTopic) &&
schemaChangeTopic.equals(topic)) {
- dorisWriter =
- new DebeziumSchemaChange(
- topic, partition, dorisOptions, conn,
connectMonitor);
- } else {
- LoadModel loadModel = dorisOptions.getLoadModel();
- dorisWriter =
- LoadModel.COPY_INTO.equals(loadModel)
- ? new CopyIntoWriter(
- topic, partition, dorisOptions, conn,
connectMonitor)
- : new StreamLoadWriter(
- topic, partition, dorisOptions, conn,
connectMonitor);
- }
+ LoadModel loadModel = dorisOptions.getLoadModel();
+ DorisWriter dorisWriter =
+ LoadModel.COPY_INTO.equals(loadModel)
+ ? new CopyIntoWriter(
+ topic, partition, dorisOptions, conn,
connectMonitor)
+ : new StreamLoadWriter(
+ topic, partition, dorisOptions, conn,
connectMonitor);
writer.put(nameIndex, dorisWriter);
metricsJmxReporter.start();
}
@@ -129,7 +119,7 @@ public class DorisDefaultSinkService implements
DorisSinkService {
// check all sink writer to see if they need to be flushed
for (DorisWriter writer : writer.values()) {
// Time based flushing
- if (!(writer instanceof DebeziumSchemaChange) &&
writer.shouldFlush()) {
+ if (writer.shouldFlush()) {
writer.flushBuffer();
}
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 84d3f90..2356f7d 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -19,13 +19,9 @@
package org.apache.doris.kafka.connector.utils;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.exception.ArgumentsException;
import org.apache.doris.kafka.connector.exception.DorisException;
@@ -77,31 +73,6 @@ public class ConfigCheckUtils {
configIsValid = false;
}
- String schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC);
- if (StringUtils.isNotEmpty(schemaTopic)) {
- schemaTopic = schemaTopic.trim();
- if (!topics.isEmpty()) {
- List<String> topicList =
-
Arrays.stream(topics.split(",")).collect(Collectors.toList());
- if (!topicList.contains(schemaTopic)) {
- LOG.error(
- "schema.topic is not included in topics list,
please add! "
- + " schema.topic={}, topics={}",
- schemaTopic,
- topics);
- configIsValid = false;
- }
- }
- if (!topicsRegex.isEmpty() && !topicsRegex.equals(schemaTopic)) {
- LOG.error(
- "topics.regex must equals schema.topic. please check
again!"
- + " topics.regex={}, schema.topic={}",
- topicsRegex,
- schemaTopic);
- configIsValid = false;
- }
- }
-
if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)
&&
parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP))
== null) {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
deleted file mode 100644
index 1eeab0e..0000000
---
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.doris.kafka.connector.writer.schema;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.debezium.data.Envelope;
-import io.debezium.util.Strings;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.doris.kafka.connector.cfg.DorisOptions;
-import org.apache.doris.kafka.connector.connection.ConnectionProvider;
-import org.apache.doris.kafka.connector.converter.RecordDescriptor;
-import org.apache.doris.kafka.connector.exception.SchemaChangeException;
-import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
-import org.apache.doris.kafka.connector.model.ColumnDescriptor;
-import org.apache.doris.kafka.connector.model.TableDescriptor;
-import org.apache.doris.kafka.connector.model.doris.Schema;
-import org.apache.doris.kafka.connector.service.DorisSystemService;
-import org.apache.doris.kafka.connector.service.RestService;
-import org.apache.doris.kafka.connector.writer.DorisWriter;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DebeziumSchemaChange extends DorisWriter {
- private static final Logger LOG =
LoggerFactory.getLogger(DebeziumSchemaChange.class);
- public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
- public static final String TABLE_CHANGES = "tableChanges";
- public static final String TABLE_CHANGES_TYPE = "type";
- private final Map<String, String> topic2TableMap;
- private SchemaChangeManager schemaChangeManager;
- private DorisSystemService dorisSystemService;
- private Set<String> sinkTableSet;
- private List<String> ddlSqlList;
-
- public DebeziumSchemaChange(
- String topic,
- int partition,
- DorisOptions dorisOptions,
- ConnectionProvider connectionProvider,
- DorisConnectMonitor connectMonitor) {
- super(topic, partition, dorisOptions, connectionProvider,
connectMonitor);
- this.schemaChange = true;
- this.sinkTableSet = new HashSet<>();
- this.dorisSystemService = new DorisSystemService(dorisOptions);
- this.topic2TableMap = dorisOptions.getTopicMap();
- this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
- init();
- }
-
- @Override
- public void fetchOffset() {
- // do nothing
- }
-
- private void init() {
- Set<Map.Entry<String, String>> entrySet = topic2TableMap.entrySet();
- for (Map.Entry<String, String> entry : entrySet) {
- sinkTableSet.add(entry.getValue());
- }
- }
-
- @Override
- public void insert(SinkRecord record) {
- if (!validate(record)) {
- processedOffset.set(record.kafkaOffset());
- return;
- }
- schemaChange(record);
- }
-
- private boolean validate(final SinkRecord record) {
- if (!isSchemaChange(record)) {
- LOG.warn(
- "Current topic={}, the message does not contain schema
change change information, please check schema.topic",
- dorisOptions.getSchemaTopic());
- throw new SchemaChangeException(
- "The message does not contain schema change change
information, please check schema.topic");
- }
-
- tableName = resolveTableName(record);
- if (tableName == null) {
- LOG.warn(
- "Ignored to write record from topic '{}' partition '{}'
offset '{}'. No resolvable table name",
- record.topic(),
- record.kafkaPartition(),
- record.kafkaOffset());
- return false;
- }
-
- if (!sinkTableSet.contains(tableName)) {
- LOG.warn(
- "The "
- + tableName
- + " is not defined and requires synchronized data.
If you need to synchronize the table data, please configure it in
'doris.topic2table.map'");
- return false;
- }
-
- Struct recordStruct = (Struct) (record.value());
- if (isTruncate(recordStruct)) {
- LOG.warn("Truncate {} table is not supported", tableName);
- return false;
- }
-
- List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES);
- Struct tableChange = (Struct) tableChanges.get(0);
- if ("DROP".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))
- ||
"CREATE".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))) {
- LOG.warn(
- "CREATE and DROP {} tables are currently not supported.
Please create or drop them manually.",
- tableName);
- return false;
- }
- return true;
- }
-
- @Override
- public void commit(int partition) {
- // do nothing
- }
-
- private void schemaChange(final SinkRecord record) {
- Struct recordStruct = (Struct) (record.value());
- List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES);
- Struct tableChange = (Struct) tableChanges.get(0);
- RecordDescriptor recordDescriptor =
- RecordDescriptor.builder()
- .withSinkRecord(record)
- .withTableChange(tableChange)
- .build();
- tableChange(tableName, recordDescriptor);
- }
-
- private boolean isTruncate(final Struct record) {
- // Generally the truncate corresponding tableChanges is empty
- return record.getArray(TABLE_CHANGES).isEmpty();
- }
-
- private static boolean isSchemaChange(SinkRecord record) {
- return record.valueSchema() != null
- && !Strings.isNullOrEmpty(record.valueSchema().name())
- && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
- }
-
- private String resolveTableName(SinkRecord record) {
- if (isTombstone(record)) {
- LOG.warn(
- "Ignore this record because it seems to be a tombstone
that doesn't have source field, then cannot resolve table name in topic '{}',
partition '{}', offset '{}'",
- record.topic(),
- record.kafkaPartition(),
- record.kafkaOffset());
- return null;
- }
- Struct source = ((Struct)
record.value()).getStruct(Envelope.FieldName.SOURCE);
- return source.getString("table");
- }
-
- private void alterTableIfNeeded(String tableName, RecordDescriptor record)
{
- LOG.debug("Attempting to alter table '{}'.", tableName);
- if (!hasTable(tableName)) {
- LOG.error("Table '{}' does not exist and cannot be altered.",
tableName);
- throw new SchemaChangeException("Could not find table: " +
tableName);
- }
- final TableDescriptor dorisTableDescriptor =
obtainTableSchema(tableName);
- SchemaChangeHelper.compareSchema(dorisTableDescriptor,
record.getFields());
- ddlSqlList =
SchemaChangeHelper.generateDDLSql(dorisOptions.getDatabase(), tableName);
- doSchemaChange(dorisOptions.getDatabase(), tableName);
- }
-
- /** Obtain table schema from doris. */
- private TableDescriptor obtainTableSchema(String tableName) {
- Schema schema = RestService.getSchema(dorisOptions, dbName, tableName,
LOG);
- List<ColumnDescriptor> columnDescriptors = new ArrayList<>();
- schema.getProperties()
- .forEach(
- column -> {
- ColumnDescriptor columnDescriptor =
- ColumnDescriptor.builder()
- .columnName(column.getName())
- .typeName(column.getType())
- .comment(column.getComment())
- .build();
- columnDescriptors.add(columnDescriptor);
- });
- return TableDescriptor.builder()
- .tableName(tableName)
- .type(schema.getKeysType())
- .columns(columnDescriptors)
- .build();
- }
-
- private boolean hasTable(String tableName) {
- return dorisSystemService.tableExists(dbName, tableName);
- }
-
- private void tableChange(String tableName, RecordDescriptor
recordDescriptor) {
- if (!hasTable(tableName)) {
- // TODO Table does not exist, automatically created it.
- LOG.error("{} Table does not exist, please create manually.",
tableName);
- } else {
- // Table exists, lets attempt to alter it if necessary.
- alterTableIfNeeded(tableName, recordDescriptor);
- }
- processedOffset.set(recordDescriptor.getOffset());
- }
-
- private boolean doSchemaChange(String database, String tableName) {
- boolean status = false;
- if (ddlSqlList.isEmpty()) {
- LOG.info("Schema change ddl is empty, not need do schema change.");
- return false;
- }
- try {
- List<SchemaChangeHelper.DDLSchema> ddlSchemas =
SchemaChangeHelper.getDdlSchemas();
- for (int i = 0; i < ddlSqlList.size(); i++) {
- SchemaChangeHelper.DDLSchema ddlSchema = ddlSchemas.get(i);
- String ddlSql = ddlSqlList.get(i);
- boolean doSchemaChange = checkSchemaChange(database,
tableName, ddlSchema);
- status =
- doSchemaChange
- && schemaChangeManager.execute(ddlSql,
dorisOptions.getDatabase());
- LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
- }
- } catch (Exception e) {
- LOG.warn("schema change error :", e);
- }
- return status;
- }
-
- private boolean checkSchemaChange(
- String database, String table, SchemaChangeHelper.DDLSchema
ddlSchema)
- throws IllegalArgumentException, IOException {
- Map<String, Object> param =
- SchemaChangeManager.buildRequestParam(
- ddlSchema.isDropColumn(), ddlSchema.getColumnName());
- return schemaChangeManager.checkSchemaChange(database, table, param);
- }
-
- public long getOffset() {
- committedOffset.set(processedOffset.get());
- return committedOffset.get() + 1;
- }
-
- private boolean isTombstone(SinkRecord record) {
- return record.value() == null;
- }
-
- @VisibleForTesting
- public void setSinkTableSet(Set<String> sinkTableSet) {
- this.sinkTableSet = sinkTableSet;
- }
-
- @VisibleForTesting
- public void setDorisSystemService(DorisSystemService dorisSystemService) {
- this.dorisSystemService = dorisSystemService;
- }
-
- @VisibleForTesting
- public List<String> getDdlSqlList() {
- return ddlSqlList;
- }
-
- @VisibleForTesting
- public void setSchemaChangeManager(SchemaChangeManager
schemaChangeManager) {
- this.schemaChangeManager = schemaChangeManager;
- }
-}
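The removed DebeziumSchemaChange writer only acted on records from the dedicated schema topic: the value schema name had to contain "SchemaChangeValue", tombstones were skipped, and empty or CREATE/DROP tableChanges were ignored. After this commit the decision is made per data record in the converter instead, but the old detection step is worth keeping in mind when debugging existing pipelines; a minimal sketch, assuming Debezium-shaped SinkRecords:

    import org.apache.kafka.connect.sink.SinkRecord;

    public class SchemaChangeDetectionSketch {

        private static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";

        // Mirrors the isSchemaChange/isTombstone checks of the removed DebeziumSchemaChange.
        static boolean isSchemaChangeRecord(SinkRecord record) {
            return record.value() != null                       // not a tombstone
                    && record.valueSchema() != null
                    && record.valueSchema().name() != null
                    && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
        }
    }
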
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
deleted file mode 100644
index abf424c..0000000
---
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.doris.kafka.connector.writer.schema;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.doris.kafka.connector.converter.RecordDescriptor;
-import org.apache.doris.kafka.connector.model.ColumnDescriptor;
-import org.apache.doris.kafka.connector.model.TableDescriptor;
-
-public class SchemaChangeHelper {
- private static final List<ColumnDescriptor> addColumnDescriptors =
Lists.newArrayList();
- // Used to determine whether the column in the doris table can undergo
schema change
- private static final List<DDLSchema> ddlSchemas = Lists.newArrayList();
- private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
-
- // TODO support drop column
- // Dropping a column is a dangerous behavior and may result in an
accidental deletion.
- // There are some problems in the current implementation: each alter
column operation will read
- // the table structure
- // in doris and compare the schema with the topic message.
- // When there are more columns in the doris table than in the upstream
table,
- // these redundant columns in doris will be dropped, regardless of these
redundant columns, is
- // what you need.
- // Therefore, the operation of dropping a column behavior currently
requires the user to do it
- // himself.
- private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
-
- /**
- * Compare kafka upstream table structure with doris table structure. If
kafka field does not
- * contain the structure of dorisTable, then need to add this field.
- *
- * @param dorisTable read from the table schema of doris.
- * @param fields table structure from kafka upstream data source.
- */
- public static void compareSchema(
- TableDescriptor dorisTable, Map<String,
RecordDescriptor.FieldDescriptor> fields) {
- // Determine whether fields need to be added to doris table
- addColumnDescriptors.clear();
- Collection<ColumnDescriptor> dorisTableColumns =
dorisTable.getColumns();
- Set<String> dorisTableColumnNames =
- dorisTableColumns.stream()
- .map(ColumnDescriptor::getColumnName)
- .collect(Collectors.toSet());
- Set<Map.Entry<String, RecordDescriptor.FieldDescriptor>> fieldsEntries
= fields.entrySet();
- for (Map.Entry<String, RecordDescriptor.FieldDescriptor> fieldEntry :
fieldsEntries) {
- String fieldName = fieldEntry.getKey();
- if (!dorisTableColumnNames.contains(fieldName)) {
- RecordDescriptor.FieldDescriptor fieldDescriptor =
fieldEntry.getValue();
- ColumnDescriptor columnDescriptor =
- new ColumnDescriptor.Builder()
- .columnName(fieldDescriptor.getName())
- .typeName(fieldDescriptor.getSchemaTypeName())
-
.defaultValue(fieldDescriptor.getDefaultValue())
- .comment(fieldDescriptor.getComment())
- .build();
- addColumnDescriptors.add(columnDescriptor);
- }
- }
- }
-
- public static List<String> generateDDLSql(String database, String table) {
- ddlSchemas.clear();
- List<String> ddlList = Lists.newArrayList();
- for (ColumnDescriptor columnDescriptor : addColumnDescriptors) {
- ddlList.add(buildAddColumnDDL(database, table, columnDescriptor));
- ddlSchemas.add(new DDLSchema(columnDescriptor.getColumnName(),
false));
- }
- return ddlList;
- }
-
- public static List<DDLSchema> getDdlSchemas() {
- return ddlSchemas;
- }
-
- private static String buildDropColumnDDL(String database, String
tableName, String columName) {
- return String.format(
- DROP_DDL,
- identifier(database) + "." + identifier(tableName),
- identifier(columName));
- }
-
- private static String buildAddColumnDDL(
- String database, String tableName, ColumnDescriptor
columnDescriptor) {
- String columnName = columnDescriptor.getColumnName();
- String columnType = columnDescriptor.getTypeName();
- String defaultValue = columnDescriptor.getDefaultValue();
- String comment = columnDescriptor.getComment();
- String addDDL =
- String.format(
- ADD_DDL,
- identifier(database) + "." + identifier(tableName),
- identifier(columnName),
- columnType);
- if (defaultValue != null) {
- addDDL = addDDL + " DEFAULT " + quoteDefaultValue(defaultValue);
- }
- if (StringUtils.isNotEmpty(comment)) {
- addDDL = addDDL + " COMMENT '" + quoteComment(comment) + "'";
- }
- return addDDL;
- }
-
- private static String identifier(String name) {
- return "`" + name + "`";
- }
-
- private static String quoteDefaultValue(String defaultValue) {
- // DEFAULT current_timestamp not need quote
- if (defaultValue.equalsIgnoreCase("current_timestamp")) {
- return defaultValue;
- }
- return "'" + defaultValue + "'";
- }
-
- private static String quoteComment(String comment) {
- return comment.replaceAll("'", "\\\\'");
- }
-
- public static class DDLSchema {
- private final String columnName;
- private final boolean isDropColumn;
-
- public DDLSchema(String columnName, boolean isDropColumn) {
- this.columnName = columnName;
- this.isDropColumn = isDropColumn;
- }
-
- public String getColumnName() {
- return columnName;
- }
-
- public boolean isDropColumn() {
- return isDropColumn;
- }
- }
-}
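The removed SchemaChangeHelper generated ADD COLUMN statements from the diff between the Doris table schema and the fields in the Kafka record; a similar ADD COLUMN path now appears to live in converter/schema/SchemaChangeManager (its addColumnDDL is mocked in the updated tests). A minimal standalone sketch of the DDL assembly, without the DEFAULT and COMMENT clauses the helper also appended:

    public class AddColumnDdlSketch {

        // Same template the removed SchemaChangeHelper used for light schema change.
        private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";

        static String identifier(String name) {
            return "`" + name + "`";
        }

        static String buildAddColumnDDL(String database, String table, String column, String type) {
            return String.format(
                    ADD_DDL,
                    identifier(database) + "." + identifier(table),
                    identifier(column),
                    type);
        }

        public static void main(String[] args) {
            // Matches the statement shape asserted by the removed TestDebeziumSchemaChange.
            System.out.println(buildAddColumnDDL("test_db", "normal_time", "time_test", "STRING"));
            // -> ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test` STRING
        }
    }
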
diff --git
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 3cff4fa..f80a737 100644
---
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -19,6 +19,13 @@
package org.apache.doris.kafka.connector.converter;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -30,20 +37,31 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.schema.SchemaChangeManager;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.model.doris.Schema;
+import org.apache.doris.kafka.connector.service.DorisSystemService;
+import org.apache.doris.kafka.connector.service.RestService;
import org.apache.doris.kafka.connector.writer.TestRecordBuffer;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
public class TestRecordService {
+ private final ObjectMapper objectMapper = new ObjectMapper();
private RecordService recordService;
private Properties props = new Properties();
private JsonConverter jsonConverter = new JsonConverter();
+ private MockedStatic<RestService> mockRestService;
@Before
public void init() throws IOException {
@@ -54,9 +72,21 @@ public class TestRecordService {
props.load(stream);
props.put("task_id", "1");
props.put("converter.mode", "debezium_ingestion");
+ props.put("schema.evolution", "basic");
+ props.put(
+ "doris.topic2table.map",
+
"avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal");
recordService = new RecordService(new DorisOptions((Map) props));
HashMap<String, String> config = new HashMap<>();
jsonConverter.configure(config, false);
+ mockRestService = mockStatic(RestService.class);
+
+ SchemaChangeManager mockSchemaChangeManager =
Mockito.mock(SchemaChangeManager.class);
+ DorisSystemService mockDorisSystemService =
mock(DorisSystemService.class);
+ doNothing().when(mockSchemaChangeManager).addColumnDDL(anyString(),
any());
+ when(mockDorisSystemService.tableExists(anyString(),
anyString())).thenReturn(true);
+ recordService.setDorisSystemService(mockDorisSystemService);
+ recordService.setSchemaChangeManager(mockSchemaChangeManager);
}
/**
@@ -70,7 +100,19 @@ public class TestRecordService {
*/
@Test
public void processMysqlDebeziumStructRecord() throws IOException {
- String topic = "normal.wdl_test.example_table";
+ String schemaStr =
+
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"BIGINT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"age\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"email\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"birth_date\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV
[...]
+ Schema schema = null;
+ try {
+ schema = objectMapper.readValue(schemaStr, Schema.class);
+ } catch (JsonProcessingException e) {
+ throw new DorisException(e);
+ }
+ mockRestService
+ .when(() -> RestService.getSchema(any(), any(), any(), any()))
+ .thenReturn(schema);
+
+ String topic = "avro_schema.wdl_test.example_table";
// no delete value
String noDeleteValue =
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
[...]
@@ -86,13 +128,48 @@ public class TestRecordService {
buildProcessStructRecord(topic, deleteValue, expectedDeleteValue);
}
+ @Test
+ public void processMysqlDebeziumStructRecordAlter() throws IOException {
+ String schemaStr =
+
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"BIGINT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"age\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"email\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"birth_date\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV
[...]
+ Schema schema = null;
+ try {
+ schema = objectMapper.readValue(schemaStr, Schema.class);
+ } catch (JsonProcessingException e) {
+ throw new DorisException(e);
+ }
+ mockRestService
+ .when(() -> RestService.getSchema(any(), any(), any(), any()))
+ .thenReturn(schema);
+
+ String topic = "avro_schema.wdl_test.example_table";
+ String topicMsg =
+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
[...]
+ SchemaAndValue schemaValue =
+ jsonConverter.toConnectData(topic,
topicMsg.getBytes(StandardCharsets.UTF_8));
+ SinkRecord noDeleteSinkRecord =
+ TestRecordBuffer.newSinkRecord(topic, schemaValue.value(), 8,
schemaValue.schema());
+ recordService.processStructRecord(noDeleteSinkRecord);
+
+ // Compare the results of schema change
+ Map<String, String> resultFields = new HashMap<>();
+ resultFields.put("time_column", "DATETIME(0)");
+ resultFields.put("blob_column", "STRING");
+ Set<RecordDescriptor.FieldDescriptor> missingFields =
recordService.getMissingFields();
+ for (RecordDescriptor.FieldDescriptor missingField : missingFields) {
+
Assert.assertTrue(resultFields.containsKey(missingField.getName()));
+ Assert.assertEquals(
+ resultFields.get(missingField.getName()),
missingField.getTypeName());
+ }
+ }
+
private void buildProcessStructRecord(String topic, String sourceValue,
String target)
throws IOException {
SchemaAndValue noDeleteSchemaValue =
jsonConverter.toConnectData(topic,
sourceValue.getBytes(StandardCharsets.UTF_8));
SinkRecord noDeleteSinkRecord =
TestRecordBuffer.newSinkRecord(
- noDeleteSchemaValue.value(), 8,
noDeleteSchemaValue.schema());
+ topic, noDeleteSchemaValue.value(), 8,
noDeleteSchemaValue.schema());
String processResult =
recordService.processStructRecord(noDeleteSinkRecord);
Assert.assertEquals(target, processResult);
}
@@ -113,6 +190,18 @@ public class TestRecordService {
@Test
public void processStructRecordWithDebeziumSchema() throws IOException {
+ String schemaStr =
+
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"}],\"status\":200}";
+ Schema schema = null;
+ try {
+ schema = objectMapper.readValue(schemaStr, Schema.class);
+ } catch (JsonProcessingException e) {
+ throw new DorisException(e);
+ }
+ mockRestService
+ .when(() -> RestService.getSchema(any(), any(), any(), any()))
+ .thenReturn(schema);
+
String topic = "normal.wdl_test.test_sink_normal";
// no delete value
@@ -171,4 +260,10 @@ public class TestRecordService {
String s = recordService.processStringRecord(record);
Assert.assertEquals("doris", s);
}
+
+ @After
+ public void close() {
+ mockRestService.close();
+ }
}
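The updated TestRecordService stubs RestService.getSchema with Mockito's static mocking instead of exercising a separate schema-change writer. A minimal sketch of that pattern, assuming mockito-inline (or another Mockito version with MockedStatic support) on the test classpath; FakeService is a stand-in class, not part of the connector:

    import static org.mockito.Mockito.mockStatic;

    import org.junit.After;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;
    import org.mockito.MockedStatic;

    public class StaticMockSketch {

        static class FakeService {
            static String fetch(String key) {
                return "real:" + key;
            }
        }

        private MockedStatic<FakeService> mocked;

        @Before
        public void setUp() {
            // Static calls on FakeService are intercepted until close() is called.
            mocked = mockStatic(FakeService.class);
            mocked.when(() -> FakeService.fetch("schema")).thenReturn("stubbed");
        }

        @Test
        public void returnsStub() {
            Assert.assertEquals("stubbed", FakeService.fetch("schema"));
        }

        @After
        public void tearDown() {
            // Must be closed, otherwise later tests keep seeing the stub.
            mocked.close();
        }
    }
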
diff --git
a/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java
b/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java
deleted file mode 100644
index c95af44..0000000
---
a/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.doris.kafka.connector.writer;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.when;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.doris.kafka.connector.cfg.DorisOptions;
-import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
-import org.apache.doris.kafka.connector.exception.DorisException;
-import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
-import org.apache.doris.kafka.connector.model.doris.Schema;
-import org.apache.doris.kafka.connector.service.DorisSystemService;
-import org.apache.doris.kafka.connector.service.RestService;
-import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange;
-import org.apache.doris.kafka.connector.writer.schema.SchemaChangeManager;
-import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
-
-public class TestDebeziumSchemaChange {
- private final ObjectMapper objectMapper = new ObjectMapper();
- private final JsonConverter jsonConverter = new JsonConverter();
- private final HashSet<String> sinkTableSet = new HashSet<>();
- private DebeziumSchemaChange debeziumSchemaChange;
- private DorisOptions dorisOptions;
- private String topic;
- private MockedStatic<RestService> mockRestService;
-
- @Before
- public void init() throws IOException {
- InputStream stream =
- this.getClass()
- .getClassLoader()
-
.getResourceAsStream("doris-connector-sink.properties");
- Properties props = new Properties();
- props.load(stream);
- props.put("task_id", "1");
- props.put("name", "sink-connector-test");
- topic = "normal";
- dorisOptions = new DorisOptions((Map) props);
- DorisConnectMonitor dorisConnectMonitor =
mock(DorisConnectMonitor.class);
- DorisSystemService mockDorisSystemService =
mock(DorisSystemService.class);
- jsonConverter.configure(new HashMap<>(), false);
- mockRestService = mockStatic(RestService.class);
- SchemaChangeManager mockSchemaChangeManager =
Mockito.mock(SchemaChangeManager.class);
- Mockito.when(
- mockSchemaChangeManager.checkSchemaChange(
- Mockito.any(), Mockito.any(), Mockito.any()))
- .thenReturn(true);
- debeziumSchemaChange =
- new DebeziumSchemaChange(
- topic,
- 0,
- dorisOptions,
- new JdbcConnectionProvider(dorisOptions),
- dorisConnectMonitor);
- when(mockDorisSystemService.tableExists(anyString(),
anyString())).thenReturn(true);
-
- sinkTableSet.add("normal_time");
- debeziumSchemaChange.setSchemaChangeManager(mockSchemaChangeManager);
- debeziumSchemaChange.setSinkTableSet(sinkTableSet);
- debeziumSchemaChange.setDorisSystemService(mockDorisSystemService);
- }
-
- @Test
- public void testAlterSchemaChange() {
- String alterTopicMsg =
-
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},
[...]
- SchemaAndValue schemaAndValue =
- jsonConverter.toConnectData(topic,
alterTopicMsg.getBytes(StandardCharsets.UTF_8));
- SinkRecord sinkRecord =
- TestRecordBuffer.newSinkRecord(schemaAndValue.value(), 8,
schemaAndValue.schema());
- String normalTimeSchemaStr =
-
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"timestamp_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"datetime_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"date_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV2\"}],\"status\":200}";
- Schema normalTimeSchema = null;
- try {
- normalTimeSchema = objectMapper.readValue(normalTimeSchemaStr,
Schema.class);
- } catch (JsonProcessingException e) {
- throw new DorisException(e);
- }
- mockRestService
- .when(() -> RestService.getSchema(any(), any(), any(), any()))
- .thenReturn(normalTimeSchema);
-
- debeziumSchemaChange.insert(sinkRecord);
- List<String> ddlSqlList = debeziumSchemaChange.getDdlSqlList();
- Assert.assertEquals(
- ddlSqlList.get(0),
- "ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test`
STRING DEFAULT '12:00'");
- }
-
- @After
- public void close() {
- mockRestService.close();
- }
-}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java
b/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java
index 3dd2292..de2e654 100644
---
a/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java
+++
b/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java
@@ -52,16 +52,11 @@ public class TestRecordBuffer {
return record;
}
- public static SinkRecord newSinkRecord(Object value, long offset, Schema
valueSchema) {
+ public static SinkRecord newSinkRecord(
+ String topic, Object value, long offset, Schema valueSchema) {
SinkRecord record =
new SinkRecord(
- "topic",
- 0,
- Schema.OPTIONAL_STRING_SCHEMA,
- "key",
- valueSchema,
- value,
- offset);
+ topic, 0, Schema.OPTIONAL_STRING_SCHEMA, "key",
valueSchema, value, offset);
return record;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]