This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new d9f52e6 [optimize](sink) Optimize the BE load balancing logic during
concurrent imports. (#388)
d9f52e6 is described below
commit d9f52e6774a2aafa1c1274d2048afdae86f0e39e
Author: Petrichor <[email protected]>
AuthorDate: Tue May 21 17:35:22 2024 +0800
[optimize](sink) Optimize the BE load balancing logic during concurrent
imports. (#388)
---
.../src/main/java/org/apache/doris/flink/sink/BackendUtil.java | 8 +++++++-
.../main/java/org/apache/doris/flink/sink/writer/DorisWriter.java | 6 +++---
.../test/java/org/apache/doris/flink/sink/TestBackendUtil.java | 4 +---
3 files changed, 11 insertions(+), 7 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index cb5b6f2..9296ae5 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -76,9 +76,15 @@ public class BackendUtil {
}
public String getAvailableBackend() {
+ return getAvailableBackend(0);
+ }
+
+ public String getAvailableBackend(int subtaskId) {
long tmp = pos + backends.size();
while (pos < tmp) {
- BackendV2.BackendRowV2 backend = backends.get((int) (pos++ %
backends.size()));
+ BackendV2.BackendRowV2 backend =
+ backends.get((int) ((pos + subtaskId) % backends.size()));
+ pos++;
String res = backend.toBackendString();
if (tryHttpConnection(res)) {
return res;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 08edfc7..81bfe88 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -312,7 +312,7 @@ public class DorisWriter<IN>
List<DorisWriterState> writerStates = new ArrayList<>();
for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) {
// Dynamic refresh backend
- dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId));
DorisWriterState writerState =
new DorisWriterState(
labelPrefix,
@@ -340,7 +340,7 @@ public class DorisWriter<IN>
tableKey,
v ->
new DorisStreamLoad(
- backendUtil.getAvailableBackend(),
+ backendUtil.getAvailableBackend(subtaskId),
dorisOptions,
executionOptions,
labelGenerator,
@@ -373,7 +373,7 @@ public class DorisWriter<IN>
// use send cached data to new txn, then notify to restart the
stream
if (executionOptions.isUseCache()) {
try {
-
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId));
if (executionOptions.enabled2PC()) {
dorisStreamLoad.abortPreCommit(labelPrefix,
curCheckpointId);
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
index 8a780ff..ece59c7 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
@@ -22,7 +22,6 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -45,8 +44,7 @@ public class TestBackendUtil {
@Test
public void testTryHttpConnection() {
- BackendUtil backendUtil = new BackendUtil(new ArrayList<>());
- boolean flag = backendUtil.tryHttpConnection("127.0.0.1:8040");
+ boolean flag = BackendUtil.tryHttpConnection("127.0.0.1:8040");
Assert.assertFalse(flag);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]