This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a57f24b6 [improve] optimize the batch stream load redirect logic (#437)
a57f24b6 is described below

commit a57f24b67cc8957851e293761a0bd3407e631339
Author: Petrichor <[email protected]>
AuthorDate: Tue Jul 23 11:28:38 2024 +0800

    [improve] optimize the batch stream load redirect logic (#437)
---
 .../doris/flink/sink/batch/DorisBatchStreamLoad.java       |  8 +++++---
 .../apache/doris/flink/sink/batch/DorisBatchWriter.java    |  8 +++++++-
 .../doris/flink/sink/batch/TestDorisBatchStreamLoad.java   | 14 +++++++++++---
 3 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index c5614c31..2dd7a50e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -102,12 +102,14 @@ public class DorisBatchStreamLoad implements Serializable {
     private BackendUtil backendUtil;
     private boolean enableGroupCommit;
     private boolean enableGzCompress;
+    private int subTaskId;
 
     public DorisBatchStreamLoad(
             DorisOptions dorisOptions,
             DorisReadOptions dorisReadOptions,
             DorisExecutionOptions executionOptions,
-            LabelGenerator labelGenerator) {
+            LabelGenerator labelGenerator,
+            int subTaskId) {
         this.backendUtil =
                 StringUtils.isNotEmpty(dorisOptions.getBenodes())
                         ? new BackendUtil(dorisOptions.getBenodes())
@@ -154,6 +156,7 @@ public class DorisBatchStreamLoad implements Serializable {
                         new ThreadPoolExecutor.AbortPolicy());
         this.started = new AtomicBoolean(true);
         this.loadExecutorService.execute(loadAsyncExecutor);
+        this.subTaskId = subTaskId;
     }
 
     /**
@@ -293,7 +296,6 @@ public class DorisBatchStreamLoad implements Serializable {
             if (enableGzCompress) {
                 putBuilder.setEntity(new GzipCompressingEntity(entity));
             }
-
             Throwable resEx = new Throwable();
             int retry = 0;
             while (retry <= executionOptions.getMaxRetries()) {
@@ -351,7 +353,7 @@ public class DorisBatchStreamLoad implements Serializable {
         }
 
         private void refreshLoadUrl(String database, String table) {
-            hostPort = backendUtil.getAvailableBackend();
+            hostPort = backendUtil.getAvailableBackend(subTaskId);
             loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table);
         }
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 4b48436c..6fbde55d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -59,6 +59,7 @@ public class DorisBatchWriter<IN>
     private transient volatile Exception flushException = null;
     private String database;
     private String table;
+    private int subtaskId;
 
     public DorisBatchWriter(
             Sink.InitContext initContext,
@@ -75,6 +76,7 @@ public class DorisBatchWriter<IN>
             this.table = tableInfo[1];
         }
         LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
+        this.subtaskId = initContext.getSubtaskId();
         this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
         this.labelGenerator = new LabelGenerator(labelPrefix, false);
         this.scheduledExecutorService =
@@ -92,7 +94,11 @@ public class DorisBatchWriter<IN>
     public void initializeLoad() {
         this.batchStreamLoad =
                 new DorisBatchStreamLoad(
-                        dorisOptions, dorisReadOptions, executionOptions, labelGenerator);
+                        dorisOptions,
+                        dorisReadOptions,
+                        executionOptions,
+                        labelGenerator,
+                        subtaskId);
         // when uploading data in streaming mode, we need to regularly detect whether there are
         // exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index 6080db2d..d52149d6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -79,7 +79,7 @@ public class TestDorisBatchStreamLoad {
         thrown.expectMessage("tableIdentifier input error");
         DorisBatchStreamLoad loader =
                 new DorisBatchStreamLoad(
-                        options, readOptions, executionOptions, new LabelGenerator("xx", false));
+                        options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);
     }
 
     @Test
@@ -95,7 +95,11 @@ public class TestDorisBatchStreamLoad {
 
         DorisBatchStreamLoad loader =
                 new DorisBatchStreamLoad(
-                        options, readOptions, executionOptions, new LabelGenerator("label", false));
+                        options,
+                        readOptions,
+                        executionOptions,
+                        new LabelGenerator("label", false),
+                        0);
         TestUtil.waitUntilCondition(
                 () -> loader.isLoadThreadAlive(),
                 Deadline.fromNow(Duration.ofSeconds(10)),
@@ -134,7 +138,11 @@ public class TestDorisBatchStreamLoad {
 
         DorisBatchStreamLoad loader =
                 new DorisBatchStreamLoad(
-                        options, readOptions, executionOptions, new LabelGenerator("label", false));
+                        options,
+                        readOptions,
+                        executionOptions,
+                        new LabelGenerator("label", false),
+                        0);
 
         TestUtil.waitUntilCondition(
                 () -> loader.isLoadThreadAlive(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to