This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new e36b3ff  [improve] Support writing data from one topic to multiple 
tables
e36b3ff is described below

commit e36b3ff7753ad215afb3aec75c08a06486c70fe8
Author: wangchuang <[email protected]>
AuthorDate: Mon Dec 30 11:20:35 2024 +0800

    [improve] Support writing data from one topic to multiple tables
---
 .../doris/kafka/connector/cfg/DorisOptions.java    |  12 +++
 .../connector/cfg/DorisSinkConnectorConfig.java    |  11 +++
 .../connector/service/DorisDefaultSinkService.java | 100 +++++++++++++++++----
 .../kafka/connector/writer/CopyIntoWriter.java     |   5 +-
 .../doris/kafka/connector/writer/DorisWriter.java  |   9 +-
 .../kafka/connector/writer/StreamLoadWriter.java   |   8 +-
 .../connector/writer/load/DorisStreamLoad.java     |   5 +-
 .../e2e/sink/stringconverter/StringMsgE2ETest.java |  36 ++++++++
 .../connector/service/TestDorisSinkService.java    |  85 ++++++++++++++++++
 .../kafka/connector/writer/TestCopyIntoWriter.java |   3 +
 .../connector/writer/TestStreamLoadWriter.java     |   3 +
 .../e2e/string_converter/table_field_config.json   |  24 +++++
 .../e2e/string_converter/table_field_config1.sql   |  12 +++
 .../e2e/string_converter/table_field_config2.sql   |  12 +++
 14 files changed, 295 insertions(+), 30 deletions(-)

diff --git 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index c041ed8..9afd9cb 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -44,6 +44,7 @@ public class DorisOptions {
     private final String password;
     private final String database;
     private final Map<String, String> topicMap;
+    private final String tableNameField;
     private final int fileSize;
     private final int recordNum;
     private final long flushTime;
@@ -91,6 +92,7 @@ public class DorisOptions {
         this.topicMap =
                 ConfigCheckUtils.parseTopicToTableMap(
                         
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
+        this.tableNameField = 
config.get(DorisSinkConnectorConfig.RECORD_TABLE_NAME_FIELD);
 
         if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) {
             if 
(Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC))) {
@@ -190,9 +192,19 @@ public class DorisOptions {
     }
 
     public String getTopicMapTable(String topic) {
+        if (topicMap.get(topic) == null) {
+            LOG.warn(
+                    "The config 'doris.topic2table.map' is not set, use the 
topic [{}] as table",
+                    topic);
+            return topic;
+        }
         return topicMap.get(topic);
     }
 
+    public String getTableNameField() {
+        return tableNameField;
+    }
+
     public boolean enable2PC() {
         return enable2PC;
     }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index e154db5..0dd7315 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -52,6 +52,7 @@ public class DorisSinkConnectorConfig {
     public static final long BUFFER_SIZE_BYTES_DEFAULT = 5000000;
     public static final long BUFFER_SIZE_BYTES_MIN = 1;
     public static final String TOPICS_TABLES_MAP = "doris.topic2table.map";
+    public static final String RECORD_TABLE_NAME_FIELD = 
"record.tablename.field";
     public static final String LABEL_PREFIX = "label.prefix";
 
     // Time in seconds
@@ -242,6 +243,16 @@ public class DorisSinkConnectorConfig {
                         3,
                         ConfigDef.Width.NONE,
                         BUFFER_FLUSH_TIME_SEC)
+                .define(
+                        RECORD_TABLE_NAME_FIELD,
+                        Type.STRING,
+                        null,
+                        Importance.LOW,
+                        "The field name of record, and use this field value as 
the table name to be written",
+                        CONNECTOR_CONFIG,
+                        4,
+                        ConfigDef.Width.NONE,
+                        RECORD_TABLE_NAME_FIELD)
                 .define(
                         JMX_OPT,
                         ConfigDef.Type.BOOLEAN,
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 9d3a2ff..7870b9d 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -20,9 +20,13 @@
 package org.apache.doris.kafka.connector.service;
 
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.kafka.connector.DorisSinkTask;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.connection.ConnectionProvider;
@@ -72,7 +76,7 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
 
     @Override
     public void startTask(TopicPartition topicPartition) {
-        startTask(null, topicPartition);
+        startTask(dorisOptions.getTopicMapTable(topicPartition.topic()), 
topicPartition);
     }
 
     /**
@@ -83,9 +87,10 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
      */
     @Override
     public void startTask(final String tableName, final TopicPartition 
topicPartition) {
-        // fetch topic partition
-        String nameIndex = getNameIndex(topicPartition.topic(), 
topicPartition.partition());
-        if (writer.containsKey(nameIndex)) {
+        // check if the task is already started
+        String writerKey =
+                getWriterKey(topicPartition.topic(), 
topicPartition.partition(), tableName);
+        if (writer.containsKey(writerKey)) {
             LOG.info("already start task");
         } else {
             String topic = topicPartition.topic();
@@ -94,10 +99,15 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
             DorisWriter dorisWriter =
                     LoadModel.COPY_INTO.equals(loadModel)
                             ? new CopyIntoWriter(
-                                    topic, partition, dorisOptions, conn, 
connectMonitor)
+                                    tableName, topic, partition, dorisOptions, 
conn, connectMonitor)
                             : new StreamLoadWriter(
-                                    topic, partition, dorisOptions, conn, 
connectMonitor);
-            writer.put(nameIndex, dorisWriter);
+                                    tableName,
+                                    topic,
+                                    partition,
+                                    dorisOptions,
+                                    conn,
+                                    connectMonitor);
+            writer.put(writerKey, dorisWriter);
             metricsJmxReporter.start();
         }
     }
@@ -129,26 +139,33 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
 
     @Override
     public void insert(SinkRecord record) {
-        String nameIndex = getNameIndex(record.topic(), 
record.kafkaPartition());
+        String tableName = getSinkDorisTableName(record);
+        String writerKey = getWriterKey(record.topic(), 
record.kafkaPartition(), tableName);
         // init a new topic partition
-        if (!writer.containsKey(nameIndex)) {
-            startTask(new TopicPartition(record.topic(), 
record.kafkaPartition()));
+        if (!writer.containsKey(writerKey)) {
+            startTask(tableName, new TopicPartition(record.topic(), 
record.kafkaPartition()));
         }
-        writer.get(nameIndex).insert(record);
+        writer.get(writerKey).insert(record);
     }
 
     @Override
     public long getOffset(final TopicPartition topicPartition) {
-        String name = getNameIndex(topicPartition.topic(), 
topicPartition.partition());
-        if (writer.containsKey(name)) {
-            return writer.get(name).getOffset();
-        } else {
+        String tpName = getNameIndex(topicPartition.topic(), 
topicPartition.partition());
+        // get all writers for the topic partition
+        List<DorisWriter> writers =
+                writer.entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith(tpName))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
+        if (writers.isEmpty()) {
             LOG.info(
                     "Topic: {} Partition: {} hasn't been initialized to get 
offset",
                     topicPartition.topic(),
                     topicPartition.partition());
             return 0;
         }
+        // return the max offset of all writers
+        return 
writers.stream().map(DorisWriter::getOffset).reduce(Long::max).orElse(0L);
     }
 
     @Override
@@ -161,12 +178,61 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
         offsets.keySet()
                 .forEach(
                         tp -> {
-                            String name = getNameIndex(tp.topic(), 
tp.partition());
-                            writer.get(name).commit(tp.partition());
+                            String tpName = getNameIndex(tp.topic(), 
tp.partition());
+                            // commit all writers that match the topic and 
partition
+                            for (Map.Entry<String, DorisWriter> entry : 
writer.entrySet()) {
+                                if (entry.getKey().startsWith(tpName)) {
+                                    entry.getValue().commit(tp.partition());
+                                }
+                            }
                         });
     }
 
+    /**
+     * Get the table name in doris for the given record.
+     *
+     * @param record sink record
+     * @return table name in doris
+     */
+    @VisibleForTesting
+    public String getSinkDorisTableName(SinkRecord record) {
+        String defaultTableName = 
dorisOptions.getTopicMapTable(record.topic());
+        String field = dorisOptions.getTableNameField();
+        // if the field is not set, use the table name in the config
+        if (StringUtils.isEmpty(field)) {
+            return defaultTableName;
+        }
+        if (!(record.value() instanceof Map)) {
+            LOG.warn(
+                    "Only Map objects supported for The 
'record.tablename.field' configuration, field={}, record type={}",
+                    field,
+                    record.value().getClass().getName());
+            return defaultTableName;
+        }
+        Map<String, Object> map = (Map<String, Object>) record.value();
+        // if the field is not found in the record, use the table name in the 
config
+        if (map.get(field) == null) {
+            return defaultTableName;
+        }
+        return map.get(field).toString();
+    }
+
     private static String getNameIndex(String topic, int partition) {
         return topic + "_" + partition;
     }
+
+    /**
+     * Parse the writer unique key
+     *
+     * @param topic topic name
+     * @param partition partition number
+     * @param tableName table name
+     * @return writer key
+     */
+    private String getWriterKey(String topic, int partition, String tableName) 
{
+        if (dorisOptions.getTopicMapTable(topic).equals(tableName)) {
+            return topic + "_" + partition;
+        }
+        return topic + "_" + partition + "_" + tableName;
+    }
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
index 456e419..0abfefc 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
@@ -45,15 +45,16 @@ public class CopyIntoWriter extends DorisWriter {
     private final String prefix;
 
     public CopyIntoWriter(
+            String tableName,
             String topic,
             int partition,
             DorisOptions dorisOptions,
             ConnectionProvider connectionProvider,
             DorisConnectMonitor connectMonitor) {
-        super(topic, partition, dorisOptions, connectionProvider, 
connectMonitor);
+        super(tableName, topic, partition, dorisOptions, connectionProvider, 
connectMonitor);
         this.taskId = dorisOptions.getTaskId();
         this.prefix = FileNameUtils.filePrefix(dorisOptions.getName(), topic, 
partition);
-        this.copyLoad = new CopyLoad(dbName, tableName, dorisOptions);
+        this.copyLoad = new CopyLoad(dbName, this.tableName, dorisOptions);
     }
 
     public void fetchOffset() {
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 2a6323d..9481a2f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -60,6 +60,7 @@ public abstract class DorisWriter {
     protected final DorisConnectMonitor connectMonitor;
 
     public DorisWriter(
+            String tableName,
             String topic,
             int partition,
             DorisOptions dorisOptions,
@@ -67,11 +68,7 @@ public abstract class DorisWriter {
             DorisConnectMonitor connectMonitor) {
         this.topic = topic;
         this.partition = partition;
-        this.tableName = dorisOptions.getTopicMapTable(topic);
-        if (StringUtils.isEmpty(tableName)) {
-            // The mapping of topic and table is not defined
-            this.tableName = this.topic;
-        }
+        this.tableName = tableName;
         if (StringUtils.isNotEmpty(dorisOptions.getDatabase())) {
             this.dbName = dorisOptions.getDatabase();
         } else if (tableName.contains(".")) {
@@ -83,7 +80,7 @@ public abstract class DorisWriter {
             throw new ArgumentsException("Failed to get database and table 
names");
         }
 
-        this.tableIdentifier = dbName + "." + tableName;
+        this.tableIdentifier = dbName + "." + this.tableName;
         this.fileNames = new ArrayList<>();
         this.buffer = new RecordBuffer();
         this.processedOffset = new AtomicLong(-1);
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
index fd8355c..0144a7f 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
@@ -56,18 +56,20 @@ public class StreamLoadWriter extends DorisWriter {
     private DorisStreamLoad dorisStreamLoad;
 
     public StreamLoadWriter(
+            String tableName,
             String topic,
             int partition,
             DorisOptions dorisOptions,
             ConnectionProvider connectionProvider,
             DorisConnectMonitor connectMonitor) {
-        super(topic, partition, dorisOptions, connectionProvider, 
connectMonitor);
+        super(tableName, topic, partition, dorisOptions, connectionProvider, 
connectMonitor);
         this.taskId = dorisOptions.getTaskId();
         this.labelGenerator = new LabelGenerator(topic, partition, 
tableIdentifier);
         BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions, 
LOG);
         this.dorisCommitter = new DorisCommitter(dorisOptions, backendUtils);
-        this.dorisStreamLoad = new DorisStreamLoad(backendUtils, dorisOptions, 
topic);
-        checkDorisTableKey(tableName);
+        this.dorisStreamLoad =
+                new DorisStreamLoad(backendUtils, dorisOptions, topic, 
this.tableName);
+        checkDorisTableKey(this.tableName);
     }
 
     /** The uniq model has 2pc close by default unless 2pc is forced open. */
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
 
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
index 6d3f770..9ab4b8d 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
@@ -55,9 +55,10 @@ public class DorisStreamLoad extends DataLoad {
     private Queue<KafkaRespContent> respContents = new LinkedList<>();
     private final boolean enableGroupCommit;
 
-    public DorisStreamLoad(BackendUtils backendUtils, DorisOptions 
dorisOptions, String topic) {
+    public DorisStreamLoad(
+            BackendUtils backendUtils, DorisOptions dorisOptions, String 
topic, String table) {
         this.database = dorisOptions.getDatabase();
-        this.table = dorisOptions.getTopicMapTable(topic);
+        this.table = table;
         this.user = dorisOptions.getUser();
         this.password = dorisOptions.getPassword();
         this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, 
table);
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index f940cfe..294cb33 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -212,6 +212,42 @@ public class StringMsgE2ETest extends 
AbstractStringE2ESinkTest {
         checkResult(expected, query, 7);
     }
 
+    @Test
+    public void testTableFieldConfig() throws Exception {
+        
initialize("src/test/resources/e2e/string_converter/table_field_config.json");
+        String topic = "table_field_config_test";
+        String msg1 =
+                
"{\"id\":1,\"col1\":\"col1\",\"col2\":\"col2\",\"table_name\":\"field_config_tab1\"}";
+        String msg2 =
+                
"{\"id\":1,\"col1\":\"col1\",\"col2\":\"col2\",\"table_name\":\"field_config_tab2\"}";
+
+        produceMsg2Kafka(topic, msg1);
+        produceMsg2Kafka(topic, msg2);
+
+        String tableSql1 =
+                
loadContent("src/test/resources/e2e/string_converter/table_field_config1.sql");
+        createTable(tableSql1);
+        String tableSql2 =
+                
loadContent("src/test/resources/e2e/string_converter/table_field_config2.sql");
+        createTable(tableSql2);
+
+        Thread.sleep(2000);
+        kafkaContainerService.registerKafkaConnector(connectorName, 
jsonMsgConnectorContent);
+
+        List<String> expected = Collections.singletonList("1,col1,col2");
+        Thread.sleep(10000);
+        String query1 =
+                String.format(
+                        "select id,col1,col2 from %s.%s order by id",
+                        database, "field_config_tab1");
+        checkResult(expected, query1, 3);
+        String query2 =
+                String.format(
+                        "select id,col1,col2 from %s.%s order by id",
+                        database, "field_config_tab2");
+        checkResult(expected, query2, 3);
+    }
+
     public void checkResult(List<String> expected, String query, int 
columnSize) throws Exception {
         List<String> actual = new ArrayList<>();
 
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
 
b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
new file mode 100644
index 0000000..b947405
--- /dev/null
+++ 
b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDorisSinkService {
+
+    private DorisDefaultSinkService dorisDefaultSinkService;
+
+    @Before
+    public void init() throws IOException {
+        InputStream stream =
+                this.getClass()
+                        .getClassLoader()
+                        
.getResourceAsStream("doris-connector-sink.properties");
+        Properties props = new Properties();
+        props.load(stream);
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
+        props.put("task_id", "1");
+        props.put("name", "sink-connector-test");
+        props.put("record.tablename.field", "table_name");
+        dorisDefaultSinkService = new DorisDefaultSinkService((Map) props);
+    }
+
+    @Test
+    public void getSinkDorisTableName() {
+        SinkRecord record1 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "val",
+                        1);
+        Assert.assertEquals(
+                "test_kafka_tbl", 
dorisDefaultSinkService.getSinkDorisTableName(record1));
+
+        Map<String, String> valueMap = new HashMap<>();
+        valueMap.put("col1", "val");
+        valueMap.put("table_name", "appoint_table");
+        SinkRecord record2 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        SchemaBuilder.map(Schema.STRING_SCHEMA, 
Schema.STRING_SCHEMA)
+                                .optional()
+                                .build(),
+                        valueMap,
+                        1);
+        Assert.assertEquals(
+                "appoint_table", 
dorisDefaultSinkService.getSinkDorisTableName(record2));
+    }
+}
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java 
b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
index 60bcd81..d3261f7 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
@@ -78,6 +78,7 @@ public class TestCopyIntoWriter {
         CopyIntoWriter copyIntoWriter =
                 spy(
                         new CopyIntoWriter(
+                                "connect-test299",
                                 "connect-test299",
                                 5,
                                 dorisOptions,
@@ -102,6 +103,7 @@ public class TestCopyIntoWriter {
         DorisConnectMonitor dorisConnectMonitor = 
mock(DorisConnectMonitor.class);
         CopyIntoWriter copyIntoWriter =
                 new CopyIntoWriter(
+                        "test5",
                         "test5",
                         0,
                         dorisOptions,
@@ -122,6 +124,7 @@ public class TestCopyIntoWriter {
         DorisConnectMonitor dorisConnectMonitor = 
mock(DorisConnectMonitor.class);
         dorisWriter =
                 new CopyIntoWriter(
+                        "test5",
                         "test5",
                         0,
                         dorisOptions,
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
 
b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
index 252d83a..18014ad 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
@@ -99,6 +99,7 @@ public class TestStreamLoadWriter {
         StreamLoadWriter streamLoadWriter =
                 spy(
                         new StreamLoadWriter(
+                                "avro-complex10",
                                 "avro-complex10",
                                 2,
                                 dorisOptions,
@@ -130,6 +131,7 @@ public class TestStreamLoadWriter {
 
         StreamLoadWriter streamLoadWriter =
                 new StreamLoadWriter(
+                        "avro-complex10",
                         "avro-complex10",
                         0,
                         dorisOptions,
@@ -155,6 +157,7 @@ public class TestStreamLoadWriter {
         DorisConnectMonitor dorisConnectMonitor = 
mock(DorisConnectMonitor.class);
         dorisWriter =
                 new StreamLoadWriter(
+                        "avro-complex10",
                         "avro-complex10",
                         0,
                         dorisOptions,
diff --git a/src/test/resources/e2e/string_converter/table_field_config.json 
b/src/test/resources/e2e/string_converter/table_field_config.json
new file mode 100644
index 0000000..1952e53
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/table_field_config.json
@@ -0,0 +1,24 @@
+{
+  "name":"table_field_config_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"table_field_config_test",
+    "tasks.max":"1",
+    "doris.topic2table.map": "table_field_config_test:field_config_tab1",
+    "buffer.count.records":"2",
+    "buffer.flush.time":"10",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"string_msg",
+    "record.tablename.field": "table_name",
+    "enable.2pc": "false",
+    "load.model":"stream_load",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/table_field_config1.sql 
b/src/test/resources/e2e/string_converter/table_field_config1.sql
new file mode 100644
index 0000000..536af40
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/table_field_config1.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database 
in the file where the connector is registered.
+CREATE TABLE string_msg.field_config_tab1 (
+  id INT NULL,
+  col1 VARCHAR(20) NULL,
+  col2 varchar(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/table_field_config2.sql 
b/src/test/resources/e2e/string_converter/table_field_config2.sql
new file mode 100644
index 0000000..5873e46
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/table_field_config2.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database 
in the file where the connector is registered.
+CREATE TABLE string_msg.field_config_tab2 (
+  id INT NULL,
+  col1 VARCHAR(20) NULL,
+  col2 varchar(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to