This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new a57f24b6 [improve] optimize the batch stream load redirect logic (#437)
a57f24b6 is described below
commit a57f24b67cc8957851e293761a0bd3407e631339
Author: Petrichor <[email protected]>
AuthorDate: Tue Jul 23 11:28:38 2024 +0800
[improve] optimize the batch stream load redirect logic (#437)
---
.../doris/flink/sink/batch/DorisBatchStreamLoad.java | 8 +++++---
.../apache/doris/flink/sink/batch/DorisBatchWriter.java | 8 +++++++-
.../doris/flink/sink/batch/TestDorisBatchStreamLoad.java | 14 +++++++++++---
3 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index c5614c31..2dd7a50e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -102,12 +102,14 @@ public class DorisBatchStreamLoad implements Serializable {
private BackendUtil backendUtil;
private boolean enableGroupCommit;
private boolean enableGzCompress;
+ private int subTaskId;
public DorisBatchStreamLoad(
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions,
- LabelGenerator labelGenerator) {
+ LabelGenerator labelGenerator,
+ int subTaskId) {
this.backendUtil =
StringUtils.isNotEmpty(dorisOptions.getBenodes())
? new BackendUtil(dorisOptions.getBenodes())
@@ -154,6 +156,7 @@ public class DorisBatchStreamLoad implements Serializable {
new ThreadPoolExecutor.AbortPolicy());
this.started = new AtomicBoolean(true);
this.loadExecutorService.execute(loadAsyncExecutor);
+ this.subTaskId = subTaskId;
}
/**
@@ -293,7 +296,6 @@ public class DorisBatchStreamLoad implements Serializable {
if (enableGzCompress) {
putBuilder.setEntity(new GzipCompressingEntity(entity));
}
-
Throwable resEx = new Throwable();
int retry = 0;
while (retry <= executionOptions.getMaxRetries()) {
@@ -351,7 +353,7 @@ public class DorisBatchStreamLoad implements Serializable {
}
private void refreshLoadUrl(String database, String table) {
- hostPort = backendUtil.getAvailableBackend();
+ hostPort = backendUtil.getAvailableBackend(subTaskId);
loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 4b48436c..6fbde55d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -59,6 +59,7 @@ public class DorisBatchWriter<IN>
private transient volatile Exception flushException = null;
private String database;
private String table;
+ private int subtaskId;
public DorisBatchWriter(
Sink.InitContext initContext,
@@ -75,6 +76,7 @@ public class DorisBatchWriter<IN>
this.table = tableInfo[1];
}
LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
+ this.subtaskId = initContext.getSubtaskId();
this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
this.labelGenerator = new LabelGenerator(labelPrefix, false);
this.scheduledExecutorService =
@@ -92,7 +94,11 @@ public class DorisBatchWriter<IN>
public void initializeLoad() {
this.batchStreamLoad =
new DorisBatchStreamLoad(
- dorisOptions, dorisReadOptions, executionOptions, labelGenerator);
+ dorisOptions,
+ dorisReadOptions,
+ executionOptions,
+ labelGenerator,
+ subtaskId);
// when uploading data in streaming mode, we need to regularly detect whether there are
// exceptions.
scheduledExecutorService.scheduleWithFixedDelay(
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index 6080db2d..d52149d6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -79,7 +79,7 @@ public class TestDorisBatchStreamLoad {
thrown.expectMessage("tableIdentifier input error");
DorisBatchStreamLoad loader =
new DorisBatchStreamLoad(
- options, readOptions, executionOptions, new LabelGenerator("xx", false));
+ options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);
}
@Test
@@ -95,7 +95,11 @@ public class TestDorisBatchStreamLoad {
DorisBatchStreamLoad loader =
new DorisBatchStreamLoad(
- options, readOptions, executionOptions, new LabelGenerator("label", false));
+ options,
+ readOptions,
+ executionOptions,
+ new LabelGenerator("label", false),
+ 0);
TestUtil.waitUntilCondition(
() -> loader.isLoadThreadAlive(),
Deadline.fromNow(Duration.ofSeconds(10)),
@@ -134,7 +138,11 @@ public class TestDorisBatchStreamLoad {
DorisBatchStreamLoad loader =
new DorisBatchStreamLoad(
- options, readOptions, executionOptions, new LabelGenerator("label", false));
+ options,
+ readOptions,
+ executionOptions,
+ new LabelGenerator("label", false),
+ 0);
TestUtil.waitUntilCondition(
() -> loader.isLoadThreadAlive(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]