This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 953250c4669 branch-4.1: [improve](streaming-job) support specifying compute_group for StreamingJob #62747 (#62990)
953250c4669 is described below
commit 953250c46697b1134ee91a51f2c2c06aa11ed0a6
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 6 10:43:58 2026 +0800
branch-4.1: [improve](streaming-job) support specifying compute_group for StreamingJob #62747 (#62990)
Cherry-picked from #62747
Co-authored-by: wudi <[email protected]>
---
.../insert/streaming/StreamingInsertJob.java | 90 +++++++++++-
.../insert/streaming/StreamingInsertTask.java | 11 +-
.../insert/streaming/StreamingJobProperties.java | 7 +-
.../insert/streaming/StreamingMultiTblTask.java | 7 +-
.../doris/job/offset/SourceOffsetProvider.java | 6 +
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 12 +-
.../apache/doris/job/util/StreamingJobUtils.java | 28 +++-
.../test_streaming_mysql_job_compute_group.groovy | 127 ++++++++++++++++
.../test_streaming_insert_job_compute_group.groovy | 158 ++++++++++++++++++++
...treaming_insert_job_compute_group_docker.groovy | 163 +++++++++++++++++++++
10 files changed, 584 insertions(+), 25 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 168bb137259..4583271a64b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -20,10 +20,13 @@ package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.InternalErrorCode;
@@ -167,6 +170,10 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
    // Converted form of sourceProperties; must be refreshed whenever sourceProperties changes.
private transient Map<String, String> convertedSourceProperties;
+ @Getter
+ @SerializedName("ccn")
+ private volatile String cloudCluster;
+
    // Error counts are sampled over a sampling window.
    // If the error rate exceeds `max_filter_ratio` within the window, the sampling check fails.
@Setter
@@ -238,8 +245,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
}
StreamingJobUtils.resolveAndValidateSource(
dataSourceType, sourceProperties,
String.valueOf(getJobId()), createTbls);
-            this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType,
-                    getConvertedSourceProperties());
+            this.offsetProvider = createOffsetProvider(getConvertedSourceProperties());
            JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
rdsOffsetProvider.splitChunks(createTbls);
} catch (Exception ex) {
@@ -292,6 +298,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
this.jobProperties = new StreamingJobProperties(properties);
jobProperties.validate();
        this.sampleWindowMs = jobProperties.getMaxIntervalSecond() * 10 * 1000;
+ resolveCloudCluster();
// build time definition
JobExecutionConfiguration execConfig = getJobConfig();
TimerDefinition timerDefinition = new TimerDefinition();
@@ -305,13 +312,75 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
}
}
+    private void resolveCloudCluster() throws AnalysisException {
+        String requested = validateComputeGroupProperty(properties);
+        if (requested != null) {
+            this.cloudCluster = requested;
+            return;
+        }
+        if (!Config.isCloudMode()) {
+            return;
+        }
+        if (ConnectContext.get() == null) {
+            throw new AnalysisException("compute_group must be specified when no active session is available");
+        }
+        String sessionCluster;
+        try {
+            sessionCluster = ConnectContext.get().getCloudCluster();
+        } catch (ComputeGroupException e) {
+            throw new AnalysisException("failed to resolve compute_group: " + e.getMessage());
+        }
+        if (StringUtils.isBlank(sessionCluster)) {
+            throw new AnalysisException("compute_group is required in cloud mode; "
+                    + "specify compute_group explicitly or bind a default cluster with USAGE");
+        }
+        try {
+            ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(sessionCluster);
+        } catch (DdlException e) {
+            throw new AnalysisException(e.getMessage());
+        }
+        this.cloudCluster = sessionCluster;
+    }
+
+    // Returns the validated compute_group value, or null when the property is absent.
+    // Throws if the key is present but blank, if not in cloud mode, or if the user lacks USAGE on the cluster.
+    private String validateComputeGroupProperty(Map<String, String> props) throws AnalysisException {
+        if (props == null || !props.containsKey(StreamingJobProperties.COMPUTE_GROUP_PROPERTY)) {
+            return null;
+        }
+        String value = props.get(StreamingJobProperties.COMPUTE_GROUP_PROPERTY);
+        if (StringUtils.isBlank(value)) {
+            throw new AnalysisException("compute_group cannot be empty");
+        }
+        if (!Config.isCloudMode()) {
+            throw new AnalysisException("compute_group is only supported in cloud mode");
+        }
+        try {
+            ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(value);
+        } catch (DdlException e) {
+            throw new AnalysisException(e.getMessage());
+        }
+        return value;
+    }
+
+    private SourceOffsetProvider createOffsetProvider(Map<String, String> jdbcSourceProps) {
+        SourceOffsetProvider provider;
+        if (tvfType != null) {
+            provider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
+        } else {
+            provider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, jdbcSourceProps);
+        }
+        provider.setCloudCluster(this.cloudCluster);
+        return provider;
+    }
+
private void initInsertJob() {
try {
init();
UnboundTVFRelation currentTvf = getCurrentTvf();
this.tvfType = currentTvf.getFunctionName();
this.originTvfProps = currentTvf.getProperties().getMap();
-            this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
+ this.offsetProvider = createOffsetProvider(sourceProperties);
this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
            // Validate source-side resources (e.g. PG slot/publication ownership) once at job
            // creation so conflicts fail fast. No-op for standalone cdc_stream TVF (no job).
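
For context, the resolution order implemented by the resolveCloudCluster()/validateComputeGroupProperty() pair above is: an explicit "compute_group" property wins; otherwise, in cloud mode, the creating session's bound cluster is used; both paths require USAGE on the cluster. A minimal standalone sketch of that precedence (hypothetical helper names, not the Doris API):

    import java.util.Map;
    import java.util.function.Predicate;

    final class ComputeGroupResolution {
        // hasUsagePriv stands in for CloudEnv.checkCloudClusterPriv in the patch above.
        static String resolve(Map<String, String> props, String sessionCluster,
                              boolean cloudMode, Predicate<String> hasUsagePriv) {
            String requested = props.get("compute_group");
            if (requested != null) {
                if (requested.isBlank()) {
                    throw new IllegalArgumentException("compute_group cannot be empty");
                }
                if (!cloudMode) {
                    throw new IllegalArgumentException("compute_group is only supported in cloud mode");
                }
                if (!hasUsagePriv.test(requested)) {
                    throw new IllegalArgumentException("no USAGE privilege on " + requested);
                }
                return requested; // explicit property wins
            }
            if (!cloudMode) {
                return null; // non-cloud deployments keep the legacy path
            }
            if (sessionCluster == null || sessionCluster.isBlank()) {
                throw new IllegalStateException("compute_group is required in cloud mode");
            }
            if (!hasUsagePriv.test(sessionCluster)) {
                throw new IllegalStateException("no USAGE privilege on " + sessionCluster);
            }
            return sessionCluster; // fall back to the session's bound cluster
        }
    }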
@@ -411,6 +480,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
logParts.add("sql: " + encryptedSql);
}
+ validateComputeGroupProperty(alterJobCommand.getProperties());
+
// update properties
if (!alterJobCommand.getProperties().isEmpty()) {
modifyPropertiesInternal(alterJobCommand.getProperties());
@@ -547,7 +618,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
    private AbstractStreamingTask createStreamingMultiTblTask() throws JobException {
        return new StreamingMultiTblTask(getJobId(), Env.getCurrentEnv().getNextId(), dataSourceType,
                offsetProvider, getConvertedSourceProperties(), targetDb, targetProperties, jobProperties,
-                getCreateUser());
+                getCreateUser(), cloudCluster);
    }
    private Map<String, String> getConvertedSourceProperties() throws JobException {
@@ -571,7 +642,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
    protected AbstractStreamingTask createStreamingInsertTask() {
        return new StreamingInsertTask(getJobId(), Env.getCurrentEnv().getNextId(), getExecuteSql(),
-                offsetProvider, getCurrentDbName(), jobProperties, getOriginTvfProps(), getCreateUser());
+                offsetProvider, getCurrentDbName(), jobProperties, getOriginTvfProps(),
+                getCreateUser(), cloudCluster);
    }
public void recordTasks(AbstractStreamingTask task) {
@@ -856,6 +928,10 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
resetCloudProgress(offset);
}
}
+        if (inputProperties.containsKey(StreamingJobProperties.COMPUTE_GROUP_PROPERTY)) {
+            this.cloudCluster = inputProperties.get(StreamingJobProperties.COMPUTE_GROUP_PROPERTY);
+            offsetProvider.setCloudCluster(this.cloudCluster);
+        }
this.properties.putAll(inputProperties);
this.jobProperties = new StreamingJobProperties(this.properties);
}
@@ -1227,11 +1303,9 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@Override
public void gsonPostProcess() throws IOException {
if (offsetProvider == null) {
+ offsetProvider = createOffsetProvider(sourceProperties);
if (tvfType != null) {
-                offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
                offsetProvider.restoreFromPersistInfo(offsetProviderPersist);
-            } else {
-                offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, sourceProperties);
}
}
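
Note that the deserialization path now also funnels through createOffsetProvider(), so the persisted compute group is rebound to the transient provider after an FE restart or edit-log replay. A reduced sketch of the deserialize-then-rebind pattern (hypothetical types, not the Doris GsonPostProcessable API):

    import com.google.gson.Gson;
    import com.google.gson.annotations.SerializedName;

    class PersistedJob {
        @SerializedName("ccn")
        String cloudCluster;                          // serialized, survives restart
        transient OffsetProviderStub offsetProvider;  // skipped by Gson, rebuilt on load

        void postProcess() {
            if (offsetProvider == null) {
                offsetProvider = new OffsetProviderStub(); // stands in for createOffsetProvider(...)
                offsetProvider.cloudCluster = cloudCluster; // rebind the persisted group
            }
        }

        static PersistedJob fromJson(String json) {
            PersistedJob job = new Gson().fromJson(json, PersistedJob.class);
            job.postProcess(); // analogous to gsonPostProcess() above
            return job;
        }
    }

    class OffsetProviderStub {
        String cloudCluster;
    }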
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 54832eb6fb1..df23f107240 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -19,6 +19,7 @@ package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.Util;
@@ -41,6 +42,7 @@ import org.apache.doris.thrift.TStatusCode;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.Collections;
@@ -58,6 +60,7 @@ public class StreamingInsertTask extends AbstractStreamingTask {
private ConnectContext ctx;
private StreamingJobProperties jobProperties;
private Map<String, String> originTvfProps;
+ private String cloudCluster;
SourceOffsetProvider offsetProvider;
public StreamingInsertTask(long jobId,
@@ -67,13 +70,15 @@ public class StreamingInsertTask extends AbstractStreamingTask {
String currentDb,
StreamingJobProperties jobProperties,
Map<String, String> originTvfProps,
- UserIdentity userIdentity) {
+ UserIdentity userIdentity,
+ String cloudCluster) {
super(jobId, taskId, userIdentity);
this.sql = sql;
this.currentDb = currentDb;
this.offsetProvider = offsetProvider;
this.jobProperties = jobProperties;
this.originTvfProps = originTvfProps;
+ this.cloudCluster = cloudCluster;
}
@Override
@@ -86,6 +91,10 @@ public class StreamingInsertTask extends AbstractStreamingTask {
this.startTimeMs = System.currentTimeMillis();
ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
ctx.setSessionVariable(jobProperties.getSessionVariable(ctx.getSessionVariable()));
+        // apply after session merge so compute_group wins over session.cloud_cluster
+        if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
+            ctx.setCloudCluster(cloudCluster);
+        }
StatementContext statementContext = new StatementContext();
ctx.setStatementContext(statementContext);
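
The comment above is the crux of this hunk: session variables from the job definition are merged first, and only then does the job-level compute_group overwrite any cloud_cluster they carried, so the explicit binding wins. A tiny last-write-wins illustration (hypothetical map-based model, not ConnectContext):

    import java.util.HashMap;
    import java.util.Map;

    final class ContextSetup {
        final Map<String, String> vars = new HashMap<>();

        void applySessionVariables(Map<String, String> jobSessionVars) {
            vars.putAll(jobSessionVars); // may contain "cloud_cluster"
        }

        void bindComputeGroup(String cloudCluster, boolean cloudMode) {
            // applied second, so it overrides session.cloud_cluster
            if (cloudMode && cloudCluster != null && !cloudCluster.isEmpty()) {
                vars.put("cloud_cluster", cloudCluster);
            }
        }
    }

If bindComputeGroup ran before applySessionVariables, a stale session.cloud_cluster stored with the job would silently override the bound group; phase 3 of the docker suite below asserts this ordering.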
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index c7a814c4a5b..3db3aecaf8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -44,8 +44,9 @@ public class StreamingJobProperties implements JobProperties {
public static final String SESSION_VAR_PREFIX = "session.";
public static final String INTERNAL_KEY_PREFIX = "__";
public static final String OFFSET_PROPERTY = "offset";
+ public static final String COMPUTE_GROUP_PROPERTY = "compute_group";
    public static final List<String> SUPPORT_STREAM_JOB_PROPS = Arrays.asList(MAX_INTERVAL_SECOND_PROPERTY,
-            S3_MAX_BATCH_FILES_PROPERTY, S3_MAX_BATCH_BYTES_PROPERTY, OFFSET_PROPERTY);
+            S3_MAX_BATCH_FILES_PROPERTY, S3_MAX_BATCH_BYTES_PROPERTY, OFFSET_PROPERTY, COMPUTE_GROUP_PROPERTY);
public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
public static final long DEFAULT_MAX_S3_BATCH_FILES = 256;
@@ -195,4 +196,8 @@ public class StreamingJobProperties implements JobProperties {
public String getOffsetProperty() {
return properties.get(OFFSET_PROPERTY);
}
+
+ public String getComputeGroup() {
+ return properties.get(COMPUTE_GROUP_PROPERTY);
+ }
}
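
The property surface follows the existing whitelist pattern: a key is only accepted once it joins SUPPORT_STREAM_JOB_PROPS, and a typed getter reads it back from the raw map. A reduced sketch of that pattern (hypothetical class; assumes validation rejects keys outside the whitelist, which is how the list appears to be used here):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    final class JobProps {
        static final String COMPUTE_GROUP = "compute_group";
        static final List<String> SUPPORTED =
                Arrays.asList("max_interval_second", "offset", COMPUTE_GROUP);

        private final Map<String, String> properties;

        JobProps(Map<String, String> properties) {
            for (String key : properties.keySet()) {
                // "session."-prefixed keys pass through, mirroring SESSION_VAR_PREFIX
                if (!SUPPORTED.contains(key) && !key.startsWith("session.")) {
                    throw new IllegalArgumentException("unsupported property: " + key);
                }
            }
            this.properties = properties;
        }

        String getComputeGroup() {
            return properties.get(COMPUTE_GROUP);
        }
    }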
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index b1d28b2dcad..ac5efe51542 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -75,6 +75,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
private Map<String, String> targetProperties;
private String targetDb;
private StreamingJobProperties jobProperties;
+ private String cloudCluster;
private long scannedRows = 0L;
private long loadBytes = 0L;
private long filteredRows = 0L;
@@ -90,7 +91,8 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
String targetDb,
Map<String, String> targetProperties,
StreamingJobProperties jobProperties,
- UserIdentity userIdentity) {
+ UserIdentity userIdentity,
+ String cloudCluster) {
super(jobId, taskId, userIdentity);
this.dataSourceType = dataSourceType;
this.offsetProvider = offsetProvider;
@@ -98,6 +100,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
this.targetProperties = targetProperties;
this.jobProperties = jobProperties;
this.targetDb = targetDb;
+ this.cloudCluster = cloudCluster;
        this.timeoutMs = Config.streaming_task_timeout_multiplier * jobProperties.getMaxIntervalSecond() * 1000L;
}
@@ -123,7 +126,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
}
private void sendWriteRequest() throws JobException {
- Backend backend = StreamingJobUtils.selectBackend();
+ Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
log.info("start to run streaming multi task {} in backend {}/{},
offset is {}",
taskId, backend.getId(), backend.getHost(),
runningOffset.toString());
this.runningBackendId = backend.getId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 49c718c5950..87eca253c46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -88,6 +88,12 @@ public interface SourceOffsetProvider {
*/
void updateOffset(Offset offset);
+ /**
+     * Bind the compute group to which FE-initiated RPCs should be routed.
+ * Default: no-op for providers that do not make BE RPCs.
+ */
+ default void setCloudCluster(String cloudCluster) {}
+
/**
     * Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka.
*/
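
Making setCloudCluster a default no-op keeps the interface change non-breaking: per the Javadoc above, providers that make no BE RPCs simply ignore it, while RPC-backed providers store the cluster for routing. A miniature of the two cases (hypothetical provider, mirroring the JdbcSourceOffsetProvider change below):

    interface OffsetProvider {
        // no-op by default, so existing implementations stay source-compatible
        default void setCloudCluster(String cloudCluster) {}
    }

    class RpcBackedProvider implements OffsetProvider {
        // volatile: written on CREATE/ALTER, read by scheduler threads on each BE selection
        private volatile String cloudCluster;

        @Override
        public void setCloudCluster(String cloudCluster) {
            this.cloudCluster = cloudCluster;
        }
    }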
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index cfa2fe3273b..3209b190538 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -94,6 +94,8 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
volatile boolean hasMoreData = true;
+ transient volatile String cloudCluster;
+
/**
* No-arg constructor for subclass use.
*/
@@ -220,7 +222,7 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
@Override
    public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
- Backend backend = StreamingJobUtils.selectBackend();
+ Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
JobBaseConfig requestParams =
new JobBaseConfig(getJobId().toString(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -306,7 +308,7 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
    private boolean compareOffset(Map<String, String> offsetFirst, Map<String, String> offsetSecond)
            throws JobException {
- Backend backend = StreamingJobUtils.selectBackend();
+ Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
CompareOffsetRequest requestParams =
new CompareOffsetRequest(getJobId(), sourceType.name(),
sourceProperties,
getFrontendAddress(), offsetFirst, offsetSecond);
@@ -549,7 +551,7 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
}
    private List<SnapshotSplit> requestTableSplits(String table) throws JobException {
- Backend backend = StreamingJobUtils.selectBackend();
+ Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
FetchTableSplitsRequest requestParams =
new FetchTableSplitsRequest(getJobId(), sourceType.name(),
sourceProperties, getFrontendAddress(), table);
@@ -664,7 +666,7 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
* otherwise, conflicts will occur in multi-backends scenarios.
*/
private void initSourceReader() throws JobException {
- Backend backend = StreamingJobUtils.selectBackend();
+ Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
JobBaseConfig requestParams =
new JobBaseConfig(getJobId().toString(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -712,7 +714,7 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
public void cleanMeta(Long jobId) throws JobException {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
- Backend backend = StreamingJobUtils.selectBackend();
+ Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
JobBaseConfig requestParams =
new JobBaseConfig(getJobId().toString(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 438fb294179..56c95596991 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -25,6 +25,8 @@ import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.SmallFileMgr;
@@ -227,19 +229,29 @@ public class StreamingJobUtils {
return JdbcClient.createJdbcClient(config);
}
-    public static Backend selectBackend() throws JobException {
-        Backend backend = null;
-        BeSelectionPolicy policy = null;
+    public static Backend selectBackend(String cloudCluster) throws JobException {
+        if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
+            List<Backend> bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                    .getBackendsByClusterName(cloudCluster)
+                    .stream()
+                    .filter(Backend::isLoadAvailable)
+                    .collect(Collectors.toList());
+            if (bes.isEmpty()) {
+                throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG
+                        + ", compute_group: " + cloudCluster);
+            }
+            int idx = getLastSelectedBackendIndexAndUpdate();
+            return bes.get(Math.floorMod(idx, bes.size()));
+        }
-        policy = new BeSelectionPolicy.Builder().setEnableRoundRobin(true).needLoadAvailable().build();
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
+                .setEnableRoundRobin(true).needLoadAvailable().build();
        policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
-
-        List<Long> backendIds;
-        backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
        if (backendIds.isEmpty()) {
            throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
        }
-        backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+        Backend backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
        if (backend == null) {
            throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
        }
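
The cloud branch keeps the pre-existing round-robin counter but applies it to the compute group's own backend list; Math.floorMod keeps the index non-negative even if the shared counter wraps past Integer.MAX_VALUE. A standalone sketch of just that selection step (hypothetical picker, not the Doris utility):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    final class RoundRobinPicker {
        private static final AtomicInteger COUNTER = new AtomicInteger();

        static <T> T pick(List<T> loadAvailableBackends) {
            if (loadAvailableBackends.isEmpty()) {
                throw new IllegalStateException("no load-available backend in compute group");
            }
            int idx = COUNTER.getAndIncrement(); // may wrap to negative values
            return loadAvailableBackends.get(Math.floorMod(idx, loadAvailableBackends.size()));
        }
    }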
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_compute_group.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_compute_group.groovy
new file mode 100644
index 00000000000..7fee4e14f17
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_compute_group.groovy
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Coverage for the non-TVF (source-to-target) path: verifies compute_group is
+// rejected in non-cloud mode, persisted on CREATE JOB ... FROM MYSQL in cloud
+// mode, and that the bound job runs end-to-end, exercising JdbcSourceOffsetProvider
+// RPCs and StreamingMultiTblTask.sendWriteRequest. Lifecycle checks (empty /
+// invalid / ALTER / PAUSE) are covered by test_streaming_insert_job_compute_group.
+suite("test_streaming_mysql_job_compute_group",
+        "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_cg_job"
+ def currentDb = (sql "select database()")[0][0]
+ def tableName = "mysql_cg_normal1"
+ def mysqlDb = "test_cdc_cg_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${tableName} force"""
+
+    // Non-cloud mode: compute_group must be rejected regardless of MySQL availability
+ if (!isCloudMode()) {
+ test {
+ sql """CREATE JOB ${jobName}
+ PROPERTIES ("compute_group" = "any_group")
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" = "jdbc:mysql://127.0.0.1:3316",
+ "driver_url" = "nop",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "nop",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${tableName}"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+ exception "only supported in cloud mode"
+ }
+ return
+ }
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ return
+ }
+
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+    String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ def clusterRows = sql "show clusters"
+ assert clusterRows.size() >= 1 : "cloud mode expects at least one cluster"
+ def cg = clusterRows.get(0).get(0)
+
+ connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableName}"""
+ sql """CREATE TABLE ${mysqlDb}.${tableName} (
+ `name` varchar(200) NOT NULL,
+ `age` int DEFAULT NULL,
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${tableName} (name, age) VALUES ('A1',
1)"""
+ sql """INSERT INTO ${mysqlDb}.${tableName} (name, age) VALUES ('B1',
2)"""
+ }
+
+ try {
+ sql """CREATE JOB ${jobName}
+ PROPERTIES ("compute_group" = "${cg}")
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${tableName}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+        def props = sql """select properties from jobs("type"="insert") where Name='${jobName}'"""
+ assert props.get(0).get(0).contains("\"compute_group\":\"${cg}\"")
+
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 1
+            })
+        } catch (Exception ex) {
+            log.info("job: " + sql("""select * from jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("task: " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        def rows = (sql """SELECT count(*) FROM ${currentDb}.${tableName}""").get(0).get(0) as long
+ assertTrue(rows >= 2, "expected snapshot rows in target table")
+ } finally {
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${tableName} force"""
+ }
+}
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group.groovy
new file mode 100644
index 00000000000..66168a66f7a
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_compute_group") {
+ def tableName = "test_streaming_insert_job_cg_tbl"
+ def jobName = "test_streaming_insert_job_cg_job"
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ def s3Source = """
+ SELECT * FROM S3(
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ )
+ """
+
+ // ---------- Non-cloud mode: compute_group must be rejected ----------
+ if (!isCloudMode()) {
+ test {
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES ("compute_group" = "any_group")
+ ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+ """
+ exception "only supported in cloud mode"
+ }
+ return
+ }
+
+ // ---------- Cloud mode ----------
+ def clusterRows = sql "show clusters"
+ assert clusterRows.size() >= 1 : "cloud mode expects at least one cluster"
+ def cg = clusterRows.get(0).get(0)
+
+ try {
+ // 0) Empty compute_group -> CREATE rejected
+ test {
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES ("compute_group" = "")
+ ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+ """
+ exception "compute_group cannot be empty"
+ }
+
+ // 1) Invalid compute_group -> CREATE should fail
+ test {
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES (
+ "s3.max_batch_files" = "1",
+ "compute_group" = "__not_exist_cg__"
+ )
+ ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+ """
+ exception "not exist"
+ }
+
+ // 2) Valid compute_group -> CREATE succeeds; properties reflect it
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES (
+ "s3.max_batch_files" = "1",
+ "compute_group" = "${cg}"
+ )
+ ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+ """
+
+        def props = sql """select properties from jobs("type"="insert") where Name='${jobName}'"""
+ log.info("job properties: " + props)
+ assert props.get(0).get(0).contains("\"compute_group\":\"${cg}\"")
+
+        // Wait for at least one successful task so the cluster binding is exercised end-to-end
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 1
+            })
+        } catch (Exception ex) {
+            log.info("job: " + sql("""select * from jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("task: " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        // 3) ALTER without PAUSE -> rejected by upstream guard (Only PAUSED job can be altered)
+        test {
+            sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "${cg}")"""
+            exception "Only PAUSED job can be altered"
+        }
+
+ sql """PAUSE JOB where jobname = '${jobName}'"""
+ Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+            def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+ s.size() == 1 && 'PAUSED' == s.get(0).get(0)
+ })
+
+        // 4) ALTER to non-existent cluster -> rejected; state + bound cg unchanged
+        test {
+            sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "__not_exist_cg__")"""
+            exception "not exist"
+        }
+        def afterBadAlter = sql """select status, properties from jobs("type"="insert") where Name='${jobName}'"""
+        assert afterBadAlter.get(0).get(0) == "PAUSED"
+        assert afterBadAlter.get(0).get(1).contains("\"compute_group\":\"${cg}\"")
+
+ // 5) ALTER with empty compute_group -> rejected; bound cg unchanged
+ test {
+ sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "")"""
+ exception "compute_group cannot be empty"
+ }
+        def afterEmptyAlter = sql """select properties from jobs("type"="insert") where Name='${jobName}'"""
+        assert afterEmptyAlter.get(0).get(0).contains("\"compute_group\":\"${cg}\"")
+
+ // 6) ALTER to the same valid cluster -> succeeds, properties updated
+ sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "${cg}")"""
+        def afterAlter = sql """select status, properties from jobs("type"="insert") where Name='${jobName}'"""
+ assert afterAlter.get(0).get(0) == "PAUSED"
+ assert afterAlter.get(0).get(1).contains("\"compute_group\":\"${cg}\"")
+ } finally {
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists `${tableName}` force"""
+ }
+}
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group_docker.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group_docker.groovy
new file mode 100644
index 00000000000..70d99c4483f
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group_docker.groovy
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Docker-mode coverage for compute_group routing on StreamingInsertJob:
+// spins up a cloud cluster with two compute groups and asserts that the bound
+// compute_group actually steers BE traffic. Complements the non-docker suite
+// test_streaming_insert_job_compute_group, which covers property lifecycle only.
+suite("test_streaming_insert_job_compute_group_docker", "docker") {
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.setFeNum(1)
+ options.setBeNum(1)
+
+ def tableName = "test_sij_cg_docker_tbl"
+ def jobName = "test_sij_cg_docker_job"
+ def cgA = "compute_cluster"
+ def cgB = "sij_cg_b_docker"
+
+ docker(options) {
+        // default BE (index 0) lives in ${cgA}; the BE added below joins ${cgB}.
+ cluster.addBackend(1, cgB)
+ Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({
+ def cgs = sql """SHOW COMPUTE GROUPS"""
+ cgs.size() == 2
+ })
+ log.info("compute groups: ${sql """SHOW COMPUTE GROUPS"""}")
+
+ def backends = cluster.getAllBackends().sort { it.backendId }
+ assertEquals(2, backends.size())
+ def beA = backends.get(0)
+ def beB = backends.get(1)
+ log.info("beA=${beA.host}:${beA.httpPort} (${cgA})
beB=${beB.host}:${beB.httpPort} (${cgB})")
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3;
+ """
+
+ def s3Source = """
+ SELECT * FROM S3(
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ )
+ """
+
+ try {
+            def aBefore = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bBefore = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+ log.info("phase0 a=${aBefore} b=${bBefore}")
+
+ // Phase 1: bind to cgA, verify traffic stays on cgA
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES (
+ "s3.max_batch_files" = "1",
+ "compute_group" = "${cgA}"
+ )
+ ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+ """
+
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 2
+            })
+
+            def aAfter1 = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bAfter1 = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase1 a=${aAfter1} b=${bAfter1}")
+            assertTrue(aAfter1 > aBefore, "phase1 expects cgA load_rows to increase")
+            assertTrue(bAfter1 == bBefore, "phase1 expects cgB untouched")
+            def rows1 = (sql """SELECT count(*) FROM ${tableName}""").get(0).get(0) as long
+            assertTrue(rows1 > 0, "phase1 expects target table to receive rows")
+
+            // Phase 2: ALTER compute_group to cgB with reset offset, verify traffic switches
+            sql """PAUSE JOB where jobname = '${jobName}'"""
+            Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+                def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+                s.size() == 1 && 'PAUSED' == s.get(0).get(0)
+            })
+
+ sql """
+ ALTER JOB ${jobName} PROPERTIES (
+ "compute_group" = "${cgB}",
+ "offset" =
'{"fileName":"regression/load/data/anoexist1234.csv"}'
+ )
+ """
+ sql """RESUME JOB where jobname = '${jobName}'"""
+
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 4
+            })
+
+            def aAfter2 = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bAfter2 = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase2 a=${aAfter2} b=${bAfter2}")
+            assertTrue(aAfter2 == aAfter1, "phase2 expects cgA unchanged")
+            assertTrue(bAfter2 > bAfter1, "phase2 expects cgB load_rows to increase")
+
+            // Phase 3: compute_group=cgA plus session.cloud_cluster=cgB; compute_group must win
+            sql """PAUSE JOB where jobname = '${jobName}'"""
+            Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+                def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+                s.size() == 1 && 'PAUSED' == s.get(0).get(0)
+            })
+
+ sql """
+ ALTER JOB ${jobName} PROPERTIES (
+ "compute_group" = "${cgA}",
+ "session.cloud_cluster" = "${cgB}",
+ "offset" =
'{"fileName":"regression/load/data/anoexist56789.csv"}'
+ )
+ """
+ sql """RESUME JOB where jobname = '${jobName}'"""
+
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 6
+            })
+
+            def aAfter3 = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bAfter3 = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase3 a=${aAfter3} b=${bAfter3}")
+            assertTrue(aAfter3 > aAfter2, "phase3 expects cgA to increase (compute_group wins)")
+            assertTrue(bAfter3 == bAfter2, "phase3 expects cgB untouched")
+ } finally {
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists `${tableName}` force"""
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]