This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 55718e76df7 [feat](job) support show routine load job compute group (#59540)
55718e76df7 is described below

commit 55718e76df7af5d186b92f14e6f68e63e32a4969
Author: hui lai <[email protected]>
AuthorDate: Wed Jan 7 20:38:03 2026 +0800

    [feat](job) support show routine load job compute group (#59540)
    
    Support showing the compute group of a routine load job (in both the
    `show routine load` command and the routine load system table):
    
    ```
    ComputeGroup: cluster_name0
    ```
---
 .../schema_routine_load_job_scanner.cpp             |  4 ++++
 .../java/org/apache/doris/catalog/SchemaTable.java  |  1 +
 .../doris/load/routineload/RoutineLoadJob.java      |  5 +++++
 .../plans/commands/ShowRoutineLoadCommand.java      |  1 +
 .../apache/doris/service/FrontendServiceImpl.java   |  1 +
 gensrc/thrift/FrontendService.thrift                |  1 +
 .../test_routine_load_job_info_system_table.groovy  |  5 +++++
 .../routine_load/test_show_routine_load.groovy      | 21 +++++++++++++++++++++
 8 files changed, 39 insertions(+)

diff --git a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp 
b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
index 11f5fb376bc..c965338d46a 100644
--- a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
@@ -56,6 +56,7 @@ std::vector<SchemaScanner::ColumnDesc> 
SchemaRoutineLoadJobScanner::_s_tbls_colu
         {"USER_NAME", TYPE_STRING, sizeof(StringRef), true},
         {"CURRENT_ABORT_TASK_NUM", TYPE_INT, sizeof(int32_t), true},
         {"IS_ABNORMAL_PAUSE", TYPE_BOOLEAN, sizeof(int8_t), true},
+        {"COMPUTE_GROUP", TYPE_STRING, sizeof(StringRef), true},
 };
 
 SchemaRoutineLoadJobScanner::SchemaRoutineLoadJobScanner()
@@ -173,6 +174,9 @@ Status 
SchemaRoutineLoadJobScanner::_fill_block_impl(vectorized::Block* block) {
                 case 17: // USER_NAME
                     column_value = job_info.__isset.user_name ? 
job_info.user_name : "";
                     break;
+                case 20: // COMPUTE_GROUP
+                    column_value = job_info.__isset.compute_group ? 
job_info.compute_group : "";
+                    break;
                 }
 
                 str_refs[row_idx] =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 92356740c50..9fd564ab40b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -684,6 +684,7 @@ public class SchemaTable extends Table {
                                     .column("USER_NAME", 
ScalarType.createStringType())
                                     .column("CURRENT_ABORT_TASK_NUM", 
ScalarType.createType(PrimitiveType.INT))
                                     .column("IS_ABNORMAL_PAUSE", 
ScalarType.createType(PrimitiveType.BOOLEAN))
+                                    .column("COMPUTE_GROUP", 
ScalarType.createStringType())
                                     .build())
             )
             .put("load_jobs",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 41b476ddc6f..825cc2176dd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1638,6 +1638,10 @@ public abstract class RoutineLoadJob
         this.cloudCluster = cloudCluster;
     }
 
+    public String getClusterInfo() {
+        return Strings.nullToEmpty(cloudCluster);
+    }
+
     // check the correctness of commit info
     protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment 
rlTaskTxnCommitAttachment,
                                                TransactionState txnState,
@@ -1691,6 +1695,7 @@ public abstract class RoutineLoadJob
             row.add(otherMsg);
             row.add(userIdentity.getQualifiedUser());
             row.add(comment);
+            row.add(getClusterInfo());
             return row;
         } finally {
             readUnlock();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java
index 7ffaa02b195..fe5c1441ebc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java
@@ -109,6 +109,7 @@ public class ShowRoutineLoadCommand extends ShowCommand {
             .add("OtherMsg")
             .add("User")
             .add("Comment")
+            .add("ComputeGroup")
             .build();
 
     private final LabelNameInfo labelNameInfo;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 1720a27001a..90339962555 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -4364,6 +4364,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             jobInfo.setUserName(job.getUserIdentity().getQualifiedUser());
             
jobInfo.setCurrentAbortTaskNum(job.getJobStatistic().currentAbortedTaskNum);
             jobInfo.setIsAbnormalPause(job.isAbnormalPause());
+            jobInfo.setComputeGroup(job.getClusterInfo());
             jobInfos.add(jobInfo);
         }
         if (LOG.isDebugEnabled()) {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 863a38610f9..b00fc5ccbb4 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1550,6 +1550,7 @@ struct TRoutineLoadJob {
     18: optional string user_name
     19: optional i32 current_abort_task_num
     20: optional bool is_abnormal_pause
+    21: optional string compute_group
 }
 
 struct TFetchRoutineLoadJobResult {
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
index be95cc3240e..edef339ce0d 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
@@ -134,6 +134,11 @@ suite("test_routine_load_job_info_system_table","p0") {
             def res = sql "SELECT JOB_NAME FROM 
information_schema.routine_load_jobs WHERE CURRENT_ABORT_TASK_NUM > 0 OR 
IS_ABNORMAL_PAUSE = TRUE"
             log.info("res: ${res}".toString())
             assertTrue(res.toString().contains("${jobName}"))
+
+            def computeGroupRes = sql "SELECT JOB_NAME, COMPUTE_GROUP FROM 
information_schema.routine_load_jobs WHERE JOB_NAME = '${jobName}'"
+            log.info("compute group res: ${computeGroupRes}".toString())
+            assertTrue(computeGroupRes.size() > 0)
+            assertNotNull(computeGroupRes[0][1])
         } finally {
             sql "stop routine load for ${jobName}"
             sql "DROP TABLE IF EXISTS ${tableName}"
diff --git 
a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy 
b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
index ac3d14ffe37..9b01e226df7 100644
--- a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
@@ -191,5 +191,26 @@ suite("test_show_routine_load","p0") {
         } finally {
             sql "stop routine load for testShow"
         }
+
+        // test show routine load computegroup
+        try {
+            sql """
+                CREATE ROUTINE LOAD testShowComputeGroup ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            def res = sql "show routine load for testShowComputeGroup"
+            // ComputeGroup is the last column (index 22)
+            def computeGroupStr = res[0][22]
+            log.info("routine load computegroup: 
${computeGroupStr.toString()}".toString())
+            assertNotNull(computeGroupStr)
+        } finally {
+            sql "stop routine load for testShowComputeGroup"
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to