This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 55718e76df7 [feat](job) support show routine load job compute group
(#59540)
55718e76df7 is described below
commit 55718e76df7af5d186b92f14e6f68e63e32a4969
Author: hui lai <[email protected]>
AuthorDate: Wed Jan 7 20:38:03 2026 +0800
[feat](job) support show routine load job compute group (#59540)
Support showing the compute group of a routine load job (in both the
`show routine load` command output and the routine load system table):
```
ComputeGroup: cluster_name0
```
---
.../schema_routine_load_job_scanner.cpp | 4 ++++
.../java/org/apache/doris/catalog/SchemaTable.java | 1 +
.../doris/load/routineload/RoutineLoadJob.java | 5 +++++
.../plans/commands/ShowRoutineLoadCommand.java | 1 +
.../apache/doris/service/FrontendServiceImpl.java | 1 +
gensrc/thrift/FrontendService.thrift | 1 +
.../test_routine_load_job_info_system_table.groovy | 5 +++++
.../routine_load/test_show_routine_load.groovy | 21 +++++++++++++++++++++
8 files changed, 39 insertions(+)
diff --git a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
index 11f5fb376bc..c965338d46a 100644
--- a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
@@ -56,6 +56,7 @@ std::vector<SchemaScanner::ColumnDesc>
SchemaRoutineLoadJobScanner::_s_tbls_colu
{"USER_NAME", TYPE_STRING, sizeof(StringRef), true},
{"CURRENT_ABORT_TASK_NUM", TYPE_INT, sizeof(int32_t), true},
{"IS_ABNORMAL_PAUSE", TYPE_BOOLEAN, sizeof(int8_t), true},
+ {"COMPUTE_GROUP", TYPE_STRING, sizeof(StringRef), true},
};
SchemaRoutineLoadJobScanner::SchemaRoutineLoadJobScanner()
@@ -173,6 +174,9 @@ Status
SchemaRoutineLoadJobScanner::_fill_block_impl(vectorized::Block* block) {
case 17: // USER_NAME
column_value = job_info.__isset.user_name ?
job_info.user_name : "";
break;
+ case 20: // COMPUTE_GROUP
+ column_value = job_info.__isset.compute_group ?
job_info.compute_group : "";
+ break;
}
str_refs[row_idx] =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 92356740c50..9fd564ab40b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -684,6 +684,7 @@ public class SchemaTable extends Table {
.column("USER_NAME",
ScalarType.createStringType())
.column("CURRENT_ABORT_TASK_NUM",
ScalarType.createType(PrimitiveType.INT))
.column("IS_ABNORMAL_PAUSE",
ScalarType.createType(PrimitiveType.BOOLEAN))
+ .column("COMPUTE_GROUP",
ScalarType.createStringType())
.build())
)
.put("load_jobs",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 41b476ddc6f..825cc2176dd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1638,6 +1638,10 @@ public abstract class RoutineLoadJob
this.cloudCluster = cloudCluster;
}
+ public String getClusterInfo() {
+ return Strings.nullToEmpty(cloudCluster);
+ }
+
// check the correctness of commit info
protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment,
TransactionState txnState,
@@ -1691,6 +1695,7 @@ public abstract class RoutineLoadJob
row.add(otherMsg);
row.add(userIdentity.getQualifiedUser());
row.add(comment);
+ row.add(getClusterInfo());
return row;
} finally {
readUnlock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java
index 7ffaa02b195..fe5c1441ebc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java
@@ -109,6 +109,7 @@ public class ShowRoutineLoadCommand extends ShowCommand {
.add("OtherMsg")
.add("User")
.add("Comment")
+ .add("ComputeGroup")
.build();
private final LabelNameInfo labelNameInfo;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 1720a27001a..90339962555 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -4364,6 +4364,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
jobInfo.setUserName(job.getUserIdentity().getQualifiedUser());
jobInfo.setCurrentAbortTaskNum(job.getJobStatistic().currentAbortedTaskNum);
jobInfo.setIsAbnormalPause(job.isAbnormalPause());
+ jobInfo.setComputeGroup(job.getClusterInfo());
jobInfos.add(jobInfo);
}
if (LOG.isDebugEnabled()) {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 863a38610f9..b00fc5ccbb4 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1550,6 +1550,7 @@ struct TRoutineLoadJob {
18: optional string user_name
19: optional i32 current_abort_task_num
20: optional bool is_abnormal_pause
+ 21: optional string compute_group
}
struct TFetchRoutineLoadJobResult {
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
index be95cc3240e..edef339ce0d 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
@@ -134,6 +134,11 @@ suite("test_routine_load_job_info_system_table","p0") {
def res = sql "SELECT JOB_NAME FROM
information_schema.routine_load_jobs WHERE CURRENT_ABORT_TASK_NUM > 0 OR
IS_ABNORMAL_PAUSE = TRUE"
log.info("res: ${res}".toString())
assertTrue(res.toString().contains("${jobName}"))
+
+ def computeGroupRes = sql "SELECT JOB_NAME, COMPUTE_GROUP FROM
information_schema.routine_load_jobs WHERE JOB_NAME = '${jobName}'"
+ log.info("compute group res: ${computeGroupRes}".toString())
+ assertTrue(computeGroupRes.size() > 0)
+ assertNotNull(computeGroupRes[0][1])
} finally {
sql "stop routine load for ${jobName}"
sql "DROP TABLE IF EXISTS ${tableName}"
diff --git
a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
index ac3d14ffe37..9b01e226df7 100644
--- a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
@@ -191,5 +191,26 @@ suite("test_show_routine_load","p0") {
} finally {
sql "stop routine load for testShow"
}
+
+ // test show routine load computegroup
+ try {
+ sql """
+ CREATE ROUTINE LOAD testShowComputeGroup ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ def res = sql "show routine load for testShowComputeGroup"
+ // ComputeGroup is the last column (index 22)
+ def computeGroupStr = res[0][22]
+ log.info("routine load computegroup:
${computeGroupStr.toString()}".toString())
+ assertNotNull(computeGroupStr)
+ } finally {
+ sql "stop routine load for testShowComputeGroup"
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]