This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new bf22d3d3e7b branch-4.0: [feat](load) introduce load job system table (#57421) (#58850)
bf22d3d3e7b is described below

commit bf22d3d3e7beb435dc7378c401d505c2f8b44d73
Author: hui lai <[email protected]>
AuthorDate: Thu Dec 11 18:40:52 2025 +0800

    branch-4.0: [feat](load) introduce load job system table (#57421) (#58850)
    
    pick #57421
    
    Introduce a load job statistics system table:
    ```
    mysql> show create table information_schema.load_jobs\G
    *************************** 1. row ***************************
           Table: load_jobs
    Create Table: CREATE TABLE `load_jobs` (
      `JOB_ID` text NULL,
      `LABEL` text NULL,
      `STATE` text NULL,
      `PROGRESS` text NULL,
      `TYPE` text NULL,
      `ETL_INFO` text NULL,
      `TASK_INFO` text NULL,
      `ERROR_MSG` text NULL,
      `CREATE_TIME` text NULL,
      `ETL_START_TIME` text NULL,
      `ETL_FINISH_TIME` text NULL,
      `LOAD_START_TIME` text NULL,
      `LOAD_FINISH_TIME` text NULL,
      `URL` text NULL,
      `JOB_DETAILS` text NULL,
      `TRANSACTION_ID` text NULL,
      `ERROR_TABLETS` text NULL,
      `USER` text NULL,
      `COMMENT` text NULL,
      `FIRST_ERROR_MSG` text NULL
    ) ENGINE=SCHEMA;
    1 row in set (0.01 sec)
    ```
    
    Users can now run `select * from information_schema.load_jobs` instead of
    `show load`. The advantage is that SQL predicates make it far more
    flexible to locate specific jobs.
    
    Example:
    ```
    mysql> SELECT * FROM information_schema.load_jobs WHERE LABEL = 'test_load_job_label_b5347e94f2614e2c92705d6a6824a380'\G
    *************************** 1. row ***************************
              JOB_ID: 1761643165987
               LABEL: test_load_job_label_b5347e94f2614e2c92705d6a6824a380
               STATE: FINISHED
            PROGRESS: Unknown id: 1761643165987
                TYPE: INSERT
            ETL_INFO: \N
           TASK_INFO: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
           ERROR_MSG: \N
         CREATE_TIME: 2025-10-28 17:23:08
      ETL_START_TIME: 2025-10-28 17:23:08
     ETL_FINISH_TIME: 2025-10-28 17:23:08
     LOAD_START_TIME: 2025-10-28 17:23:08
    LOAD_FINISH_TIME: 2025-10-28 17:23:09
                 URL:
         JOB_DETAILS: {"ScannedRows":1,"LoadBytes":25,"FileNumber":0,"FileSize":0,"TaskNumber":1,"Unfinished backends":[],"All backends":[1754377661179]}
      TRANSACTION_ID: 72076
       ERROR_TABLETS: {}
                USER: root
             COMMENT:
     FIRST_ERROR_MSG:
    ```
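
    The same table supports richer filters than `show load` allows. As a
    sketch (illustrative queries only; the 'CANCELLED' state value is an
    assumption, and ordering on the text-typed CREATE_TIME column relies on
    its 'YYYY-MM-DD HH:MM:SS' format sorting chronologically):
    ```
    -- most recent cancelled jobs first
    SELECT JOB_ID, LABEL, STATE, ERROR_MSG
    FROM information_schema.load_jobs
    WHERE STATE = 'CANCELLED'
    ORDER BY CREATE_TIME DESC LIMIT 10;

    -- job counts per load type and state
    SELECT `TYPE`, STATE, COUNT(*) AS job_count
    FROM information_schema.load_jobs
    GROUP BY `TYPE`, STATE;
    ```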
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 be/src/exec/schema_scanner.cpp                     |   3 +
 be/src/exec/schema_scanner/schema_helper.cpp       |   9 +
 be/src/exec/schema_scanner/schema_helper.h         |   5 +
 .../schema_scanner/schema_load_job_scanner.cpp     | 189 +++++++++++++++++++++
 .../exec/schema_scanner/schema_load_job_scanner.h  |  50 ++++++
 .../org/apache/doris/analysis/SchemaTableType.java |   2 +
 .../java/org/apache/doris/catalog/SchemaTable.java |  24 +++
 .../apache/doris/service/FrontendServiceImpl.java  |  81 +++++++++
 gensrc/thrift/Descriptors.thrift                   |   1 +
 gensrc/thrift/FrontendService.thrift               |  32 ++++
 .../test_load_job_info_system_table.groovy         |  74 ++++++++
 11 files changed, 470 insertions(+)

diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index b606e0eb742..5e906260435 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -40,6 +40,7 @@
 #include "exec/schema_scanner/schema_encryption_keys_scanner.h"
 #include "exec/schema_scanner/schema_file_cache_statistics.h"
 #include "exec/schema_scanner/schema_files_scanner.h"
+#include "exec/schema_scanner/schema_load_job_scanner.h"
 #include "exec/schema_scanner/schema_metadata_name_ids_scanner.h"
 #include "exec/schema_scanner/schema_partitions_scanner.h"
 #include "exec/schema_scanner/schema_processlist_scanner.h"
@@ -240,6 +241,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
         return SchemaBackendKerberosTicketCacheScanner::create_unique();
     case TSchemaTableType::SCH_ROUTINE_LOAD_JOBS:
         return SchemaRoutineLoadJobScanner::create_unique();
+    case TSchemaTableType::SCH_LOAD_JOBS:
+        return SchemaLoadJobScanner::create_unique();
     case TSchemaTableType::SCH_BACKEND_TABLETS:
         return SchemaTabletsScanner::create_unique();
     case TSchemaTableType::SCH_VIEW_DEPENDENCY:
diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp
index 37c2af63142..33516e01726 100644
--- a/be/src/exec/schema_scanner/schema_helper.cpp
+++ b/be/src/exec/schema_scanner/schema_helper.cpp
@@ -151,6 +151,15 @@ Status SchemaHelper::fetch_routine_load_job(const std::string& ip, const int32_t
             });
 }
 
+Status SchemaHelper::fetch_load_job(const std::string& ip, const int32_t port,
+                                    const TFetchLoadJobRequest& request,
+                                    TFetchLoadJobResult* result) {
+    return ThriftRpcHelper::rpc<FrontendServiceClient>(
+            ip, port, [&request, &result](FrontendServiceConnection& client) {
+                client->fetchLoadJob(*result, request);
+            });
+}
+
 Status SchemaHelper::fetch_schema_table_data(const std::string& ip, const int32_t port,
                                              const TFetchSchemaTableDataRequest& request,
                                              TFetchSchemaTableDataResult* result) {
diff --git a/be/src/exec/schema_scanner/schema_helper.h b/be/src/exec/schema_scanner/schema_helper.h
index cc931f30537..76bc7eaea46 100644
--- a/be/src/exec/schema_scanner/schema_helper.h
+++ b/be/src/exec/schema_scanner/schema_helper.h
@@ -29,6 +29,8 @@ class TDescribeTablesParams;
 class TDescribeTablesResult;
 class TFetchRoutineLoadJobRequest;
 class TFetchRoutineLoadJobResult;
+class TFetchLoadJobRequest;
+class TFetchLoadJobResult;
 class TFetchSchemaTableDataRequest;
 class TFetchSchemaTableDataResult;
 class TGetDbsParams;
@@ -94,6 +96,9 @@ public:
                                          const TFetchRoutineLoadJobRequest& request,
                                          TFetchRoutineLoadJobResult* result);
 
+    static Status fetch_load_job(const std::string& ip, const int32_t port,
+                                 const TFetchLoadJobRequest& request, TFetchLoadJobResult* result);
+
     static Status fetch_schema_table_data(const std::string& ip, const int32_t 
port,
                                           const TFetchSchemaTableDataRequest& 
request,
                                           TFetchSchemaTableDataResult* result);
diff --git a/be/src/exec/schema_scanner/schema_load_job_scanner.cpp b/be/src/exec/schema_scanner/schema_load_job_scanner.cpp
new file mode 100644
index 00000000000..851882d13f9
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_load_job_scanner.cpp
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_load_job_scanner.h"
+
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/FrontendService_types.h>
+
+#include <string>
+
+#include "exec/schema_scanner/schema_helper.h"
+#include "runtime/runtime_state.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+std::vector<SchemaScanner::ColumnDesc> SchemaLoadJobScanner::_s_tbls_columns = {
+        //   name,       type,          size,     is_null
+        {"JOB_ID", TYPE_STRING, sizeof(StringRef), true},
+        {"LABEL", TYPE_STRING, sizeof(StringRef), true},
+        {"STATE", TYPE_STRING, sizeof(StringRef), true},
+        {"PROGRESS", TYPE_STRING, sizeof(StringRef), true},
+        {"TYPE", TYPE_STRING, sizeof(StringRef), true},
+        {"ETL_INFO", TYPE_STRING, sizeof(StringRef), true},
+        {"TASK_INFO", TYPE_STRING, sizeof(StringRef), true},
+        {"ERROR_MSG", TYPE_STRING, sizeof(StringRef), true},
+        {"CREATE_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"ETL_START_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"ETL_FINISH_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"LOAD_START_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"LOAD_FINISH_TIME", TYPE_STRING, sizeof(StringRef), true},
+        {"URL", TYPE_STRING, sizeof(StringRef), true},
+        {"JOB_DETAILS", TYPE_STRING, sizeof(StringRef), true},
+        {"TRANSACTION_ID", TYPE_STRING, sizeof(StringRef), true},
+        {"ERROR_TABLETS", TYPE_STRING, sizeof(StringRef), true},
+        {"USER", TYPE_STRING, sizeof(StringRef), true},
+        {"COMMENT", TYPE_STRING, sizeof(StringRef), true},
+        {"FIRST_ERROR_MSG", TYPE_STRING, sizeof(StringRef), true},
+};
+
+SchemaLoadJobScanner::SchemaLoadJobScanner()
+        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_LOAD_JOBS) {}
+
+SchemaLoadJobScanner::~SchemaLoadJobScanner() {}
+
+Status SchemaLoadJobScanner::start(RuntimeState* state) {
+    if (!_is_init) {
+        return Status::InternalError("used before initialized.");
+    }
+    TFetchLoadJobRequest request;
+    RETURN_IF_ERROR(SchemaHelper::fetch_load_job(*(_param->common_param->ip),
+                                                 _param->common_param->port, request, &_result));
+    return Status::OK();
+}
+
+Status SchemaLoadJobScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
+    if (!_is_init) {
+        return Status::InternalError("call this before initial.");
+    }
+    if (block == nullptr || eos == nullptr) {
+        return Status::InternalError("invalid parameter.");
+    }
+
+    *eos = true;
+    if (_result.loadJobs.empty()) {
+        return Status::OK();
+    }
+
+    return _fill_block_impl(block);
+}
+
+Status SchemaLoadJobScanner::_fill_block_impl(vectorized::Block* block) {
+    SCOPED_TIMER(_fill_block_timer);
+
+    const auto& jobs_info = _result.loadJobs;
+    size_t row_num = jobs_info.size();
+    if (row_num == 0) {
+        return Status::OK();
+    }
+
+    for (size_t col_idx = 0; col_idx < _s_tbls_columns.size(); ++col_idx) {
+        const auto& col_desc = _s_tbls_columns[col_idx];
+
+        std::vector<StringRef> str_refs(row_num);
+        std::vector<void*> datas(row_num);
+        std::vector<std::string> column_values(row_num);
+
+        for (size_t row_idx = 0; row_idx < row_num; ++row_idx) {
+            const auto& job_info = jobs_info[row_idx];
+            std::string& column_value = column_values[row_idx];
+
+            if (col_desc.type == TYPE_STRING) {
+                switch (col_idx) {
+                case 0: // JOB_ID
+                    column_value = job_info.__isset.job_id ? job_info.job_id : "";
+                    break;
+                case 1: // LABEL
+                    column_value = job_info.__isset.label ? job_info.label : "";
+                    break;
+                case 2: // STATE
+                    column_value = job_info.__isset.state ? job_info.state : "";
+                    break;
+                case 3: // PROGRESS
+                    column_value = job_info.__isset.progress ? job_info.progress : "";
+                    break;
+                case 4: // TYPE
+                    column_value = job_info.__isset.type ? job_info.type : "";
+                    break;
+                case 5: // ETL_INFO
+                    column_value = job_info.__isset.etl_info ? job_info.etl_info : "";
+                    break;
+                case 6: // TASK_INFO
+                    column_value = job_info.__isset.task_info ? job_info.task_info : "";
+                    break;
+                case 7: // ERROR_MSG
+                    column_value = job_info.__isset.error_msg ? job_info.error_msg : "";
+                    break;
+                case 8: // CREATE_TIME
+                    column_value = job_info.__isset.create_time ? job_info.create_time : "";
+                    break;
+                case 9: // ETL_START_TIME
+                    column_value = job_info.__isset.etl_start_time ? job_info.etl_start_time : "";
+                    break;
+                case 10: // ETL_FINISH_TIME
+                    column_value = job_info.__isset.etl_finish_time ? job_info.etl_finish_time : "";
+                    break;
+                case 11: // LOAD_START_TIME
+                    column_value = job_info.__isset.load_start_time ? job_info.load_start_time : "";
+                    break;
+                case 12: // LOAD_FINISH_TIME
+                    column_value =
+                            job_info.__isset.load_finish_time ? job_info.load_finish_time : "";
+                    break;
+                case 13: // URL
+                    column_value = job_info.__isset.url ? job_info.url : "";
+                    break;
+                case 14: // JOB_DETAILS
+                    column_value = job_info.__isset.job_details ? job_info.job_details : "";
+                    break;
+                case 15: // TRANSACTION_ID
+                    column_value = job_info.__isset.transaction_id ? job_info.transaction_id : "";
+                    break;
+                case 16: // ERROR_TABLETS
+                    column_value = job_info.__isset.error_tablets ? job_info.error_tablets : "";
+                    break;
+                case 17: // USER
+                    column_value = job_info.__isset.user ? job_info.user : "";
+                    break;
+                case 18: // COMMENT
+                    column_value = job_info.__isset.comment ? job_info.comment : "";
+                    break;
+                case 19: // FIRST_ERROR_MSG
+                    column_value = job_info.__isset.first_error_msg ? job_info.first_error_msg : "";
+                    break;
+                }
+
+                str_refs[row_idx] =
+                        StringRef(column_values[row_idx].data(), column_values[row_idx].size());
+                datas[row_idx] = &str_refs[row_idx];
+            }
+        }
+
+        RETURN_IF_ERROR(fill_dest_column_for_range(block, col_idx, datas));
+    }
+
+    return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_load_job_scanner.h b/be/src/exec/schema_scanner/schema_load_job_scanner.h
new file mode 100644
index 00000000000..8fcd5fa56d8
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_load_job_scanner.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/FrontendService_types.h>
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+class SchemaLoadJobScanner : public SchemaScanner {
+    ENABLE_FACTORY_CREATOR(SchemaLoadJobScanner);
+
+public:
+    SchemaLoadJobScanner();
+    ~SchemaLoadJobScanner() override;
+
+    Status start(RuntimeState* state) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
+
+private:
+    Status _fill_block_impl(vectorized::Block* block);
+
+    TFetchLoadJobResult _result;
+    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+};
+
+} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index aafcae4a913..6ef39998594 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -96,6 +96,8 @@ public enum SchemaTableType {
             TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE),
     SCH_ROUTINE_LOAD_JOBS("ROUTINE_LOAD_JOBS", "ROUTINE_LOAD_JOBS",
             TSchemaTableType.SCH_ROUTINE_LOAD_JOBS),
+    SCH_LOAD_JOBS("LOAD_JOBS", "LOAD_JOBS",
+            TSchemaTableType.SCH_LOAD_JOBS),
     SCH_VIEW_DEPENDENCY("VIEW_DEPENDENCY", "VIEW_DEPENDENCY",
                     TSchemaTableType.SCH_VIEW_DEPENDENCY),
     SQL_BLOCK_RULE_STATUS("SQL_BLOCK_RULE_STATUS", "SQL_BLOCK_RULE_STATUS",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index deb8aac86a0..dc53598c2c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -658,6 +658,30 @@ public class SchemaTable extends Table {
                                     .column("IS_ABNORMAL_PAUSE", 
ScalarType.createType(PrimitiveType.BOOLEAN))
                                     .build())
             )
+            .put("load_jobs",
+                    new SchemaTable(SystemIdGenerator.getNextId(), 
"load_jobs", TableType.SCHEMA,
+                            builder().column("JOB_ID", 
ScalarType.createStringType())
+                                    .column("LABEL", 
ScalarType.createStringType())
+                                    .column("STATE", 
ScalarType.createStringType())
+                                    .column("PROGRESS", 
ScalarType.createStringType())
+                                    .column("TYPE", 
ScalarType.createStringType())
+                                    .column("ETL_INFO", 
ScalarType.createStringType())
+                                    .column("TASK_INFO", 
ScalarType.createStringType())
+                                    .column("ERROR_MSG", 
ScalarType.createStringType())
+                                    .column("CREATE_TIME", 
ScalarType.createStringType())
+                                    .column("ETL_START_TIME", 
ScalarType.createStringType())
+                                    .column("ETL_FINISH_TIME", 
ScalarType.createStringType())
+                                    .column("LOAD_START_TIME", 
ScalarType.createStringType())
+                                    .column("LOAD_FINISH_TIME", 
ScalarType.createStringType())
+                                    .column("URL", 
ScalarType.createStringType())
+                                    .column("JOB_DETAILS", 
ScalarType.createStringType())
+                                    .column("TRANSACTION_ID", 
ScalarType.createStringType())
+                                    .column("ERROR_TABLETS", 
ScalarType.createStringType())
+                                    .column("USER", 
ScalarType.createStringType())
+                                    .column("COMMENT", 
ScalarType.createStringType())
+                                    .column("FIRST_ERROR_MSG", 
ScalarType.createStringType())
+                                    .build())
+            )
             .put("backend_tablets", new 
SchemaTable(SystemIdGenerator.getNextId(), "backend_tablets", TableType.SCHEMA,
                     builder().column("BE_ID", 
ScalarType.createType(PrimitiveType.BIGINT))
                             .column("TABLET_ID", 
ScalarType.createType(PrimitiveType.BIGINT))
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 7061942b384..481c5a607d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -91,6 +91,7 @@ import org.apache.doris.encryption.EncryptionKey;
 import org.apache.doris.insertoverwrite.InsertOverwriteManager;
 import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
 import org.apache.doris.load.StreamLoadHandler;
+import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.load.routineload.ErrorReason;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
@@ -164,6 +165,8 @@ import org.apache.doris.thrift.TDropPlsqlPackageRequest;
 import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest;
 import org.apache.doris.thrift.TEncryptionAlgorithm;
 import org.apache.doris.thrift.TEncryptionKey;
+import org.apache.doris.thrift.TFetchLoadJobRequest;
+import org.apache.doris.thrift.TFetchLoadJobResult;
 import org.apache.doris.thrift.TFetchResourceResult;
 import org.apache.doris.thrift.TFetchRoutineLoadJobRequest;
 import org.apache.doris.thrift.TFetchRoutineLoadJobResult;
@@ -214,6 +217,7 @@ import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
 import org.apache.doris.thrift.TListPrivilegesResult;
 import org.apache.doris.thrift.TListTableMetadataNameIdsResult;
 import org.apache.doris.thrift.TListTableStatusResult;
+import org.apache.doris.thrift.TLoadJob;
 import org.apache.doris.thrift.TLoadTxn2PCRequest;
 import org.apache.doris.thrift.TLoadTxn2PCResult;
 import org.apache.doris.thrift.TLoadTxnBeginRequest;
@@ -4528,6 +4532,83 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 
+    @Override
+    public TFetchLoadJobResult fetchLoadJob(TFetchLoadJobRequest request) {
+        TFetchLoadJobResult result = new TFetchLoadJobResult();
+
+        if (!Env.getCurrentEnv().isReady()) {
+            return result;
+        }
+
+        // Create a ConnectContext with skipAuth=true for system table access
+        // This is necessary because LoadJob.checkAuth() requires a ConnectContext
+        // and system table queries from backend don't have user context
+        ConnectContext ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        ctx.setSkipAuth(true);
+        ctx.setThreadLocalInfo();
+
+        try {
+            LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
+            List<TLoadJob> jobInfos = Lists.newArrayList();
+            List<String> dbNames = Env.getCurrentInternalCatalog().getDbNames();
+            for (String dbName : dbNames) {
+                DatabaseIf db;
+                try {
+                    db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+                } catch (Exception e) {
+                    LOG.warn("Failed to get database: {}", dbName, e);
+                    continue;
+                }
+                long dbId = db.getId();
+                try {
+                    List<List<Comparable>> loadJobInfosByDb = loadManager.getLoadJobInfosByDb(
+                            dbId, null, false, null);
+                    for (List<Comparable> jobInfo : loadJobInfosByDb) {
+                        TLoadJob tJob = new TLoadJob();
+                        // Based on LOAD_TITLE_NAMES order:
+                        // JobId, Label, State, Progress, Type, EtlInfo, TaskInfo, ErrorMsg, CreateTime,
+                        // EtlStartTime, EtlFinishTime, LoadStartTime, LoadFinishTime, URL, JobDetails,
+                        // TransactionId, ErrorTablets, User, Comment, FirstErrorMsg
+                        if (jobInfo.size() >= 20) {
+                            tJob.setJobId(String.valueOf(jobInfo.get(0)));
+                            tJob.setLabel(String.valueOf(jobInfo.get(1)));
+                            tJob.setState(String.valueOf(jobInfo.get(2)));
+                            tJob.setProgress(String.valueOf(jobInfo.get(3)));
+                            tJob.setType(String.valueOf(jobInfo.get(4)));
+                            tJob.setEtlInfo(String.valueOf(jobInfo.get(5)));
+                            tJob.setTaskInfo(String.valueOf(jobInfo.get(6)));
+                            tJob.setErrorMsg(String.valueOf(jobInfo.get(7)));
+                            tJob.setCreateTime(String.valueOf(jobInfo.get(8)));
+                            tJob.setEtlStartTime(String.valueOf(jobInfo.get(9)));
+                            tJob.setEtlFinishTime(String.valueOf(jobInfo.get(10)));
+                            tJob.setLoadStartTime(String.valueOf(jobInfo.get(11)));
+                            tJob.setLoadFinishTime(String.valueOf(jobInfo.get(12)));
+                            tJob.setUrl(String.valueOf(jobInfo.get(13)));
+                            tJob.setJobDetails(String.valueOf(jobInfo.get(14)));
+                            tJob.setTransactionId(String.valueOf(jobInfo.get(15)));
+                            tJob.setErrorTablets(String.valueOf(jobInfo.get(16)));
+                            tJob.setUser(String.valueOf(jobInfo.get(17)));
+                            tJob.setComment(String.valueOf(jobInfo.get(18)));
+                            tJob.setFirstErrorMsg(String.valueOf(jobInfo.get(19)));
+                            jobInfos.add(tJob);
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.warn("Failed to get load jobs for database: {}", dbName, e);
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("load job infos: {}", jobInfos);
+            }
+            result.setLoadJobs(jobInfos);
+
+            return result;
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
     @Override
     public TGetTableTDEInfoResult getTableTDEInfo(TGetTableTDEInfoRequest request) throws TException {
         String clientAddr = getClientAddrAsString();
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index df769272bb1..c3f38f14cc5 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -160,6 +160,7 @@ enum TSchemaTableType {
     SCH_CLUSTER_SNAPSHOTS = 60;
     SCH_CLUSTER_SNAPSHOT_PROPERTIES = 61;
     SCH_BLACKHOLE = 62;
+    SCH_LOAD_JOBS = 64;
 }
 
 enum THdfsCompression {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 9afc7570225..0c3bdea1a09 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1606,6 +1606,36 @@ struct TFetchRoutineLoadJobResult {
     1: optional list<TRoutineLoadJob> routineLoadJobs
 }
 
+struct TFetchLoadJobRequest {
+}
+
+struct TLoadJob {
+    1: optional string job_id
+    2: optional string label
+    3: optional string state
+    4: optional string progress
+    5: optional string type
+    6: optional string etl_info
+    7: optional string task_info
+    8: optional string error_msg
+    9: optional string create_time
+    10: optional string etl_start_time
+    11: optional string etl_finish_time
+    12: optional string load_start_time
+    13: optional string load_finish_time
+    14: optional string url
+    15: optional string job_details
+    16: optional string transaction_id
+    17: optional string error_tablets
+    18: optional string user
+    19: optional string comment
+    20: optional string first_error_msg
+}
+
+struct TFetchLoadJobResult {
+    1: optional list<TLoadJob> loadJobs
+}
+
 struct TPlanNodeRuntimeStatsItem {
     // node_id means PlanNodeId, add this field so that we can merge RuntimeProfile of same node more easily
     1: optional i32 node_id
@@ -1774,6 +1804,8 @@ service FrontendService {
 
     TFetchRoutineLoadJobResult fetchRoutineLoadJob(1: TFetchRoutineLoadJobRequest request)
 
+    TFetchLoadJobResult fetchLoadJob(1: TFetchLoadJobRequest request)
+
     TGetEncryptionKeysResult getEncryptionKeys(1: TGetEncryptionKeysRequest request)
 
     TGetTableTDEInfoResult getTableTDEInfo(1: TGetTableTDEInfoRequest request)
diff --git a/regression-test/suites/job_p0/job_system_table/test_load_job_info_system_table.groovy b/regression-test/suites/job_p0/job_system_table/test_load_job_info_system_table.groovy
new file mode 100644
index 00000000000..05cad21b3ce
--- /dev/null
+++ b/regression-test/suites/job_p0/job_system_table/test_load_job_info_system_table.groovy
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_load_job_info_system_table", "p0") {
+    def tableName = "test_load_job_info_table"
+    def label = "test_load_job_label_" + 
UUID.randomUUID().toString().replace("-", "")
+    sql """
+    DROP TABLE IF EXISTS ${tableName}
+    """
+    sql """
+    CREATE TABLE ${tableName}
+    (
+        k1 INT NOT NULL,
+        k2 VARCHAR(50) NOT NULL,
+        k3 DATETIME NOT NULL,
+        v1 INT SUM DEFAULT '0'
+    )
+    AGGREGATE KEY(k1, k2, k3)
+    DISTRIBUTED BY HASH(k1) BUCKETS 3
+    PROPERTIES (
+        "replication_num" = "1"
+    )
+    """
+
+    sql """
+    INSERT INTO ${tableName} with label ${label} select 1, 'test1', '2023-01-01 00:00:00', 10
+    """
+
+    def res = sql """
+    SELECT 
+        JOB_ID,
+        LABEL,
+        STATE,
+        PROGRESS,
+        TYPE,
+        ETL_INFO,
+        TASK_INFO,
+        ERROR_MSG,
+        CREATE_TIME,
+        ETL_START_TIME,
+        ETL_FINISH_TIME,
+        LOAD_START_TIME,
+        LOAD_FINISH_TIME,
+        URL,
+        JOB_DETAILS,
+        TRANSACTION_ID,
+        ERROR_TABLETS,
+        USER,
+        COMMENT,
+        FIRST_ERROR_MSG
+    FROM 
+        information_schema.load_jobs
+    WHERE 
+        LABEL = '${label}'
+    """
+
+    log.info("Result size: ${res.size()}")
+    assertTrue(res.size() > 0, "Job should appear in load_jobs system table after ${label} insert")
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
