This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new e36b3ff [improve] Support writing data from one topic to multiple tables
e36b3ff is described below
commit e36b3ff7753ad215afb3aec75c08a06486c70fe8
Author: wangchuang <[email protected]>
AuthorDate: Mon Dec 30 11:20:35 2024 +0800
[improve] Support writing data from one topic to multiple tables
---
.../doris/kafka/connector/cfg/DorisOptions.java | 12 +++
.../connector/cfg/DorisSinkConnectorConfig.java | 11 +++
.../connector/service/DorisDefaultSinkService.java | 100 +++++++++++++++++----
.../kafka/connector/writer/CopyIntoWriter.java | 5 +-
.../doris/kafka/connector/writer/DorisWriter.java | 9 +-
.../kafka/connector/writer/StreamLoadWriter.java | 8 +-
.../connector/writer/load/DorisStreamLoad.java | 5 +-
.../e2e/sink/stringconverter/StringMsgE2ETest.java | 36 ++++++++
.../connector/service/TestDorisSinkService.java | 85 ++++++++++++++++++
.../kafka/connector/writer/TestCopyIntoWriter.java | 3 +
.../connector/writer/TestStreamLoadWriter.java | 3 +
.../e2e/string_converter/table_field_config.json | 24 +++++
.../e2e/string_converter/table_field_config1.sql | 12 +++
.../e2e/string_converter/table_field_config2.sql | 12 +++
14 files changed, 295 insertions(+), 30 deletions(-)
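In short: when the new 'record.tablename.field' option is set, the sink reads that field from each Map-valued record and writes the row to the table named by the field's value; otherwise it falls back to the 'doris.topic2table.map' entry, or to the topic name when no mapping exists. A minimal standalone sketch of that routing rule (class and method names here are illustrative, not the connector's API):

    import java.util.HashMap;
    import java.util.Map;

    public class TableRoutingSketch {

        // Illustrative stand-in for the table lookup added to DorisDefaultSinkService below.
        static String resolveTable(String topic, Object recordValue,
                                   Map<String, String> topic2Table, String tableNameField) {
            // Fallback: the topic2table mapping, or the topic itself when no mapping exists.
            String defaultTable = topic2Table.getOrDefault(topic, topic);
            if (tableNameField == null || tableNameField.isEmpty()) {
                return defaultTable;
            }
            // Only Map-valued records (e.g. JsonConverter with schemas disabled) are inspected.
            if (!(recordValue instanceof Map)) {
                return defaultTable;
            }
            Object table = ((Map<?, ?>) recordValue).get(tableNameField);
            return table == null ? defaultTable : table.toString();
        }

        public static void main(String[] args) {
            Map<String, String> topic2Table = new HashMap<>();
            topic2Table.put("table_field_config_test", "field_config_tab1");

            Map<String, Object> value = new HashMap<>();
            value.put("id", 1);
            value.put("table_name", "field_config_tab2");

            // Field-based routing: prints "field_config_tab2".
            System.out.println(resolveTable("table_field_config_test", value, topic2Table, "table_name"));
            // Option unset: prints the mapped default "field_config_tab1".
            System.out.println(resolveTable("table_field_config_test", value, topic2Table, null));
        }
    }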
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index c041ed8..9afd9cb 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -44,6 +44,7 @@ public class DorisOptions {
private final String password;
private final String database;
private final Map<String, String> topicMap;
+ private final String tableNameField;
private final int fileSize;
private final int recordNum;
private final long flushTime;
@@ -91,6 +92,7 @@ public class DorisOptions {
this.topicMap =
ConfigCheckUtils.parseTopicToTableMap(
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
+ this.tableNameField = config.get(DorisSinkConnectorConfig.RECORD_TABLE_NAME_FIELD);
if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) {
if (Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC))) {
@@ -190,9 +192,19 @@ public class DorisOptions {
}
public String getTopicMapTable(String topic) {
+ if (topicMap.get(topic) == null) {
+ LOG.warn(
+ "The config 'doris.topic2table.map' is not set, use the
topic [{}] as table",
+ topic);
+ return topic;
+ }
return topicMap.get(topic);
}
+ public String getTableNameField() {
+ return tableNameField;
+ }
+
public boolean enable2PC() {
return enable2PC;
}
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index e154db5..0dd7315 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -52,6 +52,7 @@ public class DorisSinkConnectorConfig {
public static final long BUFFER_SIZE_BYTES_DEFAULT = 5000000;
public static final long BUFFER_SIZE_BYTES_MIN = 1;
public static final String TOPICS_TABLES_MAP = "doris.topic2table.map";
+ public static final String RECORD_TABLE_NAME_FIELD = "record.tablename.field";
public static final String LABEL_PREFIX = "label.prefix";
// Time in seconds
@@ -242,6 +243,16 @@ public class DorisSinkConnectorConfig {
3,
ConfigDef.Width.NONE,
BUFFER_FLUSH_TIME_SEC)
+ .define(
+ RECORD_TABLE_NAME_FIELD,
+ Type.STRING,
+ null,
+ Importance.LOW,
+ "The field name of record, and use this field value as
the table name to be written",
+ CONNECTOR_CONFIG,
+ 4,
+ ConfigDef.Width.NONE,
+ RECORD_TABLE_NAME_FIELD)
.define(
JMX_OPT,
ConfigDef.Type.BOOLEAN,
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 9d3a2ff..7870b9d 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -20,9 +20,13 @@
package org.apache.doris.kafka.connector.service;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.apache.doris.kafka.connector.DorisSinkTask;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.connection.ConnectionProvider;
@@ -72,7 +76,7 @@ public class DorisDefaultSinkService implements DorisSinkService {
@Override
public void startTask(TopicPartition topicPartition) {
- startTask(null, topicPartition);
+ startTask(dorisOptions.getTopicMapTable(topicPartition.topic()), topicPartition);
}
/**
@@ -83,9 +87,10 @@ public class DorisDefaultSinkService implements DorisSinkService {
*/
@Override
public void startTask(final String tableName, final TopicPartition topicPartition) {
- // fetch topic partition
- String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
- if (writer.containsKey(nameIndex)) {
+ // check if the task is already started
+ String writerKey =
+ getWriterKey(topicPartition.topic(), topicPartition.partition(), tableName);
+ if (writer.containsKey(writerKey)) {
LOG.info("already start task");
} else {
String topic = topicPartition.topic();
@@ -94,10 +99,15 @@ public class DorisDefaultSinkService implements DorisSinkService {
DorisWriter dorisWriter =
LoadModel.COPY_INTO.equals(loadModel)
? new CopyIntoWriter(
- topic, partition, dorisOptions, conn, connectMonitor)
+ tableName, topic, partition, dorisOptions, conn, connectMonitor)
: new StreamLoadWriter(
- topic, partition, dorisOptions, conn, connectMonitor);
- writer.put(nameIndex, dorisWriter);
+ tableName,
+ topic,
+ partition,
+ dorisOptions,
+ conn,
+ connectMonitor);
+ writer.put(writerKey, dorisWriter);
metricsJmxReporter.start();
}
}
@@ -129,26 +139,33 @@ public class DorisDefaultSinkService implements DorisSinkService {
@Override
public void insert(SinkRecord record) {
- String nameIndex = getNameIndex(record.topic(), record.kafkaPartition());
+ String tableName = getSinkDorisTableName(record);
+ String writerKey = getWriterKey(record.topic(), record.kafkaPartition(), tableName);
// init a new topic partition
- if (!writer.containsKey(nameIndex)) {
- startTask(new TopicPartition(record.topic(), record.kafkaPartition()));
+ if (!writer.containsKey(writerKey)) {
+ startTask(tableName, new TopicPartition(record.topic(), record.kafkaPartition()));
}
- writer.get(nameIndex).insert(record);
+ writer.get(writerKey).insert(record);
}
@Override
public long getOffset(final TopicPartition topicPartition) {
- String name = getNameIndex(topicPartition.topic(), topicPartition.partition());
- if (writer.containsKey(name)) {
- return writer.get(name).getOffset();
- } else {
+ String tpName = getNameIndex(topicPartition.topic(), topicPartition.partition());
+ // get all writers for the topic partition
+ List<DorisWriter> writers =
+ writer.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(tpName))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ if (writers.isEmpty()) {
LOG.info(
"Topic: {} Partition: {} hasn't been initialized to get
offset",
topicPartition.topic(),
topicPartition.partition());
return 0;
}
+ // return the max offset of all writers
+ return writers.stream().map(DorisWriter::getOffset).reduce(Long::max).orElse(0L);
}
@Override
@@ -161,12 +178,61 @@ public class DorisDefaultSinkService implements DorisSinkService {
offsets.keySet()
.forEach(
tp -> {
- String name = getNameIndex(tp.topic(), tp.partition());
- writer.get(name).commit(tp.partition());
+ String tpName = getNameIndex(tp.topic(), tp.partition());
+ // commit all writers that match the topic and partition
+ for (Map.Entry<String, DorisWriter> entry : writer.entrySet()) {
+ if (entry.getKey().startsWith(tpName)) {
+ entry.getValue().commit(tp.partition());
+ }
+ }
});
}
+ /**
+ * Get the table name in doris for the given record.
+ *
+ * @param record sink record
+ * @return table name in doris
+ */
+ @VisibleForTesting
+ public String getSinkDorisTableName(SinkRecord record) {
+ String defaultTableName = dorisOptions.getTopicMapTable(record.topic());
+ String field = dorisOptions.getTableNameField();
+ // if the field is not set, use the table name in the config
+ if (StringUtils.isEmpty(field)) {
+ return defaultTableName;
+ }
+ if (!(record.value() instanceof Map)) {
+ LOG.warn(
+ "Only Map objects supported for The
'record.tablename.field' configuration, field={}, record type={}",
+ field,
+ record.value().getClass().getName());
+ return defaultTableName;
+ }
+ Map<String, Object> map = (Map<String, Object>) record.value();
+ // if the field is not found in the record, use the table name in the config
+ if (map.get(field) == null) {
+ return defaultTableName;
+ }
+ return map.get(field).toString();
+ }
+
private static String getNameIndex(String topic, int partition) {
return topic + "_" + partition;
}
+
+ /**
+ * Parse the writer unique key
+ *
+ * @param topic topic name
+ * @param partition partition number
+ * @param tableName table name
+ * @return writer key
+ */
+ private String getWriterKey(String topic, int partition, String tableName) {
+ if (dorisOptions.getTopicMapTable(topic).equals(tableName)) {
+ return topic + "_" + partition;
+ }
+ return topic + "_" + partition + "_" + tableName;
+ }
}
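Because one topic partition can now feed several tables, the service keys writers by (topic, partition, table): the default table keeps the legacy 'topic_partition' key, extra tables get 'topic_partition_tableName', and the offset reported for a partition is the maximum over all of its writers. A hedged sketch of that bookkeeping with simplified types (plain longs instead of DorisWriter instances):

    import java.util.HashMap;
    import java.util.Map;

    public class WriterKeySketch {

        // Mirrors getWriterKey above: the default table reuses the old key shape.
        static String writerKey(String topic, int partition, String table, String defaultTable) {
            if (defaultTable.equals(table)) {
                return topic + "_" + partition;
            }
            return topic + "_" + partition + "_" + table;
        }

        public static void main(String[] args) {
            // Values stand in for each writer's last flushed offset.
            Map<String, Long> writers = new HashMap<>();
            writers.put(writerKey("orders", 0, "tab_default", "tab_default"), 120L); // "orders_0"
            writers.put(writerKey("orders", 0, "tab_other", "tab_default"), 95L);    // "orders_0_tab_other"

            // getOffset-style lookup: max offset across every writer whose key starts with "orders_0".
            long offset = writers.entrySet().stream()
                    .filter(e -> e.getKey().startsWith("orders_0"))
                    .mapToLong(Map.Entry::getValue)
                    .max()
                    .orElse(0L);
            System.out.println(offset); // 120
        }
    }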
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
index 456e419..0abfefc 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
@@ -45,15 +45,16 @@ public class CopyIntoWriter extends DorisWriter {
private final String prefix;
public CopyIntoWriter(
+ String tableName,
String topic,
int partition,
DorisOptions dorisOptions,
ConnectionProvider connectionProvider,
DorisConnectMonitor connectMonitor) {
- super(topic, partition, dorisOptions, connectionProvider, connectMonitor);
+ super(tableName, topic, partition, dorisOptions, connectionProvider, connectMonitor);
this.taskId = dorisOptions.getTaskId();
this.prefix = FileNameUtils.filePrefix(dorisOptions.getName(), topic, partition);
- this.copyLoad = new CopyLoad(dbName, tableName, dorisOptions);
+ this.copyLoad = new CopyLoad(dbName, this.tableName, dorisOptions);
}
public void fetchOffset() {
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 2a6323d..9481a2f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -60,6 +60,7 @@ public abstract class DorisWriter {
protected final DorisConnectMonitor connectMonitor;
public DorisWriter(
+ String tableName,
String topic,
int partition,
DorisOptions dorisOptions,
@@ -67,11 +68,7 @@ public abstract class DorisWriter {
DorisConnectMonitor connectMonitor) {
this.topic = topic;
this.partition = partition;
- this.tableName = dorisOptions.getTopicMapTable(topic);
- if (StringUtils.isEmpty(tableName)) {
- // The mapping of topic and table is not defined
- this.tableName = this.topic;
- }
+ this.tableName = tableName;
if (StringUtils.isNotEmpty(dorisOptions.getDatabase())) {
this.dbName = dorisOptions.getDatabase();
} else if (tableName.contains(".")) {
@@ -83,7 +80,7 @@ public abstract class DorisWriter {
throw new ArgumentsException("Failed to get database and table names");
}
- this.tableIdentifier = dbName + "." + tableName;
+ this.tableIdentifier = dbName + "." + this.tableName;
this.fileNames = new ArrayList<>();
this.buffer = new RecordBuffer();
this.processedOffset = new AtomicLong(-1);
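DorisWriter now receives the already-resolved table name instead of deriving it from the topic; the database still comes from 'doris.database', or from a 'db.table'-qualified name when that option is empty. A rough sketch of that precedence, assuming the 'db.table' branch simply splits on the dot (that branch's body is outside this hunk):

    public class TableIdentifierSketch {

        // Illustrative version of the db/table resolution in DorisWriter's constructor.
        static String tableIdentifier(String configuredDatabase, String tableName) {
            String dbName;
            if (configuredDatabase != null && !configuredDatabase.isEmpty()) {
                dbName = configuredDatabase;              // doris.database wins when set
            } else if (tableName.contains(".")) {
                String[] parts = tableName.split("\\.");  // assumed "db.table" handling
                dbName = parts[0];
                tableName = parts[1];
            } else {
                throw new IllegalArgumentException("Failed to get database and table names");
            }
            return dbName + "." + tableName;
        }

        public static void main(String[] args) {
            System.out.println(tableIdentifier("string_msg", "field_config_tab2"));  // string_msg.field_config_tab2
            System.out.println(tableIdentifier("", "string_msg.field_config_tab2")); // string_msg.field_config_tab2
        }
    }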
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
index fd8355c..0144a7f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
@@ -56,18 +56,20 @@ public class StreamLoadWriter extends DorisWriter {
private DorisStreamLoad dorisStreamLoad;
public StreamLoadWriter(
+ String tableName,
String topic,
int partition,
DorisOptions dorisOptions,
ConnectionProvider connectionProvider,
DorisConnectMonitor connectMonitor) {
- super(topic, partition, dorisOptions, connectionProvider, connectMonitor);
+ super(tableName, topic, partition, dorisOptions, connectionProvider, connectMonitor);
this.taskId = dorisOptions.getTaskId();
this.labelGenerator = new LabelGenerator(topic, partition, tableIdentifier);
BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions, LOG);
this.dorisCommitter = new DorisCommitter(dorisOptions, backendUtils);
- this.dorisStreamLoad = new DorisStreamLoad(backendUtils, dorisOptions, topic);
- checkDorisTableKey(tableName);
+ this.dorisStreamLoad =
+ new DorisStreamLoad(backendUtils, dorisOptions, topic, this.tableName);
+ checkDorisTableKey(this.tableName);
}
/** The uniq model has 2pc close by default unless 2pc is forced open. */
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
index 6d3f770..9ab4b8d 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
@@ -55,9 +55,10 @@ public class DorisStreamLoad extends DataLoad {
private Queue<KafkaRespContent> respContents = new LinkedList<>();
private final boolean enableGroupCommit;
- public DorisStreamLoad(BackendUtils backendUtils, DorisOptions dorisOptions, String topic) {
+ public DorisStreamLoad(
+ BackendUtils backendUtils, DorisOptions dorisOptions, String topic, String table) {
this.database = dorisOptions.getDatabase();
- this.table = dorisOptions.getTopicMapTable(topic);
+ this.table = table;
this.user = dorisOptions.getUser();
this.password = dorisOptions.getPassword();
this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table);
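With the table passed in explicitly, each writer builds its stream-load endpoint from its own (database, table) pair, so two tables behind one topic get two distinct load URLs. Sketch only; the URL pattern below is Doris's documented stream-load path and is assumed to match LOAD_URL_PATTERN:

    public class StreamLoadUrlSketch {

        // Assumed shape of LOAD_URL_PATTERN (the real constant lives in the connector's DataLoad).
        private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";

        static String loadUrl(String hostPort, String database, String table) {
            return String.format(LOAD_URL_PATTERN, hostPort, database, table);
        }

        public static void main(String[] args) {
            // One topic, two destination tables -> two distinct stream-load URLs.
            System.out.println(loadUrl("127.0.0.1:8030", "string_msg", "field_config_tab1"));
            System.out.println(loadUrl("127.0.0.1:8030", "string_msg", "field_config_tab2"));
        }
    }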
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index f940cfe..294cb33 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -212,6 +212,42 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
checkResult(expected, query, 7);
}
+ @Test
+ public void testTableFieldConfig() throws Exception {
+ initialize("src/test/resources/e2e/string_converter/table_field_config.json");
+ String topic = "table_field_config_test";
+ String msg1 =
+ "{\"id\":1,\"col1\":\"col1\",\"col2\":\"col2\",\"table_name\":\"field_config_tab1\"}";
+ String msg2 =
+ "{\"id\":1,\"col1\":\"col1\",\"col2\":\"col2\",\"table_name\":\"field_config_tab2\"}";
+
+ produceMsg2Kafka(topic, msg1);
+ produceMsg2Kafka(topic, msg2);
+
+ String tableSql1 =
+ loadContent("src/test/resources/e2e/string_converter/table_field_config1.sql");
+ createTable(tableSql1);
+ String tableSql2 =
+ loadContent("src/test/resources/e2e/string_converter/table_field_config2.sql");
+ createTable(tableSql2);
+
+ Thread.sleep(2000);
+ kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+ List<String> expected = Collections.singletonList("1,col1,col2");
+ Thread.sleep(10000);
+ String query1 =
+ String.format(
+ "select id,col1,col2 from %s.%s order by id",
+ database, "field_config_tab1");
+ checkResult(expected, query1, 3);
+ String query2 =
+ String.format(
+ "select id,col1,col2 from %s.%s order by id",
+ database, "field_config_tab2");
+ checkResult(expected, query2, 3);
+ }
+
public void checkResult(List<String> expected, String query, int columnSize) throws Exception {
List<String> actual = new ArrayList<>();
diff --git a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
new file mode 100644
index 0000000..b947405
--- /dev/null
+++ b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDorisSinkService {
+
+ private DorisDefaultSinkService dorisDefaultSinkService;
+
+ @Before
+ public void init() throws IOException {
+ InputStream stream =
+ this.getClass()
+ .getClassLoader()
+ .getResourceAsStream("doris-connector-sink.properties");
+ Properties props = new Properties();
+ props.load(stream);
+ DorisSinkConnectorConfig.setDefaultValues((Map) props);
+ props.put("task_id", "1");
+ props.put("name", "sink-connector-test");
+ props.put("record.tablename.field", "table_name");
+ dorisDefaultSinkService = new DorisDefaultSinkService((Map) props);
+ }
+
+ @Test
+ public void getSinkDorisTableName() {
+ SinkRecord record1 =
+ new SinkRecord(
+ "topic_test",
+ 0,
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "key",
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "val",
+ 1);
+ Assert.assertEquals(
+ "test_kafka_tbl",
dorisDefaultSinkService.getSinkDorisTableName(record1));
+
+ Map<String, String> valueMap = new HashMap<>();
+ valueMap.put("col1", "val");
+ valueMap.put("table_name", "appoint_table");
+ SinkRecord record2 =
+ new SinkRecord(
+ "topic_test",
+ 0,
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "key",
+ SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
+ .optional()
+ .build(),
+ valueMap,
+ 1);
+ Assert.assertEquals(
+ "appoint_table",
dorisDefaultSinkService.getSinkDorisTableName(record2));
+ }
+}
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
index 60bcd81..d3261f7 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
@@ -78,6 +78,7 @@ public class TestCopyIntoWriter {
CopyIntoWriter copyIntoWriter =
spy(
new CopyIntoWriter(
+ "connect-test299",
"connect-test299",
5,
dorisOptions,
@@ -102,6 +103,7 @@ public class TestCopyIntoWriter {
DorisConnectMonitor dorisConnectMonitor = mock(DorisConnectMonitor.class);
CopyIntoWriter copyIntoWriter =
new CopyIntoWriter(
+ "test5",
"test5",
0,
dorisOptions,
@@ -122,6 +124,7 @@ public class TestCopyIntoWriter {
DorisConnectMonitor dorisConnectMonitor = mock(DorisConnectMonitor.class);
dorisWriter =
new CopyIntoWriter(
+ "test5",
"test5",
0,
dorisOptions,
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
index 252d83a..18014ad 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
@@ -99,6 +99,7 @@ public class TestStreamLoadWriter {
StreamLoadWriter streamLoadWriter =
spy(
new StreamLoadWriter(
+ "avro-complex10",
"avro-complex10",
2,
dorisOptions,
@@ -130,6 +131,7 @@ public class TestStreamLoadWriter {
StreamLoadWriter streamLoadWriter =
new StreamLoadWriter(
+ "avro-complex10",
"avro-complex10",
0,
dorisOptions,
@@ -155,6 +157,7 @@ public class TestStreamLoadWriter {
DorisConnectMonitor dorisConnectMonitor = mock(DorisConnectMonitor.class);
dorisWriter =
new StreamLoadWriter(
+ "avro-complex10",
"avro-complex10",
0,
dorisOptions,
diff --git a/src/test/resources/e2e/string_converter/table_field_config.json b/src/test/resources/e2e/string_converter/table_field_config.json
new file mode 100644
index 0000000..1952e53
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/table_field_config.json
@@ -0,0 +1,24 @@
+{
+ "name":"table_field_config_connector",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"table_field_config_test",
+ "tasks.max":"1",
+ "doris.topic2table.map": "table_field_config_test:field_config_tab1",
+ "buffer.count.records":"2",
+ "buffer.flush.time":"10",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"string_msg",
+ "record.tablename.field": "table_name",
+ "enable.2pc": "false",
+ "load.model":"stream_load",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter":"org.apache.kafka.connect.json.JsonConverter",
+ "value.converter.schemas.enable": "false"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/table_field_config1.sql b/src/test/resources/e2e/string_converter/table_field_config1.sql
new file mode 100644
index 0000000..536af40
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/table_field_config1.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database in the file where the connector is registered.
+CREATE TABLE string_msg.field_config_tab1 (
+ id INT NULL,
+ col1 VARCHAR(20) NULL,
+ col2 varchar(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/table_field_config2.sql b/src/test/resources/e2e/string_converter/table_field_config2.sql
new file mode 100644
index 0000000..5873e46
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/table_field_config2.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database in the file where the connector is registered.
+CREATE TABLE string_msg.field_config_tab2 (
+ id INT NULL,
+ col1 VARCHAR(20) NULL,
+ col2 varchar(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file