This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 0813cb5  [improve] support combine flush async (#74)
0813cb5 is described below

commit 0813cb51f3c9b0184d3fc1ca80de8472c1931fc7
Author: wudi <[email protected]>
AuthorDate: Mon May 26 10:48:49 2025 +0800

    [improve] support combine flush async (#74)
    
    When enable.combine.flush=true is enabled, stream load is currently still
    synchronous, which blocks the consumption of Kafka data while writing. This
    change makes it asynchronous to improve throughput.
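    
    For reference, the user-facing switch is the connector property shown in the
    e2e config added by this patch. A minimal sketch (other properties omitted;
    endpoints and credentials are placeholders):
    
        "config": {
          "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
          "load.model": "stream_load",
          "enable.combine.flush": "true"
        }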
---
 .../doris/kafka/connector/DorisSinkTask.java       |   2 +
 .../service/DorisCombinedSinkService.java          |  28 ++-
 .../connector/service/DorisDefaultSinkService.java |   3 +
 .../kafka/connector/service/DorisSinkService.java  |   3 +
 .../connector/writer/AsyncStreamLoadWriter.java    | 139 +++++++++++
 .../doris/kafka/connector/writer/DorisWriter.java  |  14 +-
 .../doris/kafka/connector/writer/RecordBuffer.java |   9 +
 .../writer/load/AsyncDorisStreamLoad.java          | 256 +++++++++++++++++++++
 .../writer/load/DefaultThreadFactory.java          |  39 ++++
 .../e2e/sink/AbstractKafka2DorisSink.java          |  20 +-
 .../connector/e2e/sink/avro/AvroMsgE2ETest.java    |   2 +-
 .../stringconverter/DorisSinkFailoverSinkTest.java |  29 ++-
 .../e2e/sink/stringconverter/StringMsgE2ETest.java | 100 ++++----
 .../string_msg_failover_connector_uniq.json        |  25 ++
 .../string_msg_tab_failover_uniq.sql               |  12 +
 15 files changed, 602 insertions(+), 79 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index 5f97b29..9e553c5 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -109,6 +109,8 @@ public class DorisSinkTask extends SinkTask {
                         remainingRetries);
                 remainingRetries--;
                 context.timeout(options.getRetryIntervalMs());
+                // When writing asynchronously, the asynchronous thread needs to be restarted
+                sink.init();
                 throw new RetriableException(ex);
             }
             throw ex;
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java
index 711b405..f0d48f4 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java
@@ -22,8 +22,8 @@ package org.apache.doris.kafka.connector.service;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.doris.kafka.connector.writer.AsyncStreamLoadWriter;
 import org.apache.doris.kafka.connector.writer.DorisWriter;
-import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
 /** Combined all partitions and write once. */
 public class DorisCombinedSinkService extends DorisDefaultSinkService {
     private static final Logger LOG = LoggerFactory.getLogger(DorisCombinedSinkService.class);
-
     private final Map<String, HashMap<Integer, Long>> topicPartitionOffset;
 
     DorisCombinedSinkService(Map<String, String> config, SinkTaskContext context) {
@@ -42,6 +41,17 @@ public class DorisCombinedSinkService extends DorisDefaultSinkService {
         this.topicPartitionOffset = new HashMap<>();
     }
 
+    @Override
+    public void init() {
+        for (DorisWriter wr : writer.values()) {
+            if (wr instanceof AsyncStreamLoadWriter) {
+                // When the stream load asynchronous thread is down,
+                // it needs to be restarted when retrying
+                ((AsyncStreamLoadWriter) wr).start();
+            }
+        }
+    }
+
     /**
      * Create new task
      *
@@ -60,8 +70,9 @@ public class DorisCombinedSinkService extends DorisDefaultSinkService {
             // Only by topic
             int partition = -1;
             DorisWriter dorisWriter =
-                    new StreamLoadWriter(
+                    new AsyncStreamLoadWriter(
                             tableName, topic, partition, dorisOptions, conn, connectMonitor);
+
             writer.put(writerKey, dorisWriter);
             metricsJmxReporter.start();
         }
@@ -86,14 +97,6 @@ public class DorisCombinedSinkService extends DorisDefaultSinkService {
             // Might happen a count of record based flushing,buffer
             insert(record);
         }
-
-        // check all sink writer to see if they need to be flushed
-        for (DorisWriter writer : writer.values()) {
-            // Time based flushing
-            if (writer.shouldFlush()) {
-                writer.flushBuffer();
-            }
-        }
     }
 
     @Override
@@ -122,8 +125,9 @@ public class DorisCombinedSinkService extends DorisDefaultSinkService {
     public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
         // Here we force flushing the data in memory once to
        // ensure that the offsets recorded in topicPartitionOffset have been flushed to doris
+        LOG.info("trigger flush by commit, topic {}", topicPartitionOffset.keySet());
         for (DorisWriter writer : writer.values()) {
-            writer.flushBuffer();
+            writer.commitFlush();
         }
     }
 
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 68f0d34..6871cc1 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -87,6 +87,9 @@ public class DorisDefaultSinkService implements DorisSinkService {
                         this.metricsJmxReporter);
     }
 
+    @Override
+    public void init() {}
+
     @Override
     public void startTask(TopicPartition topicPartition) {
         startTask(dorisOptions.getTopicMapTable(topicPartition.topic()), topicPartition);
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
index dacee81..a3d72e9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
@@ -28,6 +28,9 @@ import org.apache.kafka.connect.sink.SinkRecord;
 /** Background service of data sink, responsible to create/drop table and insert/delete files */
 public interface DorisSinkService {
 
+    /** init task for writer */
+    void init();
+
     /**
      * Start the Task.
      *
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/AsyncStreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/AsyncStreamLoadWriter.java
new file mode 100644
index 0000000..7003629
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/AsyncStreamLoadWriter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.connection.ConnectionProvider;
+import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
+import org.apache.doris.kafka.connector.utils.BackendUtils;
+import org.apache.doris.kafka.connector.writer.load.AsyncDorisStreamLoad;
+import org.apache.doris.kafka.connector.writer.load.DefaultThreadFactory;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncStreamLoadWriter extends DorisWriter {
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncStreamLoadWriter.class);
+    private final LabelGenerator labelGenerator;
+    private AsyncDorisStreamLoad dorisStreamLoad;
+    private final transient ScheduledExecutorService scheduledExecutorService;
+
+    public AsyncStreamLoadWriter(
+            String tableName,
+            String topic,
+            int partition,
+            DorisOptions dorisOptions,
+            ConnectionProvider connectionProvider,
+            DorisConnectMonitor connectMonitor) {
+        super(tableName, topic, partition, dorisOptions, connectionProvider, connectMonitor);
+        this.labelGenerator = new LabelGenerator(topic, partition, tableIdentifier);
+        BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions, LOG);
+        this.dorisStreamLoad =
+                new AsyncDorisStreamLoad(backendUtils, dorisOptions, topic, this.tableName);
+        this.scheduledExecutorService =
+                new ScheduledThreadPoolExecutor(
+                        1, new DefaultThreadFactory("stream-load-flush-interval"));
+        // when uploading data in streaming mode, we need to regularly detect whether there are
+        // exceptions.
+        scheduledExecutorService.scheduleWithFixedDelay(
+                this::intervalFlush,
+                dorisOptions.getFlushTime(),
+                dorisOptions.getFlushTime(),
+                TimeUnit.SECONDS);
+    }
+
+    /** start async thread stream load */
+    public void start() {
+        this.dorisStreamLoad.start();
+    }
+
+    public void insert(final SinkRecord dorisRecord) {
+        checkFlushException();
+        putBuffer(dorisRecord);
+        if (buffer.getBufferSizeBytes() >= dorisOptions.getFileSize()
+                || (dorisOptions.getRecordNum() != 0
+                        && buffer.getNumOfRecords() >= dorisOptions.getRecordNum())) {
+            LOG.info(
+                    "trigger flush by buffer size or count, buffer size: {}, num of records: {}, lastoffset : {}",
+                    buffer.getBufferSizeBytes(),
+                    buffer.getNumOfRecords(),
+                    buffer.getLastOffset());
+            bufferFullFlush();
+        }
+    }
+
+    private void bufferFullFlush() {
+        doFlush(false, true);
+    }
+
+    private void intervalFlush() {
+        LOG.debug("interval flush trigger");
+        doFlush(false, false);
+    }
+
+    public void commitFlush() {
+        doFlush(true, false);
+    }
+
+    private synchronized void doFlush(boolean waitUtilDone, boolean bufferFull) {
+        if (waitUtilDone || bufferFull) {
+            flushBuffer(waitUtilDone);
+        } else if (dorisStreamLoad.hasCapacity()) {
+            flushBuffer(false);
+        }
+    }
+
+    public synchronized void flushBuffer(boolean waitUtilDone) {
+        if (!buffer.isEmpty()) {
+            RecordBuffer tmpBuff = buffer;
+
+            String label = labelGenerator.generateLabel(tmpBuff.getLastOffset());
+            dorisStreamLoad.flush(label, tmpBuff);
+            this.buffer = new RecordBuffer();
+        }
+
+        if (waitUtilDone) {
+            dorisStreamLoad.forceFlush();
+        }
+    }
+
+    private void checkFlushException() {
+        dorisStreamLoad.checkException();
+    }
+
+    @Override
+    public void commit(int partition) {
+        // Won't go here
+    }
+
+    @Override
+    public long getOffset() {
+        // Won't go here
+        return 0;
+    }
+
+    @Override
+    public void fetchOffset() {
+        // Won't go here
+    }
+}
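
For orientation: the writer above hands full buffers to AsyncDorisStreamLoad (further
below in this patch). A minimal sketch of that handoff, with placeholder names and the
HTTP call elided, assuming a capacity-1 queue and a single dedicated load thread:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch only: the writer thread enqueues batches, a load thread drains them,
    // and failures are surfaced back to the writer via an AtomicReference.
    class AsyncFlushSketch {
        private final BlockingQueue<String> flushQueue = new LinkedBlockingDeque<>(1);
        private final AtomicReference<Throwable> exception = new AtomicReference<>();

        // Writer side: blocks only when one batch is already pending.
        void flush(String batch) throws InterruptedException {
            checkException();
            flushQueue.put(batch);
        }

        // Load side: runs on its own thread.
        void loadLoop() {
            try {
                while (true) {
                    String batch = flushQueue.take();
                    // ... stream-load the batch over HTTP here ...
                }
            } catch (Exception e) {
                exception.set(e);
                flushQueue.clear(); // unblock a writer waiting in put()
            }
        }

        private void checkException() {
            Throwable t = exception.get();
            if (t != null) {
                throw new RuntimeException(t);
            }
        }
    }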
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 9e5cfb9..c48fe40 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -42,7 +42,7 @@ public abstract class DorisWriter {
     protected String dbName;
     protected final String tableIdentifier;
     protected List<String> fileNames;
-    private RecordBuffer buffer;
+    protected RecordBuffer buffer;
     protected final AtomicLong committedOffset; // loaded offset + 1
     protected final AtomicLong flushedOffset; // flushed offset
     protected final AtomicLong processedOffset; // processed offset
@@ -116,11 +116,8 @@ public abstract class DorisWriter {
 
     protected void insertRecord(final SinkRecord record) {
        // discard the record if the record offset is smaller or equal to server side offset
-        // when enable.combine.flush=true, No verification is required because the offsets of
-        // multiple partitions cannot be compared.
-        if (dorisOptions.isEnableCombineFlush()
-                || (record.kafkaOffset() > this.offsetPersistedInDoris.get()
-                        && record.kafkaOffset() > processedOffset.get())) {
+        if (record.kafkaOffset() > this.offsetPersistedInDoris.get()
+                && record.kafkaOffset() > processedOffset.get()) {
             SinkRecord dorisRecord = record;
             RecordBuffer tmpBuff = null;
 
@@ -175,6 +172,11 @@ public abstract class DorisWriter {
                 >= (dorisOptions.getFlushTime() * 1000);
     }
 
+    // for combine flush
+    public void commitFlush() {
+        flushBuffer();
+    }
+
     public void flushBuffer() {
         if (buffer.isEmpty()) {
             return;
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/RecordBuffer.java b/src/main/java/org/apache/doris/kafka/connector/writer/RecordBuffer.java
index 9fcbc23..990043f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/RecordBuffer.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/RecordBuffer.java
@@ -29,6 +29,7 @@ public class RecordBuffer extends PartitionBuffer<String> {
     private static final Logger LOG = LoggerFactory.getLogger(RecordBuffer.class);
     public static final String LINE_SEPARATOR = "\n";
     private final StringJoiner buffer;
+    private String label;
 
     public RecordBuffer() {
         super();
@@ -52,4 +53,12 @@ public class RecordBuffer extends PartitionBuffer<String> {
                 getLastOffset());
         return result;
     }
+
+    public String getLabel() {
+        return label;
+    }
+
+    public void setLabel(String label) {
+        this.label = label;
+    }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
new file mode 100644
index 0000000..49d7ba1
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer.load;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.exception.StreamLoadException;
+import org.apache.doris.kafka.connector.model.KafkaRespContent;
+import org.apache.doris.kafka.connector.utils.BackendUtils;
+import org.apache.doris.kafka.connector.utils.HttpPutBuilder;
+import org.apache.doris.kafka.connector.utils.HttpUtils;
+import org.apache.doris.kafka.connector.writer.LoadStatus;
+import org.apache.doris.kafka.connector.writer.RecordBuffer;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncDorisStreamLoad extends DataLoad {
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncDorisStreamLoad.class);
+    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
+    private static final List<String> DORIS_SUCCESS_STATUS =
+            new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT));
+    private String loadUrl;
+    private final DorisOptions dorisOptions;
+    private final String topic;
+    private String hostPort;
+    private final CloseableHttpClient httpClient = new HttpUtils().getHttpClient();
+    private final BackendUtils backendUtils;
+    private Queue<KafkaRespContent> respContents = new LinkedList<>();
+    private final boolean enableGroupCommit;
+    private ExecutorService loadExecutorService;
+    private LoadAsyncExecutor loadAsyncExecutor;
+    private BlockingQueue<RecordBuffer> flushQueue = new LinkedBlockingDeque<>(1);
+    private final AtomicBoolean started;
+    private volatile boolean loadThreadAlive = false;
+    private AtomicReference<Throwable> exception = new AtomicReference<>(null);
+
+    public AsyncDorisStreamLoad(
+            BackendUtils backendUtils, DorisOptions dorisOptions, String topic, String table) {
+        this.database = dorisOptions.getDatabase();
+        this.table = table;
+        this.user = dorisOptions.getUser();
+        this.password = dorisOptions.getPassword();
+        this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table);
+        this.dorisOptions = dorisOptions;
+        this.backendUtils = backendUtils;
+        this.topic = topic;
+        this.enableGroupCommit = dorisOptions.enableGroupCommit();
+        this.loadAsyncExecutor = new LoadAsyncExecutor();
+        this.loadExecutorService =
+                new ThreadPoolExecutor(
+                        1,
+                        1,
+                        0L,
+                        TimeUnit.MILLISECONDS,
+                        new LinkedBlockingQueue<>(1),
+                        new DefaultThreadFactory("streamload-executor"),
+                        new ThreadPoolExecutor.AbortPolicy());
+
+        start();
+        this.started = new AtomicBoolean(true);
+    }
+
+    public void start() {
+        if (!loadThreadAlive) {
+            this.loadExecutorService.execute(loadAsyncExecutor);
+            this.exception.set(null);
+        }
+    }
+
+    public void flush(String label, RecordBuffer buffer) {
+        checkFlushException();
+        buffer.setLabel(label);
+        putRecordToFlushQueue(buffer);
+    }
+
+    public void forceFlush() {
+        checkFlushException();
+        waitAsyncLoadFinish();
+    }
+
+    private void waitAsyncLoadFinish() {
+        // Make sure the data in the queue has been flushed
+        for (int i = 0; i < 2; i++) {
+            RecordBuffer empty = new RecordBuffer();
+            putRecordToFlushQueue(empty);
+        }
+    }
+
+    private void putRecordToFlushQueue(RecordBuffer buffer) {
+        checkFlushException();
+        if (!loadThreadAlive) {
+            throw new RuntimeException("load thread already exit, write was 
interrupted");
+        }
+        try {
+            flushQueue.put(buffer);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Failed to put record buffer to flush 
queue");
+        }
+        checkFlushException();
+    }
+
+    private void checkFlushException() {
+        if (exception.get() != null) {
+            throw new DorisException(exception.get());
+        }
+    }
+
+    public void close() {
+        if (started.compareAndSet(true, false)) {
+            LOG.info("close executorService");
+            loadExecutorService.shutdown();
+        }
+    }
+
+    public boolean hasCapacity() {
+        return flushQueue.remainingCapacity() > 0;
+    }
+
+    public void checkException() {
+        checkFlushException();
+    }
+
+    class LoadAsyncExecutor implements Runnable {
+
+        @Override
+        public void run() {
+            LOG.info("LoadAsyncExecutor start");
+            loadThreadAlive = true;
+            while (started.get()) {
+                try {
+                    RecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
+                    if (buffer == null) {
+                        continue;
+                    }
+                    if (buffer.getLabel() == null) {
+                        // When the label is empty, it is the eof buffer for checkpoint flush.
+                        continue;
+                    }
+                    load(buffer.getLabel(), buffer);
+
+                } catch (Exception e) {
+                    LOG.error("worker running error", e);
+                    exception.set(e);
+                    // clear queue to avoid writer thread blocking
+                    flushQueue.clear();
+                    break;
+                }
+            }
+            LOG.info("LoadAsyncExecutor stop");
+            loadThreadAlive = false;
+        }
+
+        /** execute stream load. */
+        public void load(String label, RecordBuffer buffer) throws IOException {
+            if (enableGroupCommit) {
+                label = null;
+            }
+
+            refreshLoadUrl(database, table);
+            String data = buffer.getData();
+            ByteArrayEntity entity = new ByteArrayEntity(data.getBytes(StandardCharsets.UTF_8));
+            HttpPutBuilder putBuilder = new HttpPutBuilder();
+            putBuilder
+                    .setUrl(loadUrl)
+                    .baseAuth(user, password)
+                    .setLabel(label)
+                    .addCommonHeader()
+                    .setEntity(entity)
+                    .addHiddenColumns(dorisOptions.isEnableDelete())
+                    .enable2PC(dorisOptions.enable2PC())
+                    .addProperties(dorisOptions.getStreamLoadProp());
+
+            if (enableGroupCommit) {
+                LOG.info("stream load started with group commit on host {}", 
hostPort);
+            } else {
+                LOG.info("stream load started for {} on host {}", label, 
hostPort);
+            }
+
+            try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                if (statusCode == 200 && response.getEntity() != null) {
+                    String loadResult = EntityUtils.toString(response.getEntity());
+                    LOG.info("load Result {}", loadResult);
+                    KafkaRespContent respContent =
+                            OBJECT_MAPPER.readValue(loadResult, KafkaRespContent.class);
+                    if (respContent == null || respContent.getMessage() == null) {
+                        throw new StreamLoadException("response error : " + loadResult);
+                    }
+                    if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+                        String errMsg =
+                                String.format(
+                                        "stream load error: %s, see more in %s",
+                                        respContent.getMessage(), respContent.getErrorURL());
+                        throw new StreamLoadException(errMsg);
+                    }
+                    respContent.setDatabase(database);
+                    respContent.setTable(table);
+                    respContent.setLastOffset(buffer.getLastOffset());
+                    respContent.setTopic(topic);
+                    respContents.add(respContent);
+                }
+            } catch (Exception ex) {
+                String err;
+                if (enableGroupCommit) {
+                    err = "failed to stream load data with group commit";
+                } else {
+                    err = "failed to stream load data with label: " + label;
+                }
+
+                LOG.warn(err, ex);
+                throw new StreamLoadException(err, ex);
+            }
+        }
+
+        private void refreshLoadUrl(String database, String table) {
+            hostPort = backendUtils.getAvailableBackend();
+            loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table);
+        }
+    }
+}
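
A note on forceFlush() above, since the two-empty-buffer trick is easy to miss:
flushQueue has capacity 1, so the second put() can only succeed after the first
label-less buffer has been polled, and the load loop only polls again once the
previous real batch's load() has returned. When waitAsyncLoadFinish() completes,
the in-flight data has therefore been loaded. Sketched with the names from this file:

    // Why two empty buffers drain the pipeline (queue capacity = 1):
    //   put(eof1) may succeed while load(batch) is still running,
    //   but put(eof2) blocks until eof1 is taken, and eof1 is only taken
    //   after load(batch) returns. So put(eof2) returning implies the
    //   previous batch has been loaded.
    for (int i = 0; i < 2; i++) {
        flushQueue.put(new RecordBuffer()); // label == null => load loop skips it
    }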
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/DefaultThreadFactory.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/DefaultThreadFactory.java
new file mode 100644
index 0000000..82ff62a
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/DefaultThreadFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer.load;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DefaultThreadFactory implements ThreadFactory {
+    private static final AtomicInteger poolNumber = new AtomicInteger(1);
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+    private final String namePrefix;
+
+    public DefaultThreadFactory(String name) {
+        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + name + "-";
+    }
+
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
+        t.setDaemon(false);
+        return t;
+    }
+}
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
index ad0b84a..8e79f9b 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
@@ -115,11 +115,12 @@ public abstract class AbstractKafka2DorisSink {
         }
     }
 
-    protected static void executeSql(Connection connection, String... sql) {
+    protected static void executeSql(String... sql) {
         if (sql == null || sql.length == 0) {
             return;
         }
-        try (Statement statement = connection.createStatement()) {
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
             for (String s : sql) {
                 if (StringUtils.isNotEmpty(s)) {
                     statement.execute(s);
@@ -133,8 +134,8 @@ public abstract class AbstractKafka2DorisSink {
 
     protected static void createDatabase(String databaseName) {
         LOG.info("Will to be create database, sql={}", databaseName);
-        try {
-            Statement statement = getJdbcConnection().createStatement();
+        try (Connection conn = getJdbcConnection();
+                Statement statement = conn.createStatement()) {
             statement.execute("create database if not exists " + databaseName);
         } catch (SQLException e) {
             throw new DorisException("Failed to create doris table.", e);
@@ -144,8 +145,8 @@ public abstract class AbstractKafka2DorisSink {
 
     protected void createTable(String sql) {
         LOG.info("Will to be create doris table, sql={}", sql);
-        try {
-            Statement statement = getJdbcConnection().createStatement();
+        try (Connection conn = getJdbcConnection();
+                Statement statement = conn.createStatement()) {
             statement.execute(sql);
         } catch (SQLException e) {
             throw new DorisException("Failed to create doris table.", e);
@@ -155,8 +156,8 @@ public abstract class AbstractKafka2DorisSink {
 
     protected void insertTable(String sql) {
         LOG.info("Will insert data to Doris table. SQL: {}", sql);
-        try {
-            Statement statement = getJdbcConnection().createStatement();
+        try (Connection conn = getJdbcConnection();
+                Statement statement = conn.createStatement()) {
             int rowCount = statement.executeUpdate(sql);
             LOG.info("Inserted {} item data into the Doris table.", rowCount);
         } catch (SQLException e) {
@@ -184,7 +185,8 @@ public abstract class AbstractKafka2DorisSink {
     public void checkResult(List<String> expected, String query, int columnSize) throws Exception {
         List<String> actual = new ArrayList<>();
 
-        try (Statement statement = getJdbcConnection().createStatement()) {
+        try (Connection conn = getJdbcConnection();
+                Statement statement = conn.createStatement()) {
             ResultSet sinkResultSet = statement.executeQuery(query);
             while (sinkResultSet.next()) {
                 List<String> row = new ArrayList<>();
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
index d920903..8a4d650 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
@@ -80,7 +80,7 @@ public class AvroMsgE2ETest extends AbstractAvroE2ESinkTest {
     }
 
     private static void setTimeZone() {
-        executeSql(getJdbcConnection(), "set global time_zone = 'Asia/Shanghai'");
+        executeSql("set global time_zone = 'Asia/Shanghai'");
     }
 
     @Test
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
index c81055f..3dbd19e 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
@@ -66,6 +66,7 @@ public class DorisSinkFailoverSinkTest extends AbstractStringE2ESinkTest {
         JsonNode configNode = rootNode.get(CONFIG);
         Map<String, String> configMap = objectMapper.convertValue(configNode, Map.class);
         configMap.put(ConfigCheckUtils.TASK_ID, "1");
+
         Map<String, String> lowerCaseConfigMap =
                 DorisSinkConnectorConfig.convertToLowercase(configMap);
         DorisSinkConnectorConfig.setDefaultValues(lowerCaseConfigMap);
@@ -76,7 +77,7 @@ public class DorisSinkFailoverSinkTest extends AbstractStringE2ESinkTest {
     }
 
     private static void setTimeZone() {
-        executeSql(getJdbcConnection(), "set global time_zone = 'Asia/Shanghai'");
+        executeSql("set global time_zone = 'Asia/Shanghai'");
     }
 
     /** mock streamload failure */
@@ -84,23 +85,41 @@ public class DorisSinkFailoverSinkTest extends AbstractStringE2ESinkTest {
     public void testStreamLoadFailoverSink() throws Exception {
         LOG.info("start to test testStreamLoadFailoverSink.");
         
initialize("src/test/resources/e2e/string_converter/string_msg_failover_connector.json");
-        Thread.sleep(5000);
         String topic = "string_test_failover";
         String msg1 = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
         produceMsg2Kafka(topic, msg1);
-
         String tableSql =
                 loadContent("src/test/resources/e2e/string_converter/string_msg_tab_failover.sql");
         createTable(tableSql);
+        startCheck(topic);
+    }
+
+    /** mock streamload failure */
+    @Test
+    public void testStreamLoadFailoverSinkCombineFlush() throws Exception {
+        LOG.info("start to test testStreamLoadFailoverSinkCombineFlush.");
+        initialize(
+                "src/test/resources/e2e/string_converter/string_msg_failover_connector_uniq.json");
+        String topic = "string_test_failover_uniq";
+        String msg1 = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
+        produceMsg2Kafka(topic, msg1);
+        String tableSql =
+                loadContent(
+                        "src/test/resources/e2e/string_converter/string_msg_tab_failover_uniq.sql");
+        createTable(tableSql);
+        startCheck(topic);
+    }
 
+    public void startCheck(String topic) throws Exception {
         kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
 
         String table = dorisOptions.getTopicMapTable(topic);
         String querySql =
                 String.format("select id,name,age from %s.%s order by id", database, table);
         LOG.info("start to query result from doris. sql={}", querySql);
+        Connection jdbcConnection = getJdbcConnection();
         while (true) {
-            List<String> result = executeSQLStatement(getJdbcConnection(), LOG, querySql, 3);
+            List<String> result = executeSQLStatement(jdbcConnection, LOG, querySql, 3);
             // until load success one time
             if (result.size() >= 1) {
                 faultInjectionOpen();
@@ -111,7 +130,7 @@ public class DorisSinkFailoverSinkTest extends AbstractStringE2ESinkTest {
                 faultInjectionClear();
                 break;
             } else {
-                Thread.sleep(100);
+                Thread.sleep(1000);
             }
         }
 
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 056f513..280ddf3 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -21,6 +21,7 @@ package org.apache.doris.kafka.connector.e2e.sink.stringconverter;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -74,7 +75,7 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
     }
 
     private static void setTimeZone() {
-        executeSql(getJdbcConnection(), "set global time_zone = 'Asia/Shanghai'");
+        executeSql("set global time_zone = 'Asia/Shanghai'");
     }
 
     @Test
@@ -88,23 +89,26 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
         String tableSql = loadContent("src/test/resources/e2e/string_converter/string_msg_tab.sql");
         createTable(tableSql);
         kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+        Thread.sleep(20000);
 
         String table = dorisOptions.getTopicMapTable(topic);
-        Statement statement = getJdbcConnection().createStatement();
-        String querySql = "select * from " + database + "." + table;
-        LOG.info("start to query result from doris. sql={}", querySql);
-        ResultSet resultSet = statement.executeQuery(querySql);
-
-        Assert.assertTrue(resultSet.next());
-
-        int id = resultSet.getInt("id");
-        String name = resultSet.getString("name");
-        int age = resultSet.getInt("age");
-        LOG.info("Query result is id={}, name={}, age={}", id, name, age);
-
-        Assert.assertEquals(1, id);
-        Assert.assertEquals("zhangsan", name);
-        Assert.assertEquals(12, age);
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement(); ) {
+            String querySql = "select * from " + database + "." + table;
+            LOG.info("start to query result from doris. sql={}", querySql);
+            ResultSet resultSet = statement.executeQuery(querySql);
+
+            Assert.assertTrue(resultSet.next());
+
+            int id = resultSet.getInt("id");
+            String name = resultSet.getString("name");
+            int age = resultSet.getInt("age");
+            LOG.info("Query result is id={}, name={}, age={}", id, name, age);
+
+            Assert.assertEquals(1, id);
+            Assert.assertEquals("zhangsan", name);
+            Assert.assertEquals(12, age);
+        }
     }
 
     @Test
@@ -425,21 +429,23 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
         kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
 
         String table = dorisOptions.getTopicMapTable(topic);
-        Statement statement = getJdbcConnection().createStatement();
-        String querySql = "select * from " + database + "." + table;
-        LOG.info("start to query result from doris. sql={}", querySql);
-        ResultSet resultSet = statement.executeQuery(querySql);
-
-        Assert.assertTrue(resultSet.next());
-
-        int id = resultSet.getInt("id");
-        String name = resultSet.getString("name");
-        int age = resultSet.getInt("age");
-        LOG.info("Query result is id={}, name={}, age={}", id, name, age);
-
-        Assert.assertEquals(1, id);
-        Assert.assertEquals("zhangsan", name);
-        Assert.assertEquals(12, age);
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            String querySql = "select * from " + database + "." + table;
+            LOG.info("start to query result from doris. sql={}", querySql);
+            ResultSet resultSet = statement.executeQuery(querySql);
+
+            Assert.assertTrue(resultSet.next());
+
+            int id = resultSet.getInt("id");
+            String name = resultSet.getString("name");
+            int age = resultSet.getInt("age");
+            LOG.info("Query result is id={}, name={}, age={}", id, name, age);
+
+            Assert.assertEquals(1, id);
+            Assert.assertEquals("zhangsan", name);
+            Assert.assertEquals(12, age);
+        }
     }
 
     @Test
@@ -456,21 +462,23 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
         kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
 
         String table = dorisOptions.getTopicMapTable(topic);
-        Statement statement = getJdbcConnection().createStatement();
-        String querySql = "select * from " + database + "." + table;
-        LOG.info("start to query result from doris. sql={}", querySql);
-        ResultSet resultSet = statement.executeQuery(querySql);
-
-        Assert.assertTrue(resultSet.next());
-
-        int id = resultSet.getInt("id");
-        String name = resultSet.getString("name");
-        int age = resultSet.getInt("age");
-        LOG.info("Query result is id={}, name={}, age={}", id, name, age);
-
-        Assert.assertEquals(1, id);
-        Assert.assertEquals("zhangsan", name);
-        Assert.assertEquals(12, age);
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement(); ) {
+            String querySql = "select * from " + database + "." + table;
+            LOG.info("start to query result from doris. sql={}", querySql);
+            ResultSet resultSet = statement.executeQuery(querySql);
+
+            Assert.assertTrue(resultSet.next());
+
+            int id = resultSet.getInt("id");
+            String name = resultSet.getString("name");
+            int age = resultSet.getInt("age");
+            LOG.info("Query result is id={}, name={}, age={}", id, name, age);
+
+            Assert.assertEquals(1, id);
+            Assert.assertEquals("zhangsan", name);
+            Assert.assertEquals(12, age);
+        }
     }
 
     @AfterClass
diff --git a/src/test/resources/e2e/string_converter/string_msg_failover_connector_uniq.json b/src/test/resources/e2e/string_converter/string_msg_failover_connector_uniq.json
new file mode 100644
index 0000000..83cdc1c
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/string_msg_failover_connector_uniq.json
@@ -0,0 +1,25 @@
+{
+  "name":"string_msg_failover_connector_uniq",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"string_test_failover_uniq",
+    "tasks.max":"1",
+    "doris.topic2table.map": 
"string_test_failover_uniq:string_msg_tab_failover_uniq",
+    "buffer.count.records":"1",
+    "buffer.flush.time":"1200",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"string_msg_failover",
+    "load.model":"stream_load",
+    "enable.2pc": "true",
+    "enable.combine.flush":"true",
+    "max.retries": "10",
+    "retry.interval.ms": "5000",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/string_msg_tab_failover_uniq.sql b/src/test/resources/e2e/string_converter/string_msg_tab_failover_uniq.sql
new file mode 100644
index 0000000..d1bad30
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/string_msg_tab_failover_uniq.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database in the file where the connector is registered.
+CREATE TABLE string_msg_failover.string_msg_tab_failover_uniq (
+  id INT NULL,
+  name VARCHAR(100) NULL,
+  age INT NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
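
A side note on the choice of table model: the failover test pairs with a UNIQUE KEY
table so that records replayed after a retry overwrite rather than duplicate.
Illustrative statements only, not part of the patch:

    -- With UNIQUE KEY(`id`), loading the same row twice keeps a single copy.
    INSERT INTO string_msg_failover.string_msg_tab_failover_uniq VALUES (1, 'zhangsan', 12);
    INSERT INTO string_msg_failover.string_msg_tab_failover_uniq VALUES (1, 'zhangsan', 12);
    -- SELECT COUNT(*) FROM string_msg_failover.string_msg_tab_failover_uniq; --> returns 1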

