This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 0813cb5 [improve] support combine flush async (#74)
0813cb5 is described below
commit 0813cb51f3c9b0184d3fc1ca80de8472c1931fc7
Author: wudi <[email protected]>
AuthorDate: Mon May 26 10:48:49 2025 +0800
[improve] support combine flush async (#74)
When enable.combine.flush=true is set, stream load is currently still
synchronous, which blocks the consumption of Kafka data while writing.
This change makes the write asynchronous to improve throughput.
---
.../doris/kafka/connector/DorisSinkTask.java | 2 +
.../service/DorisCombinedSinkService.java | 28 ++-
.../connector/service/DorisDefaultSinkService.java | 3 +
.../kafka/connector/service/DorisSinkService.java | 3 +
.../connector/writer/AsyncStreamLoadWriter.java | 139 +++++++++++
.../doris/kafka/connector/writer/DorisWriter.java | 14 +-
.../doris/kafka/connector/writer/RecordBuffer.java | 9 +
.../writer/load/AsyncDorisStreamLoad.java | 256 +++++++++++++++++++++
.../writer/load/DefaultThreadFactory.java | 39 ++++
.../e2e/sink/AbstractKafka2DorisSink.java | 20 +-
.../connector/e2e/sink/avro/AvroMsgE2ETest.java | 2 +-
.../stringconverter/DorisSinkFailoverSinkTest.java | 29 ++-
.../e2e/sink/stringconverter/StringMsgE2ETest.java | 100 ++++----
.../string_msg_failover_connector_uniq.json | 25 ++
.../string_msg_tab_failover_uniq.sql | 12 +
15 files changed, 602 insertions(+), 79 deletions(-)
diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index 5f97b29..9e553c5 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -109,6 +109,8 @@ public class DorisSinkTask extends SinkTask {
remainingRetries);
remainingRetries--;
context.timeout(options.getRetryIntervalMs());
+ // When writing asynchronously, need to restart the
asynchronous thread
+ sink.init();
throw new RetriableException(ex);
}
throw ex;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java
b/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java
index 711b405..f0d48f4 100644
---
a/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java
@@ -22,8 +22,8 @@ package org.apache.doris.kafka.connector.service;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import org.apache.doris.kafka.connector.writer.AsyncStreamLoadWriter;
import org.apache.doris.kafka.connector.writer.DorisWriter;
-import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
/** Combined all partitions and write once. */
public class DorisCombinedSinkService extends DorisDefaultSinkService {
private static final Logger LOG =
LoggerFactory.getLogger(DorisCombinedSinkService.class);
-
private final Map<String, HashMap<Integer, Long>> topicPartitionOffset;
DorisCombinedSinkService(Map<String, String> config, SinkTaskContext
context) {
@@ -42,6 +41,17 @@ public class DorisCombinedSinkService extends
DorisDefaultSinkService {
this.topicPartitionOffset = new HashMap<>();
}
+ @Override
+ public void init() {
+ for (DorisWriter wr : writer.values()) {
+ if (wr instanceof AsyncStreamLoadWriter) {
+ // When the stream load asynchronous thread down,
+ // it needs to be restarted when retrying
+ ((AsyncStreamLoadWriter) wr).start();
+ }
+ }
+ }
+
/**
* Create new task
*
@@ -60,8 +70,9 @@ public class DorisCombinedSinkService extends
DorisDefaultSinkService {
// Only by topic
int partition = -1;
DorisWriter dorisWriter =
- new StreamLoadWriter(
+ new AsyncStreamLoadWriter(
tableName, topic, partition, dorisOptions, conn,
connectMonitor);
+
writer.put(writerKey, dorisWriter);
metricsJmxReporter.start();
}
@@ -86,14 +97,6 @@ public class DorisCombinedSinkService extends
DorisDefaultSinkService {
// Might happen a count of record based flushing,buffer
insert(record);
}
-
- // check all sink writer to see if they need to be flushed
- for (DorisWriter writer : writer.values()) {
- // Time based flushing
- if (writer.shouldFlush()) {
- writer.flushBuffer();
- }
- }
}
@Override
@@ -122,8 +125,9 @@ public class DorisCombinedSinkService extends
DorisDefaultSinkService {
public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
// Here we force flushing the data in memory once to
// ensure that the offsets recorded in topicPartitionOffset have been
flushed to doris
+ LOG.info("trigger flush by commit, topic {}",
topicPartitionOffset.keySet());
for (DorisWriter writer : writer.values()) {
- writer.flushBuffer();
+ writer.commitFlush();
}
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 68f0d34..6871cc1 100644
---
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -87,6 +87,9 @@ public class DorisDefaultSinkService implements
DorisSinkService {
this.metricsJmxReporter);
}
+ @Override
+ public void init() {}
+
@Override
public void startTask(TopicPartition topicPartition) {
startTask(dorisOptions.getTopicMapTable(topicPartition.topic()),
topicPartition);
diff --git
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
index dacee81..a3d72e9 100644
---
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
@@ -28,6 +28,9 @@ import org.apache.kafka.connect.sink.SinkRecord;
/** Background service of data sink, responsible to create/drop table and
insert/delete files */
public interface DorisSinkService {
+ /** init task for writer */
+ void init();
+
/**
* Start the Task.
*
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/AsyncStreamLoadWriter.java
b/src/main/java/org/apache/doris/kafka/connector/writer/AsyncStreamLoadWriter.java
new file mode 100644
index 0000000..7003629
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/AsyncStreamLoadWriter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.connection.ConnectionProvider;
+import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
+import org.apache.doris.kafka.connector.utils.BackendUtils;
+import org.apache.doris.kafka.connector.writer.load.AsyncDorisStreamLoad;
+import org.apache.doris.kafka.connector.writer.load.DefaultThreadFactory;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncStreamLoadWriter extends DorisWriter {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncStreamLoadWriter.class);
+ private final LabelGenerator labelGenerator;
+ private AsyncDorisStreamLoad dorisStreamLoad;
+ private final transient ScheduledExecutorService scheduledExecutorService;
+
+ public AsyncStreamLoadWriter(
+ String tableName,
+ String topic,
+ int partition,
+ DorisOptions dorisOptions,
+ ConnectionProvider connectionProvider,
+ DorisConnectMonitor connectMonitor) {
+ super(tableName, topic, partition, dorisOptions, connectionProvider,
connectMonitor);
+ this.labelGenerator = new LabelGenerator(topic, partition,
tableIdentifier);
+ BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions,
LOG);
+ this.dorisStreamLoad =
+ new AsyncDorisStreamLoad(backendUtils, dorisOptions, topic,
this.tableName);
+ this.scheduledExecutorService =
+ new ScheduledThreadPoolExecutor(
+ 1, new
DefaultThreadFactory("stream-load-flush-interval"));
+ // when uploading data in streaming mode, we need to regularly detect
whether there are
+ // exceptions.
+ scheduledExecutorService.scheduleWithFixedDelay(
+ this::intervalFlush,
+ dorisOptions.getFlushTime(),
+ dorisOptions.getFlushTime(),
+ TimeUnit.SECONDS);
+ }
+
+ /** start async thread stream load */
+ public void start() {
+ this.dorisStreamLoad.start();
+ }
+
+ public void insert(final SinkRecord dorisRecord) {
+ checkFlushException();
+ putBuffer(dorisRecord);
+ if (buffer.getBufferSizeBytes() >= dorisOptions.getFileSize()
+ || (dorisOptions.getRecordNum() != 0
+ && buffer.getNumOfRecords() >=
dorisOptions.getRecordNum())) {
+ LOG.info(
+ "trigger flush by buffer size or count, buffer size: {},
num of records: {}, lastoffset : {}",
+ buffer.getBufferSizeBytes(),
+ buffer.getNumOfRecords(),
+ buffer.getLastOffset());
+ bufferFullFlush();
+ }
+ }
+
+ private void bufferFullFlush() {
+ doFlush(false, true);
+ }
+
+ private void intervalFlush() {
+ LOG.debug("interval flush trigger");
+ doFlush(false, false);
+ }
+
+ public void commitFlush() {
+ doFlush(true, false);
+ }
+
+ private synchronized void doFlush(boolean waitUtilDone, boolean
bufferFull) {
+ if (waitUtilDone || bufferFull) {
+ flushBuffer(waitUtilDone);
+ } else if (dorisStreamLoad.hasCapacity()) {
+ flushBuffer(false);
+ }
+ }
+
+ public synchronized void flushBuffer(boolean waitUtilDone) {
+ if (!buffer.isEmpty()) {
+ RecordBuffer tmpBuff = buffer;
+
+ String label =
labelGenerator.generateLabel(tmpBuff.getLastOffset());
+ dorisStreamLoad.flush(label, tmpBuff);
+ this.buffer = new RecordBuffer();
+ }
+
+ if (waitUtilDone) {
+ dorisStreamLoad.forceFlush();
+ }
+ }
+
+ private void checkFlushException() {
+ dorisStreamLoad.checkException();
+ }
+
+ @Override
+ public void commit(int partition) {
+ // Won't go here
+ }
+
+ @Override
+ public long getOffset() {
+ // Won't go here
+ return 0;
+ }
+
+ @Override
+ public void fetchOffset() {
+ // Won't go here
+ }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 9e5cfb9..c48fe40 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -42,7 +42,7 @@ public abstract class DorisWriter {
protected String dbName;
protected final String tableIdentifier;
protected List<String> fileNames;
- private RecordBuffer buffer;
+ protected RecordBuffer buffer;
protected final AtomicLong committedOffset; // loaded offset + 1
protected final AtomicLong flushedOffset; // flushed offset
protected final AtomicLong processedOffset; // processed offset
@@ -116,11 +116,8 @@ public abstract class DorisWriter {
protected void insertRecord(final SinkRecord record) {
// discard the record if the record offset is smaller or equal to
server side offset
- // when enable.combine.flush=true, No verification is required because
the offsets of
- // multiple partitions cannot be compared.
- if (dorisOptions.isEnableCombineFlush()
- || (record.kafkaOffset() > this.offsetPersistedInDoris.get()
- && record.kafkaOffset() > processedOffset.get())) {
+ if (record.kafkaOffset() > this.offsetPersistedInDoris.get()
+ && record.kafkaOffset() > processedOffset.get()) {
SinkRecord dorisRecord = record;
RecordBuffer tmpBuff = null;
@@ -175,6 +172,11 @@ public abstract class DorisWriter {
>= (dorisOptions.getFlushTime() * 1000);
}
+ // for combine flush
+ public void commitFlush() {
+ flushBuffer();
+ }
+
public void flushBuffer() {
if (buffer.isEmpty()) {
return;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/RecordBuffer.java
b/src/main/java/org/apache/doris/kafka/connector/writer/RecordBuffer.java
index 9fcbc23..990043f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/RecordBuffer.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/RecordBuffer.java
@@ -29,6 +29,7 @@ public class RecordBuffer extends PartitionBuffer<String> {
private static final Logger LOG =
LoggerFactory.getLogger(RecordBuffer.class);
public static final String LINE_SEPARATOR = "\n";
private final StringJoiner buffer;
+ private String label;
public RecordBuffer() {
super();
@@ -52,4 +53,12 @@ public class RecordBuffer extends PartitionBuffer<String> {
getLastOffset());
return result;
}
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void setLabel(String label) {
+ this.label = label;
+ }
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
b/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
new file mode 100644
index 0000000..49d7ba1
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer.load;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.exception.StreamLoadException;
+import org.apache.doris.kafka.connector.model.KafkaRespContent;
+import org.apache.doris.kafka.connector.utils.BackendUtils;
+import org.apache.doris.kafka.connector.utils.HttpPutBuilder;
+import org.apache.doris.kafka.connector.utils.HttpUtils;
+import org.apache.doris.kafka.connector.writer.LoadStatus;
+import org.apache.doris.kafka.connector.writer.RecordBuffer;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncDorisStreamLoad extends DataLoad {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncDorisStreamLoad.class);
+ private static final String LOAD_URL_PATTERN =
"http://%s/api/%s/%s/_stream_load";
+ private static final List<String> DORIS_SUCCESS_STATUS =
+ new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS,
LoadStatus.PUBLISH_TIMEOUT));
+ private String loadUrl;
+ private final DorisOptions dorisOptions;
+ private final String topic;
+ private String hostPort;
+ private final CloseableHttpClient httpClient = new
HttpUtils().getHttpClient();
+ private final BackendUtils backendUtils;
+ private Queue<KafkaRespContent> respContents = new LinkedList<>();
+ private final boolean enableGroupCommit;
+ private ExecutorService loadExecutorService;
+ private LoadAsyncExecutor loadAsyncExecutor;
+ private BlockingQueue<RecordBuffer> flushQueue = new
LinkedBlockingDeque<>(1);
+ private final AtomicBoolean started;
+ private volatile boolean loadThreadAlive = false;
+ private AtomicReference<Throwable> exception = new AtomicReference<>(null);
+
+ public AsyncDorisStreamLoad(
+ BackendUtils backendUtils, DorisOptions dorisOptions, String
topic, String table) {
+ this.database = dorisOptions.getDatabase();
+ this.table = table;
+ this.user = dorisOptions.getUser();
+ this.password = dorisOptions.getPassword();
+ this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database,
table);
+ this.dorisOptions = dorisOptions;
+ this.backendUtils = backendUtils;
+ this.topic = topic;
+ this.enableGroupCommit = dorisOptions.enableGroupCommit();
+ this.loadAsyncExecutor = new LoadAsyncExecutor();
+ this.loadExecutorService =
+ new ThreadPoolExecutor(
+ 1,
+ 1,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(1),
+ new DefaultThreadFactory("streamload-executor"),
+ new ThreadPoolExecutor.AbortPolicy());
+
+ start();
+ this.started = new AtomicBoolean(true);
+ }
+
+ public void start() {
+ if (!loadThreadAlive) {
+ this.loadExecutorService.execute(loadAsyncExecutor);
+ this.exception.set(null);
+ }
+ }
+
+ public void flush(String label, RecordBuffer buffer) {
+ checkFlushException();
+ buffer.setLabel(label);
+ putRecordToFlushQueue(buffer);
+ }
+
+ public void forceFlush() {
+ checkFlushException();
+ waitAsyncLoadFinish();
+ }
+
+ private void waitAsyncLoadFinish() {
+ // Make sure the data in the queue has been flushed
+ for (int i = 0; i < 2; i++) {
+ RecordBuffer empty = new RecordBuffer();
+ putRecordToFlushQueue(empty);
+ }
+ }
+
+ private void putRecordToFlushQueue(RecordBuffer buffer) {
+ checkFlushException();
+ if (!loadThreadAlive) {
+ throw new RuntimeException("load thread already exit, write was
interrupted");
+ }
+ try {
+ flushQueue.put(buffer);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Failed to put record buffer to flush
queue");
+ }
+ checkFlushException();
+ }
+
+ private void checkFlushException() {
+ if (exception.get() != null) {
+ throw new DorisException(exception.get());
+ }
+ }
+
+ public void close() {
+ if (started.compareAndSet(true, false)) {
+ LOG.info("close executorService");
+ loadExecutorService.shutdown();
+ }
+ }
+
+ public boolean hasCapacity() {
+ return flushQueue.remainingCapacity() > 0;
+ }
+
+ public void checkException() {
+ checkFlushException();
+ }
+
+ class LoadAsyncExecutor implements Runnable {
+
+ @Override
+ public void run() {
+ LOG.info("LoadAsyncExecutor start");
+ loadThreadAlive = true;
+ while (started.get()) {
+ try {
+ RecordBuffer buffer = flushQueue.poll(2000L,
TimeUnit.MILLISECONDS);
+ if (buffer == null) {
+ continue;
+ }
+ if (buffer.getLabel() == null) {
+ // When the label is empty, it is the eof buffer for
checkpoint flush.
+ continue;
+ }
+ load(buffer.getLabel(), buffer);
+
+ } catch (Exception e) {
+ LOG.error("worker running error", e);
+ exception.set(e);
+ // clear queue to avoid writer thread blocking
+ flushQueue.clear();
+ break;
+ }
+ }
+ LOG.info("LoadAsyncExecutor stop");
+ loadThreadAlive = false;
+ }
+
+ /** execute stream load. */
+ public void load(String label, RecordBuffer buffer) throws IOException
{
+ if (enableGroupCommit) {
+ label = null;
+ }
+
+ refreshLoadUrl(database, table);
+ String data = buffer.getData();
+ ByteArrayEntity entity = new
ByteArrayEntity(data.getBytes(StandardCharsets.UTF_8));
+ HttpPutBuilder putBuilder = new HttpPutBuilder();
+ putBuilder
+ .setUrl(loadUrl)
+ .baseAuth(user, password)
+ .setLabel(label)
+ .addCommonHeader()
+ .setEntity(entity)
+ .addHiddenColumns(dorisOptions.isEnableDelete())
+ .enable2PC(dorisOptions.enable2PC())
+ .addProperties(dorisOptions.getStreamLoadProp());
+
+ if (enableGroupCommit) {
+ LOG.info("stream load started with group commit on host {}",
hostPort);
+ } else {
+ LOG.info("stream load started for {} on host {}", label,
hostPort);
+ }
+
+ try (CloseableHttpResponse response =
httpClient.execute(putBuilder.build())) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == 200 && response.getEntity() != null) {
+ String loadResult =
EntityUtils.toString(response.getEntity());
+ LOG.info("load Result {}", loadResult);
+ KafkaRespContent respContent =
+ OBJECT_MAPPER.readValue(loadResult,
KafkaRespContent.class);
+ if (respContent == null || respContent.getMessage() ==
null) {
+ throw new StreamLoadException("response error : " +
loadResult);
+ }
+ if
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+ String errMsg =
+ String.format(
+ "stream load error: %s, see more in
%s",
+ respContent.getMessage(),
respContent.getErrorURL());
+ throw new StreamLoadException(errMsg);
+ }
+ respContent.setDatabase(database);
+ respContent.setTable(table);
+ respContent.setLastOffset(buffer.getLastOffset());
+ respContent.setTopic(topic);
+ respContents.add(respContent);
+ }
+ } catch (Exception ex) {
+ String err;
+ if (enableGroupCommit) {
+ err = "failed to stream load data with group commit";
+ } else {
+ err = "failed to stream load data with label: " + label;
+ }
+
+ LOG.warn(err, ex);
+ throw new StreamLoadException(err, ex);
+ }
+ }
+
+ private void refreshLoadUrl(String database, String table) {
+ hostPort = backendUtils.getAvailableBackend();
+ loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database,
table);
+ }
+ }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DefaultThreadFactory.java
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DefaultThreadFactory.java
new file mode 100644
index 0000000..82ff62a
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DefaultThreadFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.writer.load;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DefaultThreadFactory implements ThreadFactory {
+ private static final AtomicInteger poolNumber = new AtomicInteger(1);
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final String namePrefix;
+
+ public DefaultThreadFactory(String name) {
+ namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + name + "-";
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
+ t.setDaemon(false);
+ return t;
+ }
+}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
index ad0b84a..8e79f9b 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
@@ -115,11 +115,12 @@ public abstract class AbstractKafka2DorisSink {
}
}
- protected static void executeSql(Connection connection, String... sql) {
+ protected static void executeSql(String... sql) {
if (sql == null || sql.length == 0) {
return;
}
- try (Statement statement = connection.createStatement()) {
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
for (String s : sql) {
if (StringUtils.isNotEmpty(s)) {
statement.execute(s);
@@ -133,8 +134,8 @@ public abstract class AbstractKafka2DorisSink {
protected static void createDatabase(String databaseName) {
LOG.info("Will to be create database, sql={}", databaseName);
- try {
- Statement statement = getJdbcConnection().createStatement();
+ try (Connection conn = getJdbcConnection();
+ Statement statement = conn.createStatement()) {
statement.execute("create database if not exists " + databaseName);
} catch (SQLException e) {
throw new DorisException("Failed to create doris table.", e);
@@ -144,8 +145,8 @@ public abstract class AbstractKafka2DorisSink {
protected void createTable(String sql) {
LOG.info("Will to be create doris table, sql={}", sql);
- try {
- Statement statement = getJdbcConnection().createStatement();
+ try (Connection conn = getJdbcConnection();
+ Statement statement = conn.createStatement()) {
statement.execute(sql);
} catch (SQLException e) {
throw new DorisException("Failed to create doris table.", e);
@@ -155,8 +156,8 @@ public abstract class AbstractKafka2DorisSink {
protected void insertTable(String sql) {
LOG.info("Will insert data to Doris table. SQL: {}", sql);
- try {
- Statement statement = getJdbcConnection().createStatement();
+ try (Connection conn = getJdbcConnection();
+ Statement statement = conn.createStatement()) {
int rowCount = statement.executeUpdate(sql);
LOG.info("Inserted {} item data into the Doris table.", rowCount);
} catch (SQLException e) {
@@ -184,7 +185,8 @@ public abstract class AbstractKafka2DorisSink {
public void checkResult(List<String> expected, String query, int
columnSize) throws Exception {
List<String> actual = new ArrayList<>();
- try (Statement statement = getJdbcConnection().createStatement()) {
+ try (Connection conn = getJdbcConnection();
+ Statement statement = conn.createStatement()) {
ResultSet sinkResultSet = statement.executeQuery(query);
while (sinkResultSet.next()) {
List<String> row = new ArrayList<>();
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
index d920903..8a4d650 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
@@ -80,7 +80,7 @@ public class AvroMsgE2ETest extends AbstractAvroE2ESinkTest {
}
private static void setTimeZone() {
- executeSql(getJdbcConnection(), "set global time_zone =
'Asia/Shanghai'");
+ executeSql("set global time_zone = 'Asia/Shanghai'");
}
@Test
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
index c81055f..3dbd19e 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
@@ -66,6 +66,7 @@ public class DorisSinkFailoverSinkTest extends
AbstractStringE2ESinkTest {
JsonNode configNode = rootNode.get(CONFIG);
Map<String, String> configMap = objectMapper.convertValue(configNode,
Map.class);
configMap.put(ConfigCheckUtils.TASK_ID, "1");
+
Map<String, String> lowerCaseConfigMap =
DorisSinkConnectorConfig.convertToLowercase(configMap);
DorisSinkConnectorConfig.setDefaultValues(lowerCaseConfigMap);
@@ -76,7 +77,7 @@ public class DorisSinkFailoverSinkTest extends
AbstractStringE2ESinkTest {
}
private static void setTimeZone() {
- executeSql(getJdbcConnection(), "set global time_zone =
'Asia/Shanghai'");
+ executeSql("set global time_zone = 'Asia/Shanghai'");
}
/** mock streamload failure */
@@ -84,23 +85,41 @@ public class DorisSinkFailoverSinkTest extends
AbstractStringE2ESinkTest {
public void testStreamLoadFailoverSink() throws Exception {
LOG.info("start to test testStreamLoadFailoverSink.");
initialize("src/test/resources/e2e/string_converter/string_msg_failover_connector.json");
- Thread.sleep(5000);
String topic = "string_test_failover";
String msg1 = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
produceMsg2Kafka(topic, msg1);
-
String tableSql =
loadContent("src/test/resources/e2e/string_converter/string_msg_tab_failover.sql");
createTable(tableSql);
+ startCheck(topic);
+ }
+
+ /** mock streamload failure */
+ @Test
+ public void testStreamLoadFailoverSinkCombineFlush() throws Exception {
+ LOG.info("start to test testStreamLoadFailoverSinkCombineFlush.");
+ initialize(
+
"src/test/resources/e2e/string_converter/string_msg_failover_connector_uniq.json");
+ String topic = "string_test_failover_uniq";
+ String msg1 = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
+ produceMsg2Kafka(topic, msg1);
+ String tableSql =
+ loadContent(
+
"src/test/resources/e2e/string_converter/string_msg_tab_failover_uniq.sql");
+ createTable(tableSql);
+ startCheck(topic);
+ }
+ public void startCheck(String topic) throws Exception {
kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
String table = dorisOptions.getTopicMapTable(topic);
String querySql =
String.format("select id,name,age from %s.%s order by id",
database, table);
LOG.info("start to query result from doris. sql={}", querySql);
+ Connection jdbcConnection = getJdbcConnection();
while (true) {
- List<String> result = executeSQLStatement(getJdbcConnection(),
LOG, querySql, 3);
+ List<String> result = executeSQLStatement(jdbcConnection, LOG,
querySql, 3);
// until load success one time
if (result.size() >= 1) {
faultInjectionOpen();
@@ -111,7 +130,7 @@ public class DorisSinkFailoverSinkTest extends
AbstractStringE2ESinkTest {
faultInjectionClear();
break;
} else {
- Thread.sleep(100);
+ Thread.sleep(1000);
}
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 056f513..280ddf3 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -21,6 +21,7 @@ package
org.apache.doris.kafka.connector.e2e.sink.stringconverter;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -74,7 +75,7 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
}
private static void setTimeZone() {
- executeSql(getJdbcConnection(), "set global time_zone =
'Asia/Shanghai'");
+ executeSql("set global time_zone = 'Asia/Shanghai'");
}
@Test
@@ -88,23 +89,26 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
String tableSql =
loadContent("src/test/resources/e2e/string_converter/string_msg_tab.sql");
createTable(tableSql);
kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
+ Thread.sleep(20000);
String table = dorisOptions.getTopicMapTable(topic);
- Statement statement = getJdbcConnection().createStatement();
- String querySql = "select * from " + database + "." + table;
- LOG.info("start to query result from doris. sql={}", querySql);
- ResultSet resultSet = statement.executeQuery(querySql);
-
- Assert.assertTrue(resultSet.next());
-
- int id = resultSet.getInt("id");
- String name = resultSet.getString("name");
- int age = resultSet.getInt("age");
- LOG.info("Query result is id={}, name={}, age={}", id, name, age);
-
- Assert.assertEquals(1, id);
- Assert.assertEquals("zhangsan", name);
- Assert.assertEquals(12, age);
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement(); ) {
+ String querySql = "select * from " + database + "." + table;
+ LOG.info("start to query result from doris. sql={}", querySql);
+ ResultSet resultSet = statement.executeQuery(querySql);
+
+ Assert.assertTrue(resultSet.next());
+
+ int id = resultSet.getInt("id");
+ String name = resultSet.getString("name");
+ int age = resultSet.getInt("age");
+ LOG.info("Query result is id={}, name={}, age={}", id, name, age);
+
+ Assert.assertEquals(1, id);
+ Assert.assertEquals("zhangsan", name);
+ Assert.assertEquals(12, age);
+ }
}
@Test
@@ -425,21 +429,23 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
String table = dorisOptions.getTopicMapTable(topic);
- Statement statement = getJdbcConnection().createStatement();
- String querySql = "select * from " + database + "." + table;
- LOG.info("start to query result from doris. sql={}", querySql);
- ResultSet resultSet = statement.executeQuery(querySql);
-
- Assert.assertTrue(resultSet.next());
-
- int id = resultSet.getInt("id");
- String name = resultSet.getString("name");
- int age = resultSet.getInt("age");
- LOG.info("Query result is id={}, name={}, age={}", id, name, age);
-
- Assert.assertEquals(1, id);
- Assert.assertEquals("zhangsan", name);
- Assert.assertEquals(12, age);
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ String querySql = "select * from " + database + "." + table;
+ LOG.info("start to query result from doris. sql={}", querySql);
+ ResultSet resultSet = statement.executeQuery(querySql);
+
+ Assert.assertTrue(resultSet.next());
+
+ int id = resultSet.getInt("id");
+ String name = resultSet.getString("name");
+ int age = resultSet.getInt("age");
+ LOG.info("Query result is id={}, name={}, age={}", id, name, age);
+
+ Assert.assertEquals(1, id);
+ Assert.assertEquals("zhangsan", name);
+ Assert.assertEquals(12, age);
+ }
}
@Test
@@ -456,21 +462,23 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
String table = dorisOptions.getTopicMapTable(topic);
- Statement statement = getJdbcConnection().createStatement();
- String querySql = "select * from " + database + "." + table;
- LOG.info("start to query result from doris. sql={}", querySql);
- ResultSet resultSet = statement.executeQuery(querySql);
-
- Assert.assertTrue(resultSet.next());
-
- int id = resultSet.getInt("id");
- String name = resultSet.getString("name");
- int age = resultSet.getInt("age");
- LOG.info("Query result is id={}, name={}, age={}", id, name, age);
-
- Assert.assertEquals(1, id);
- Assert.assertEquals("zhangsan", name);
- Assert.assertEquals(12, age);
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement(); ) {
+ String querySql = "select * from " + database + "." + table;
+ LOG.info("start to query result from doris. sql={}", querySql);
+ ResultSet resultSet = statement.executeQuery(querySql);
+
+ Assert.assertTrue(resultSet.next());
+
+ int id = resultSet.getInt("id");
+ String name = resultSet.getString("name");
+ int age = resultSet.getInt("age");
+ LOG.info("Query result is id={}, name={}, age={}", id, name, age);
+
+ Assert.assertEquals(1, id);
+ Assert.assertEquals("zhangsan", name);
+ Assert.assertEquals(12, age);
+ }
}
@AfterClass
diff --git
a/src/test/resources/e2e/string_converter/string_msg_failover_connector_uniq.json
b/src/test/resources/e2e/string_converter/string_msg_failover_connector_uniq.json
new file mode 100644
index 0000000..83cdc1c
--- /dev/null
+++
b/src/test/resources/e2e/string_converter/string_msg_failover_connector_uniq.json
@@ -0,0 +1,25 @@
+{
+ "name":"string_msg_failover_connector_uniq",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"string_test_failover_uniq",
+ "tasks.max":"1",
+ "doris.topic2table.map":
"string_test_failover_uniq:string_msg_tab_failover_uniq",
+ "buffer.count.records":"1",
+ "buffer.flush.time":"1200",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"string_msg_failover",
+ "load.model":"stream_load",
+ "enable.2pc": "true",
+ "enable.combine.flush":"true",
+ "max.retries": "10",
+ "retry.interval.ms": "5000",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+ }
+}
\ No newline at end of file
diff --git
a/src/test/resources/e2e/string_converter/string_msg_tab_failover_uniq.sql
b/src/test/resources/e2e/string_converter/string_msg_tab_failover_uniq.sql
new file mode 100644
index 0000000..d1bad30
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/string_msg_tab_failover_uniq.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database
in the file where the connector is registered.
+CREATE TABLE string_msg_failover.string_msg_tab_failover_uniq (
+ id INT NULL,
+ name VARCHAR(100) NULL,
+ age INT NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]