This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f82b12b2c46 branch-4.0: [Fix](streamingjob) modify select backend policy for streaming job #59705 (#59731)
f82b12b2c46 is described below

commit f82b12b2c46cface6a4bf32e144fdc8257155373
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 12 09:28:43 2026 +0800

    branch-4.0: [Fix](streamingjob) modify select backend policy for streaming job #59705 (#59731)
    
    Cherry-picked from #59705
    
    Co-authored-by: wudi <[email protected]>
---
 .../insert/streaming/StreamingMultiTblTask.java    |  2 +-
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |  8 ++++----
 .../apache/doris/job/util/StreamingJobUtils.java   | 22 ++++++++++++++--------
 .../cdc/test_streaming_mysql_job.groovy            |  2 +-
 .../cdc/test_streaming_mysql_job_all_type.groovy   |  2 +-
 .../test_streaming_mysql_job_create_alter.groovy   |  2 +-
 .../cdc/test_streaming_mysql_job_dup.groovy        |  2 +-
 .../cdc/test_streaming_mysql_job_exclude.groovy    |  2 +-
 .../cdc/test_streaming_mysql_job_priv.groovy       |  2 +-
 .../cdc/test_streaming_mysql_job_restart_fe.groovy |  2 +-
 .../cdc/test_streaming_postgres_job.groovy         |  2 +-
 .../test_streaming_postgres_job_all_type.groovy    |  2 +-
 .../cdc/test_streaming_postgres_job_dup.groovy     |  2 +-
 .../cdc/test_streaming_postgres_job_priv.groovy    |  2 +-
 14 files changed, 30 insertions(+), 24 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 50bb0fd2acd..07d9acf9d3f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -112,7 +112,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     }
 
     private void sendWriteRequest() throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend(jobId);
+        Backend backend = StreamingJobUtils.selectBackend();
         WriteRecordRequest params = buildRequestParams();
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/writeRecords")
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 2c898b04a07..560887d61ad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -183,7 +183,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
 
     @Override
     public void fetchRemoteMeta(Map<String, String> properties) throws 
Exception {
-        Backend backend = StreamingJobUtils.selectBackend(jobId);
+        Backend backend = StreamingJobUtils.selectBackend();
         JobBaseConfig requestParams = new JobBaseConfig(getJobId(), 
sourceType.name(), sourceProperties);
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/fetchEndOffset")
@@ -258,7 +258,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
 
     private boolean compareOffset(Map<String, String> offsetFirst, Map<String, 
String> offsetSecond)
             throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend(jobId);
+        Backend backend = StreamingJobUtils.selectBackend();
         CompareOffsetRequest requestParams =
                 new CompareOffsetRequest(getJobId(), sourceType.name(), 
sourceProperties, offsetFirst, offsetSecond);
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -444,7 +444,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     }
 
     private List<SnapshotSplit> requestTableSplits(String table) throws 
JobException {
-        Backend backend = StreamingJobUtils.selectBackend(jobId);
+        Backend backend = StreamingJobUtils.selectBackend();
         FetchTableSplitsRequest requestParams =
                 new FetchTableSplitsRequest(getJobId(), sourceType.name(), 
sourceProperties, table);
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -493,7 +493,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     public void cleanMeta(Long jobId) throws JobException {
         // clean meta table
         StreamingJobUtils.deleteJobMeta(jobId);
-        Backend backend = StreamingJobUtils.selectBackend(jobId);
+        Backend backend = StreamingJobUtils.selectBackend();
         JobBaseConfig requestParams = new JobBaseConfig(getJobId(), 
sourceType.name(), sourceProperties);
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/close")
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 0281503448c..4625417b67d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -97,6 +97,8 @@ public class StreamingJobUtils {
 
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
+    private static int lastSelectedBackendIndex = 0;
+
     public static void createMetaTableIfNotExist() throws Exception {
         Optional<Database> optionalDatabase =
                 Env.getCurrentEnv().getInternalCatalog()
@@ -213,27 +215,31 @@ public class StreamingJobUtils {
         return JdbcClient.createJdbcClient(config);
     }
 
-    public static Backend selectBackend(Long jobId) throws JobException {
+    public static Backend selectBackend() throws JobException {
         Backend backend = null;
         BeSelectionPolicy policy = null;
 
-        policy = new BeSelectionPolicy.Builder()
-                .setEnableRoundRobin(true)
-                .needLoadAvailable().build();
+        policy = new 
BeSelectionPolicy.Builder().setEnableRoundRobin(true).needLoadAvailable().build();
+        policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
+
         List<Long> backendIds;
-        backendIds = 
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1);
+        backendIds = 
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
         if (backendIds.isEmpty()) {
             throw new 
JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + 
policy);
         }
-        // jobid % backendSize
-        long index = backendIds.get(jobId.intValue() % backendIds.size());
-        backend = Env.getCurrentSystemInfo().getBackend(index);
+        backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
         if (backend == null) {
             throw new 
JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + 
policy);
         }
         return backend;
     }
 
+    private static synchronized int getLastSelectedBackendIndexAndUpdate() {
+        int index = lastSelectedBackendIndex;
+        lastSelectedBackendIndex = (index >= Integer.MAX_VALUE - 1) ? 0 : 
index + 1;
+        return index;
+    }
+
     public static List<CreateTableCommand> generateCreateTableCmds(String 
targetDb, DataSourceType sourceType,
             Map<String, String> properties, Map<String, String> 
targetProperties)
             throws JobException {
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
index fa3065849fc..d77e2b769bb 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-suite("test_streaming_mysql_job", 
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
     def jobName = "test_streaming_mysql_job_name"
     def currentDb = (sql "select database()")[0][0]
     def table1 = "user_info_normal1"
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
index e24e93b6177..564f4d6c4ab 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-suite("test_streaming_mysql_job_all_type", 
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_all_type", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
     def jobName = "test_streaming_mysql_job_all_type_name"
     def currentDb = (sql "select database()")[0][0]
     def table1 = "streaming_all_types_nullable_with_pk"
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
index 5b210a2fd74..2a40d2f48bc 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-suite("test_streaming_mysql_job_create_alter", 
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_create_alter", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
     def jobName = "test_streaming_mysql_job_create_alter"
     def currentDb = (sql "select database()")[0][0]
     def table1 = "create_alter_user_info"
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
index dba24d884c5..ecfd4a36cf3 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_streaming_mysql_job_dup", 
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_dup", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
     def jobName = "test_streaming_mysql_job_name"
     def currentDb = (sql "select database()")[0][0]
     def table1 = "test_streaming_mysql_job_dup"
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy
index 27553e0c0a5..b4b73536cce 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-suite("test_streaming_mysql_job_exclude", 
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_exclude", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
     def jobName = "test_streaming_mysql_job_exclude_name"
     def currentDb = (sql "select database()")[0][0]
     def table1 = "user_info_exclude1"
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
index aae1bc58134..b6a0926265a 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
@@ -19,7 +19,7 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-suite("test_streaming_mysql_job_priv", 
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_priv", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
     def tableName = "test_streaming_mysql_job_priv_tbl"
     def jobName = "test_streaming_mysql_job_priv_name"
 
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
index eb0e4866143..8ece9f4ba74 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-suite("test_streaming_mysql_job_restart_fe", 
"docker,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_restart_fe", 
"docker,mysql,external_docker,external_docker_mysql,nondatalake") {
     def jobName = "test_streaming_mysql_job_restart_fe"
     def options = new ClusterOptions()
     options.setFeNum(1)
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
index 2a82b3a5777..7fe8cb73daa 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-suite("test_streaming_postgres_job", 
"p0,external,pg,external_docker,external_docker_pg") {
+suite("test_streaming_postgres_job", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
     def jobName = "test_streaming_postgres_job_name"
     def currentDb = (sql "select database()")[0][0]
     def table1 = "user_info_pg_normal1"
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
index dcd688bb94f..541941b816c 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,external_docker_pg") {
+suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
     def jobName = "test_streaming_postgres_job_all_type_name"
     def currentDb = (sql "select database()")[0][0]
     def table1 = "streaming_all_types_nullable_with_pk_pg"
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy
index cae745d06e6..1bec6cd3a25 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_streaming_postgres_job_dup", 
"p0,external,pg,external_docker,external_docker_pg") {
+suite("test_streaming_postgres_job_dup", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
     def jobName = "test_streaming_postgres_job_dup_name"
     def currentDb = (sql "select database()")[0][0]
     def table1 = "test_streaming_postgres_job_dup"
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 6b70301e43d..7b114b2ca97 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -19,7 +19,7 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-suite("test_streaming_postgres_job_priv", 
"p0,external,pg,external_docker,external_docker_pg") {
+suite("test_streaming_postgres_job_priv", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
     def tableName = "test_streaming_postgres_job_priv_tbl"
     def jobName = "test_streaming_postgres_job_priv_name"
     def currentDb = (sql "select database()")[0][0]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to