This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new f82b12b2c46 branch-4.0: [Fix](streamingjob) modify select backend
policy for streaming job #59705 (#59731)
f82b12b2c46 is described below
commit f82b12b2c46cface6a4bf32e144fdc8257155373
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 12 09:28:43 2026 +0800
branch-4.0: [Fix](streamingjob) modify select backend policy for streaming
job #59705 (#59731)
Cherry-picked from #59705
Co-authored-by: wudi <[email protected]>
---
.../insert/streaming/StreamingMultiTblTask.java | 2 +-
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 8 ++++----
.../apache/doris/job/util/StreamingJobUtils.java | 22 ++++++++++++++--------
.../cdc/test_streaming_mysql_job.groovy | 2 +-
.../cdc/test_streaming_mysql_job_all_type.groovy | 2 +-
.../test_streaming_mysql_job_create_alter.groovy | 2 +-
.../cdc/test_streaming_mysql_job_dup.groovy | 2 +-
.../cdc/test_streaming_mysql_job_exclude.groovy | 2 +-
.../cdc/test_streaming_mysql_job_priv.groovy | 2 +-
.../cdc/test_streaming_mysql_job_restart_fe.groovy | 2 +-
.../cdc/test_streaming_postgres_job.groovy | 2 +-
.../test_streaming_postgres_job_all_type.groovy | 2 +-
.../cdc/test_streaming_postgres_job_dup.groovy | 2 +-
.../cdc/test_streaming_postgres_job_priv.groovy | 2 +-
14 files changed, 30 insertions(+), 24 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 50bb0fd2acd..07d9acf9d3f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -112,7 +112,7 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
}
private void sendWriteRequest() throws JobException {
- Backend backend = StreamingJobUtils.selectBackend(jobId);
+ Backend backend = StreamingJobUtils.selectBackend();
WriteRecordRequest params = buildRequestParams();
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/writeRecords")
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 2c898b04a07..560887d61ad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -183,7 +183,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
@Override
public void fetchRemoteMeta(Map<String, String> properties) throws
Exception {
- Backend backend = StreamingJobUtils.selectBackend(jobId);
+ Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams = new JobBaseConfig(getJobId(),
sourceType.name(), sourceProperties);
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/fetchEndOffset")
@@ -258,7 +258,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
private boolean compareOffset(Map<String, String> offsetFirst, Map<String,
String> offsetSecond)
throws JobException {
- Backend backend = StreamingJobUtils.selectBackend(jobId);
+ Backend backend = StreamingJobUtils.selectBackend();
CompareOffsetRequest requestParams =
new CompareOffsetRequest(getJobId(), sourceType.name(),
sourceProperties, offsetFirst, offsetSecond);
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -444,7 +444,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
}
private List<SnapshotSplit> requestTableSplits(String table) throws
JobException {
- Backend backend = StreamingJobUtils.selectBackend(jobId);
+ Backend backend = StreamingJobUtils.selectBackend();
FetchTableSplitsRequest requestParams =
new FetchTableSplitsRequest(getJobId(), sourceType.name(),
sourceProperties, table);
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -493,7 +493,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
public void cleanMeta(Long jobId) throws JobException {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
- Backend backend = StreamingJobUtils.selectBackend(jobId);
+ Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams = new JobBaseConfig(getJobId(),
sourceType.name(), sourceProperties);
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/close")
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 0281503448c..4625417b67d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -97,6 +97,8 @@ public class StreamingJobUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static int lastSelectedBackendIndex = 0;
+
public static void createMetaTableIfNotExist() throws Exception {
Optional<Database> optionalDatabase =
Env.getCurrentEnv().getInternalCatalog()
@@ -213,27 +215,31 @@ public class StreamingJobUtils {
return JdbcClient.createJdbcClient(config);
}
- public static Backend selectBackend(Long jobId) throws JobException {
+ public static Backend selectBackend() throws JobException {
Backend backend = null;
BeSelectionPolicy policy = null;
- policy = new BeSelectionPolicy.Builder()
- .setEnableRoundRobin(true)
- .needLoadAvailable().build();
+ policy = new
BeSelectionPolicy.Builder().setEnableRoundRobin(true).needLoadAvailable().build();
+ policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
+
List<Long> backendIds;
- backendIds =
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1);
+ backendIds =
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
if (backendIds.isEmpty()) {
throw new
JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " +
policy);
}
- // jobid % backendSize
- long index = backendIds.get(jobId.intValue() % backendIds.size());
- backend = Env.getCurrentSystemInfo().getBackend(index);
+ backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
if (backend == null) {
throw new
JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " +
policy);
}
return backend;
}
+ private static synchronized int getLastSelectedBackendIndexAndUpdate() {
+ int index = lastSelectedBackendIndex;
+ lastSelectedBackendIndex = (index >= Integer.MAX_VALUE - 1) ? 0 :
index + 1;
+ return index;
+ }
+
public static List<CreateTableCommand> generateCreateTableCmds(String
targetDb, DataSourceType sourceType,
Map<String, String> properties, Map<String, String>
targetProperties)
throws JobException {
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
index fa3065849fc..d77e2b769bb 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-suite("test_streaming_mysql_job",
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "user_info_normal1"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
index e24e93b6177..564f4d6c4ab 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-suite("test_streaming_mysql_job_all_type",
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_all_type",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_all_type_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "streaming_all_types_nullable_with_pk"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
index 5b210a2fd74..2a40d2f48bc 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-suite("test_streaming_mysql_job_create_alter",
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_create_alter",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_create_alter"
def currentDb = (sql "select database()")[0][0]
def table1 = "create_alter_user_info"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
index dba24d884c5..ecfd4a36cf3 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_streaming_mysql_job_dup",
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_dup",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "test_streaming_mysql_job_dup"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy
index 27553e0c0a5..b4b73536cce 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-suite("test_streaming_mysql_job_exclude",
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_exclude",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_exclude_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "user_info_exclude1"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
index aae1bc58134..b6a0926265a 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
@@ -19,7 +19,7 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-suite("test_streaming_mysql_job_priv",
"p0,external,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_priv",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def tableName = "test_streaming_mysql_job_priv_tbl"
def jobName = "test_streaming_mysql_job_priv_name"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
index eb0e4866143..8ece9f4ba74 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-suite("test_streaming_mysql_job_restart_fe",
"docker,mysql,external_docker,external_docker_mysql") {
+suite("test_streaming_mysql_job_restart_fe",
"docker,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_restart_fe"
def options = new ClusterOptions()
options.setFeNum(1)
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
index 2a82b3a5777..7fe8cb73daa 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_docker_pg") {
+suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
def jobName = "test_streaming_postgres_job_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "user_info_pg_normal1"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
index dcd688bb94f..541941b816c 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,external_docker_pg") {
+suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
def jobName = "test_streaming_postgres_job_all_type_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "streaming_all_types_nullable_with_pk_pg"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy
index cae745d06e6..1bec6cd3a25 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_streaming_postgres_job_dup",
"p0,external,pg,external_docker,external_docker_pg") {
+suite("test_streaming_postgres_job_dup",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
def jobName = "test_streaming_postgres_job_dup_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "test_streaming_postgres_job_dup"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 6b70301e43d..7b114b2ca97 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -19,7 +19,7 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-suite("test_streaming_postgres_job_priv",
"p0,external,pg,external_docker,external_docker_pg") {
+suite("test_streaming_postgres_job_priv",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
def tableName = "test_streaming_postgres_job_priv_tbl"
def jobName = "test_streaming_postgres_job_priv_name"
def currentDb = (sql "select database()")[0][0]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org