This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new bf22d3d3e7b branch-4.0: [feat](load) introduce load job system table
(#57421) (#58850)
bf22d3d3e7b is described below
commit bf22d3d3e7beb435dc7378c401d505c2f8b44d73
Author: hui lai <[email protected]>
AuthorDate: Thu Dec 11 18:40:52 2025 +0800
branch-4.0: [feat](load) introduce load job system table (#57421) (#58850)
pick #57421
Introduce a load job statistics system table:
```
mysql> show create table information_schema.load_jobs\G
*************************** 1. row ***************************
Table: load_jobs
Create Table: CREATE TABLE `load_jobs` (
`JOB_ID` text NULL,
`LABEL` text NULL,
`STATE` text NULL,
`PROGRESS` text NULL,
`TYPE` text NULL,
`ETL_INFO` text NULL,
`TASK_INFO` text NULL,
`ERROR_MSG` text NULL,
`CREATE_TIME` text NULL,
`ETL_START_TIME` text NULL,
`ETL_FINISH_TIME` text NULL,
`LOAD_START_TIME` text NULL,
`LOAD_FINISH_TIME` text NULL,
`URL` text NULL,
`JOB_DETAILS` text NULL,
`TRANSACTION_ID` text NULL,
`ERROR_TABLETS` text NULL,
`USER` text NULL,
`COMMENT` text NULL,
`FIRST_ERROR_MSG` text NULL
) ENGINE=SCHEMA;
1 row in set (0.01 sec)
```
Users can query `select * from information_schema.load_jobs` instead of
running `show load`. The advantage is that arbitrary SQL predicates and
projections can be used to locate jobs flexibly.
Example:
```
mysql> SELECT * FROM information_schema.load_jobs WHERE
LABEL = 'test_load_job_label_b5347e94f2614e2c92705d6a6824a380'\G
*************************** 1. row ***************************
JOB_ID: 1761643165987
LABEL: test_load_job_label_b5347e94f2614e2c92705d6a6824a380
STATE: FINISHED
PROGRESS: Unknown id: 1761643165987
TYPE: INSERT
ETL_INFO: \N
TASK_INFO: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
ERROR_MSG: \N
CREATE_TIME: 2025-10-28 17:23:08
ETL_START_TIME: 2025-10-28 17:23:08
ETL_FINISH_TIME: 2025-10-28 17:23:08
LOAD_START_TIME: 2025-10-28 17:23:08
LOAD_FINISH_TIME: 2025-10-28 17:23:09
URL:
JOB_DETAILS:
{"ScannedRows":1,"LoadBytes":25,"FileNumber":0,"FileSize":0,"TaskNumber":1,"Unfinished
backends":[],"All backends":[1754377661179]}
TRANSACTION_ID: 72076
ERROR_TABLETS: {}
USER: root
COMMENT:
FIRST_ERROR_MSG:
```
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/exec/schema_scanner.cpp | 3 +
be/src/exec/schema_scanner/schema_helper.cpp | 9 +
be/src/exec/schema_scanner/schema_helper.h | 5 +
.../schema_scanner/schema_load_job_scanner.cpp | 189 +++++++++++++++++++++
.../exec/schema_scanner/schema_load_job_scanner.h | 50 ++++++
.../org/apache/doris/analysis/SchemaTableType.java | 2 +
.../java/org/apache/doris/catalog/SchemaTable.java | 24 +++
.../apache/doris/service/FrontendServiceImpl.java | 81 +++++++++
gensrc/thrift/Descriptors.thrift | 1 +
gensrc/thrift/FrontendService.thrift | 32 ++++
.../test_load_job_info_system_table.groovy | 74 ++++++++
11 files changed, 470 insertions(+)
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index b606e0eb742..5e906260435 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -40,6 +40,7 @@
#include "exec/schema_scanner/schema_encryption_keys_scanner.h"
#include "exec/schema_scanner/schema_file_cache_statistics.h"
#include "exec/schema_scanner/schema_files_scanner.h"
+#include "exec/schema_scanner/schema_load_job_scanner.h"
#include "exec/schema_scanner/schema_metadata_name_ids_scanner.h"
#include "exec/schema_scanner/schema_partitions_scanner.h"
#include "exec/schema_scanner/schema_processlist_scanner.h"
@@ -240,6 +241,8 @@ std::unique_ptr<SchemaScanner>
SchemaScanner::create(TSchemaTableType::type type
return SchemaBackendKerberosTicketCacheScanner::create_unique();
case TSchemaTableType::SCH_ROUTINE_LOAD_JOBS:
return SchemaRoutineLoadJobScanner::create_unique();
+ case TSchemaTableType::SCH_LOAD_JOBS:
+ return SchemaLoadJobScanner::create_unique();
case TSchemaTableType::SCH_BACKEND_TABLETS:
return SchemaTabletsScanner::create_unique();
case TSchemaTableType::SCH_VIEW_DEPENDENCY:
diff --git a/be/src/exec/schema_scanner/schema_helper.cpp
b/be/src/exec/schema_scanner/schema_helper.cpp
index 37c2af63142..33516e01726 100644
--- a/be/src/exec/schema_scanner/schema_helper.cpp
+++ b/be/src/exec/schema_scanner/schema_helper.cpp
@@ -151,6 +151,15 @@ Status SchemaHelper::fetch_routine_load_job(const
std::string& ip, const int32_t
});
}
+// Issues FrontendService::fetchLoadJob against the FE at ip:port through
+// ThriftRpcHelper; on success the FE's reply is written into *result.
+Status SchemaHelper::fetch_load_job(const std::string& ip, const int32_t port,
+                                    const TFetchLoadJobRequest& request,
+                                    TFetchLoadJobResult* result) {
+    auto call_fe = [&request, &result](FrontendServiceConnection& fe_client) {
+        fe_client->fetchLoadJob(*result, request);
+    };
+    return ThriftRpcHelper::rpc<FrontendServiceClient>(ip, port, call_fe);
+}
+
Status SchemaHelper::fetch_schema_table_data(const std::string& ip, const
int32_t port,
const
TFetchSchemaTableDataRequest& request,
TFetchSchemaTableDataResult*
result) {
diff --git a/be/src/exec/schema_scanner/schema_helper.h
b/be/src/exec/schema_scanner/schema_helper.h
index cc931f30537..76bc7eaea46 100644
--- a/be/src/exec/schema_scanner/schema_helper.h
+++ b/be/src/exec/schema_scanner/schema_helper.h
@@ -29,6 +29,8 @@ class TDescribeTablesParams;
class TDescribeTablesResult;
class TFetchRoutineLoadJobRequest;
class TFetchRoutineLoadJobResult;
+class TFetchLoadJobRequest;
+class TFetchLoadJobResult;
class TFetchSchemaTableDataRequest;
class TFetchSchemaTableDataResult;
class TGetDbsParams;
@@ -94,6 +96,9 @@ public:
const TFetchRoutineLoadJobRequest&
request,
TFetchRoutineLoadJobResult* result);
+ // Fetches the load-job list from the FE at ip:port via FrontendService::fetchLoadJob;
+ // the reply is written into *result.
+ static Status fetch_load_job(const std::string& ip, const int32_t port,
+ const TFetchLoadJobRequest& request,
TFetchLoadJobResult* result);
+
static Status fetch_schema_table_data(const std::string& ip, const int32_t
port,
const TFetchSchemaTableDataRequest&
request,
TFetchSchemaTableDataResult* result);
diff --git a/be/src/exec/schema_scanner/schema_load_job_scanner.cpp
b/be/src/exec/schema_scanner/schema_load_job_scanner.cpp
new file mode 100644
index 00000000000..851882d13f9
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_load_job_scanner.cpp
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_load_job_scanner.h"
+
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/FrontendService_types.h>
+
+#include <string>
+
+#include "exec/schema_scanner/schema_helper.h"
+#include "runtime/runtime_state.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+// Column layout of information_schema.load_jobs: every column is a nullable string.
+// The order is load-bearing. Column i is filled from the i-th case of the switch in
+// _fill_block_impl(), and it must also line up with the FE-side "load_jobs" schema.
+std::vector<SchemaScanner::ColumnDesc> SchemaLoadJobScanner::_s_tbls_columns =
{
+ // name, type, size, is_null
+ {"JOB_ID", TYPE_STRING, sizeof(StringRef), true},
+ {"LABEL", TYPE_STRING, sizeof(StringRef), true},
+ {"STATE", TYPE_STRING, sizeof(StringRef), true},
+ {"PROGRESS", TYPE_STRING, sizeof(StringRef), true},
+ {"TYPE", TYPE_STRING, sizeof(StringRef), true},
+ {"ETL_INFO", TYPE_STRING, sizeof(StringRef), true},
+ {"TASK_INFO", TYPE_STRING, sizeof(StringRef), true},
+ {"ERROR_MSG", TYPE_STRING, sizeof(StringRef), true},
+ {"CREATE_TIME", TYPE_STRING, sizeof(StringRef), true},
+ {"ETL_START_TIME", TYPE_STRING, sizeof(StringRef), true},
+ {"ETL_FINISH_TIME", TYPE_STRING, sizeof(StringRef), true},
+ {"LOAD_START_TIME", TYPE_STRING, sizeof(StringRef), true},
+ {"LOAD_FINISH_TIME", TYPE_STRING, sizeof(StringRef), true},
+ {"URL", TYPE_STRING, sizeof(StringRef), true},
+ {"JOB_DETAILS", TYPE_STRING, sizeof(StringRef), true},
+ {"TRANSACTION_ID", TYPE_STRING, sizeof(StringRef), true},
+ {"ERROR_TABLETS", TYPE_STRING, sizeof(StringRef), true},
+ {"USER", TYPE_STRING, sizeof(StringRef), true},
+ {"COMMENT", TYPE_STRING, sizeof(StringRef), true},
+ {"FIRST_ERROR_MSG", TYPE_STRING, sizeof(StringRef), true},
+};
+
+// Registers the static column layout and the SCH_LOAD_JOBS table type with the base scanner.
+SchemaLoadJobScanner::SchemaLoadJobScanner()
+        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_LOAD_JOBS) {}
+
+// Nothing to release by hand; _result cleans itself up. Defaulted rather than an empty body.
+SchemaLoadJobScanner::~SchemaLoadJobScanner() = default;
+
+// Pulls the complete load-job list from the FE in a single RPC and caches it in
+// _result for get_next_block_internal(). The request currently carries no filters.
+Status SchemaLoadJobScanner::start(RuntimeState* state) {
+    // The RPC target comes from _param->common_param, so refuse to run before init().
+    if (_is_init) {
+        const auto& common = *_param->common_param;
+        TFetchLoadJobRequest fetch_request;
+        RETURN_IF_ERROR(
+                SchemaHelper::fetch_load_job(*common.ip, common.port, fetch_request, &_result));
+        return Status::OK();
+    }
+    return Status::InternalError("used before initialized.");
+}
+
+// Emits every job fetched by start() as one block; *eos is set to true on every
+// successful call because there is never a second batch.
+Status SchemaLoadJobScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
+    if (!_is_init) {
+        return Status::InternalError("call this before initial.");
+    }
+    const bool missing_output = (block == nullptr) || (eos == nullptr);
+    if (missing_output) {
+        return Status::InternalError("invalid parameter.");
+    }
+
+    *eos = true;
+    return _result.loadJobs.empty() ? Status::OK() : _fill_block_impl(block);
+}
+
+// Writes every job in _result into `block`, one string column at a time, in the
+// column order of _s_tbls_columns.
+Status SchemaLoadJobScanner::_fill_block_impl(vectorized::Block* block) {
+    SCOPED_TIMER(_fill_block_timer);
+
+    const auto& jobs = _result.loadJobs;
+    const size_t num_rows = jobs.size();
+    if (num_rows == 0) {
+        return Status::OK();
+    }
+
+    // Maps a column index (same order as _s_tbls_columns) to the matching TLoadJob
+    // field. Optional fields the FE left unset, and unknown indexes, yield "".
+    auto field_at = [](const auto& job, size_t col) -> std::string {
+        switch (col) {
+        case 0: // JOB_ID
+            return job.__isset.job_id ? job.job_id : "";
+        case 1: // LABEL
+            return job.__isset.label ? job.label : "";
+        case 2: // STATE
+            return job.__isset.state ? job.state : "";
+        case 3: // PROGRESS
+            return job.__isset.progress ? job.progress : "";
+        case 4: // TYPE
+            return job.__isset.type ? job.type : "";
+        case 5: // ETL_INFO
+            return job.__isset.etl_info ? job.etl_info : "";
+        case 6: // TASK_INFO
+            return job.__isset.task_info ? job.task_info : "";
+        case 7: // ERROR_MSG
+            return job.__isset.error_msg ? job.error_msg : "";
+        case 8: // CREATE_TIME
+            return job.__isset.create_time ? job.create_time : "";
+        case 9: // ETL_START_TIME
+            return job.__isset.etl_start_time ? job.etl_start_time : "";
+        case 10: // ETL_FINISH_TIME
+            return job.__isset.etl_finish_time ? job.etl_finish_time : "";
+        case 11: // LOAD_START_TIME
+            return job.__isset.load_start_time ? job.load_start_time : "";
+        case 12: // LOAD_FINISH_TIME
+            return job.__isset.load_finish_time ? job.load_finish_time : "";
+        case 13: // URL
+            return job.__isset.url ? job.url : "";
+        case 14: // JOB_DETAILS
+            return job.__isset.job_details ? job.job_details : "";
+        case 15: // TRANSACTION_ID
+            return job.__isset.transaction_id ? job.transaction_id : "";
+        case 16: // ERROR_TABLETS
+            return job.__isset.error_tablets ? job.error_tablets : "";
+        case 17: // USER
+            return job.__isset.user ? job.user : "";
+        case 18: // COMMENT
+            return job.__isset.comment ? job.comment : "";
+        case 19: // FIRST_ERROR_MSG
+            return job.__isset.first_error_msg ? job.first_error_msg : "";
+        default:
+            return "";
+        }
+    };
+
+    for (size_t col = 0; col < _s_tbls_columns.size(); ++col) {
+        // `values` owns the strings, `refs` points into `values`, and `cells` points at
+        // `refs`; all three are handed (indirectly) to fill_dest_column_for_range().
+        std::vector<std::string> values(num_rows);
+        std::vector<StringRef> refs(num_rows);
+        std::vector<void*> cells(num_rows);
+
+        if (_s_tbls_columns[col].type == TYPE_STRING) {
+            for (size_t row = 0; row < num_rows; ++row) {
+                values[row] = field_at(jobs[row], col);
+                refs[row] = StringRef(values[row].data(), values[row].size());
+                cells[row] = &refs[row];
+            }
+        }
+
+        RETURN_IF_ERROR(fill_dest_column_for_range(block, col, cells));
+    }
+
+    return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_load_job_scanner.h
b/be/src/exec/schema_scanner/schema_load_job_scanner.h
new file mode 100644
index 00000000000..8fcd5fa56d8
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_load_job_scanner.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/FrontendService_types.h>
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+// Scanner behind information_schema.load_jobs. start() fetches every load job from
+// the FE with one RPC; get_next_block_internal() then emits all rows in one block.
+class SchemaLoadJobScanner : public SchemaScanner {
+    ENABLE_FACTORY_CREATOR(SchemaLoadJobScanner);
+
+public:
+    SchemaLoadJobScanner();
+    ~SchemaLoadJobScanner() override;
+
+    // Fetches the load-job list from the FE; returns an error before init().
+    Status start(RuntimeState* state) override;
+    // Emits all fetched jobs as a single block and reports eos on success.
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
+
+private:
+    // Column layout of the system table; order must match _fill_block_impl().
+    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+
+    // Converts the cached RPC reply into the string columns of `block`.
+    Status _fill_block_impl(vectorized::Block* block);
+
+    // Reply of the fetchLoadJob RPC, populated by start().
+    TFetchLoadJobResult _result;
+};
+
+} // namespace doris
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index aafcae4a913..6ef39998594 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -96,6 +96,8 @@ public enum SchemaTableType {
TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE),
SCH_ROUTINE_LOAD_JOBS("ROUTINE_LOAD_JOBS", "ROUTINE_LOAD_JOBS",
TSchemaTableType.SCH_ROUTINE_LOAD_JOBS),
+ SCH_LOAD_JOBS("LOAD_JOBS", "LOAD_JOBS",
+ TSchemaTableType.SCH_LOAD_JOBS),
SCH_VIEW_DEPENDENCY("VIEW_DEPENDENCY", "VIEW_DEPENDENCY",
TSchemaTableType.SCH_VIEW_DEPENDENCY),
SQL_BLOCK_RULE_STATUS("SQL_BLOCK_RULE_STATUS", "SQL_BLOCK_RULE_STATUS",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index deb8aac86a0..dc53598c2c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -658,6 +658,30 @@ public class SchemaTable extends Table {
.column("IS_ABNORMAL_PAUSE",
ScalarType.createType(PrimitiveType.BOOLEAN))
.build())
)
+ .put("load_jobs",
+ new SchemaTable(SystemIdGenerator.getNextId(),
"load_jobs", TableType.SCHEMA,
+ builder().column("JOB_ID",
ScalarType.createStringType())
+ .column("LABEL",
ScalarType.createStringType())
+ .column("STATE",
ScalarType.createStringType())
+ .column("PROGRESS",
ScalarType.createStringType())
+ .column("TYPE",
ScalarType.createStringType())
+ .column("ETL_INFO",
ScalarType.createStringType())
+ .column("TASK_INFO",
ScalarType.createStringType())
+ .column("ERROR_MSG",
ScalarType.createStringType())
+ .column("CREATE_TIME",
ScalarType.createStringType())
+ .column("ETL_START_TIME",
ScalarType.createStringType())
+ .column("ETL_FINISH_TIME",
ScalarType.createStringType())
+ .column("LOAD_START_TIME",
ScalarType.createStringType())
+ .column("LOAD_FINISH_TIME",
ScalarType.createStringType())
+ .column("URL",
ScalarType.createStringType())
+ .column("JOB_DETAILS",
ScalarType.createStringType())
+ .column("TRANSACTION_ID",
ScalarType.createStringType())
+ .column("ERROR_TABLETS",
ScalarType.createStringType())
+ .column("USER",
ScalarType.createStringType())
+ .column("COMMENT",
ScalarType.createStringType())
+ .column("FIRST_ERROR_MSG",
ScalarType.createStringType())
+ .build())
+ )
.put("backend_tablets", new
SchemaTable(SystemIdGenerator.getNextId(), "backend_tablets", TableType.SCHEMA,
builder().column("BE_ID",
ScalarType.createType(PrimitiveType.BIGINT))
.column("TABLET_ID",
ScalarType.createType(PrimitiveType.BIGINT))
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 7061942b384..481c5a607d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -91,6 +91,7 @@ import org.apache.doris.encryption.EncryptionKey;
import org.apache.doris.insertoverwrite.InsertOverwriteManager;
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
import org.apache.doris.load.StreamLoadHandler;
+import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.ErrorReason;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
@@ -164,6 +165,8 @@ import org.apache.doris.thrift.TDropPlsqlPackageRequest;
import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest;
import org.apache.doris.thrift.TEncryptionAlgorithm;
import org.apache.doris.thrift.TEncryptionKey;
+import org.apache.doris.thrift.TFetchLoadJobRequest;
+import org.apache.doris.thrift.TFetchLoadJobResult;
import org.apache.doris.thrift.TFetchResourceResult;
import org.apache.doris.thrift.TFetchRoutineLoadJobRequest;
import org.apache.doris.thrift.TFetchRoutineLoadJobResult;
@@ -214,6 +217,7 @@ import
org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TListPrivilegesResult;
import org.apache.doris.thrift.TListTableMetadataNameIdsResult;
import org.apache.doris.thrift.TListTableStatusResult;
+import org.apache.doris.thrift.TLoadJob;
import org.apache.doris.thrift.TLoadTxn2PCRequest;
import org.apache.doris.thrift.TLoadTxn2PCResult;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
@@ -4528,6 +4532,83 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
+    /**
+     * Returns the load jobs of every internal-catalog database, one TLoadJob per SHOW LOAD row.
+     * Backs information_schema.load_jobs. An empty result is returned while the FE is not ready.
+     */
+    @Override
+    public TFetchLoadJobResult fetchLoadJob(TFetchLoadJobRequest request) {
+        TFetchLoadJobResult result = new TFetchLoadJobResult();
+
+        if (!Env.getCurrentEnv().isReady()) {
+            return result;
+        }
+
+        // Create a ConnectContext with skipAuth=true for system table access.
+        // This is necessary because LoadJob.checkAuth() requires a ConnectContext
+        // and system table queries from backend don't have user context.
+        // NOTE(review): TFetchLoadJobRequest carries no user identity, so every caller sees the
+        // load jobs of all databases regardless of its privileges. Consider sending the current
+        // user in the request and filtering per database.
+        ConnectContext ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        ctx.setSkipAuth(true);
+        ctx.setThreadLocalInfo();
+
+        try {
+            LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
+            List<TLoadJob> jobInfos = Lists.newArrayList();
+            List<String> dbNames = Env.getCurrentInternalCatalog().getDbNames();
+            for (String dbName : dbNames) {
+                DatabaseIf db;
+                try {
+                    db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+                } catch (Exception e) {
+                    LOG.warn("Failed to get database: {}", dbName, e);
+                    continue;
+                }
+                long dbId = db.getId();
+                try {
+                    List<List<Comparable>> loadJobInfosByDb = loadManager.getLoadJobInfosByDb(
+                            dbId, null, false, null);
+                    for (List<Comparable> jobInfo : loadJobInfosByDb) {
+                        TLoadJob tJob = buildTLoadJob(jobInfo);
+                        if (tJob == null) {
+                            // Do not drop malformed rows silently: the column layout is out of sync.
+                            LOG.warn("Skip load job with {} columns (expected 20) in database {}: {}",
+                                    jobInfo.size(), dbName, jobInfo);
+                            continue;
+                        }
+                        jobInfos.add(tJob);
+                    }
+                } catch (Exception e) {
+                    LOG.warn("Failed to get load jobs for database: {}", dbName, e);
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("load job infos: {}", jobInfos);
+            }
+            result.setLoadJobs(jobInfos);
+
+            return result;
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    /**
+     * Converts one row of LoadManager#getLoadJobInfosByDb into a TLoadJob.
+     * Based on LOAD_TITLE_NAMES order: JobId, Label, State, Progress, Type, EtlInfo, TaskInfo,
+     * ErrorMsg, CreateTime, EtlStartTime, EtlFinishTime, LoadStartTime, LoadFinishTime, URL,
+     * JobDetails, TransactionId, ErrorTablets, User, Comment, FirstErrorMsg.
+     *
+     * @param jobInfo one SHOW LOAD row
+     * @return the converted job, or null when the row has fewer than 20 columns
+     */
+    private static TLoadJob buildTLoadJob(List<Comparable> jobInfo) {
+        if (jobInfo.size() < 20) {
+            return null;
+        }
+        TLoadJob tJob = new TLoadJob();
+        tJob.setJobId(loadJobCellToString(jobInfo, 0));
+        tJob.setLabel(loadJobCellToString(jobInfo, 1));
+        tJob.setState(loadJobCellToString(jobInfo, 2));
+        tJob.setProgress(loadJobCellToString(jobInfo, 3));
+        tJob.setType(loadJobCellToString(jobInfo, 4));
+        tJob.setEtlInfo(loadJobCellToString(jobInfo, 5));
+        tJob.setTaskInfo(loadJobCellToString(jobInfo, 6));
+        tJob.setErrorMsg(loadJobCellToString(jobInfo, 7));
+        tJob.setCreateTime(loadJobCellToString(jobInfo, 8));
+        tJob.setEtlStartTime(loadJobCellToString(jobInfo, 9));
+        tJob.setEtlFinishTime(loadJobCellToString(jobInfo, 10));
+        tJob.setLoadStartTime(loadJobCellToString(jobInfo, 11));
+        tJob.setLoadFinishTime(loadJobCellToString(jobInfo, 12));
+        tJob.setUrl(loadJobCellToString(jobInfo, 13));
+        tJob.setJobDetails(loadJobCellToString(jobInfo, 14));
+        tJob.setTransactionId(loadJobCellToString(jobInfo, 15));
+        tJob.setErrorTablets(loadJobCellToString(jobInfo, 16));
+        tJob.setUser(loadJobCellToString(jobInfo, 17));
+        tJob.setComment(loadJobCellToString(jobInfo, 18));
+        tJob.setFirstErrorMsg(loadJobCellToString(jobInfo, 19));
+        return tJob;
+    }
+
+    /**
+     * Renders one SHOW LOAD cell as a string. A Java null is returned as null, which leaves the
+     * optional thrift field unset, instead of the literal "null" that String.valueOf produces.
+     */
+    private static String loadJobCellToString(List<Comparable> jobInfo, int index) {
+        Comparable value = jobInfo.get(index);
+        return value == null ? null : String.valueOf(value);
+    }
+
@Override
public TGetTableTDEInfoResult getTableTDEInfo(TGetTableTDEInfoRequest
request) throws TException {
String clientAddr = getClientAddrAsString();
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index df769272bb1..c3f38f14cc5 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -160,6 +160,7 @@ enum TSchemaTableType {
SCH_CLUSTER_SNAPSHOTS = 60;
SCH_CLUSTER_SNAPSHOT_PROPERTIES = 61;
SCH_BLACKHOLE = 62;
+ SCH_LOAD_JOBS = 64;
}
enum THdfsCompression {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 9afc7570225..0c3bdea1a09 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1606,6 +1606,36 @@ struct TFetchRoutineLoadJobResult {
1: optional list<TRoutineLoadJob> routineLoadJobs
}
+// Request for FrontendService.fetchLoadJob. It currently has no fields, so it carries
+// neither filters nor the identity of the querying user.
+struct TFetchLoadJobRequest {
+}
+
+// One row of information_schema.load_jobs. Field ids 1-20 follow the column order of
+// that table (the SHOW LOAD column order); the FE renders each value as a string.
+// Do not renumber or reorder: the BE maps column index i to field i + 1.
+struct TLoadJob {
+ 1: optional string job_id
+ 2: optional string label
+ 3: optional string state
+ 4: optional string progress
+ 5: optional string type
+ 6: optional string etl_info
+ 7: optional string task_info
+ 8: optional string error_msg
+ 9: optional string create_time
+ 10: optional string etl_start_time
+ 11: optional string etl_finish_time
+ 12: optional string load_start_time
+ 13: optional string load_finish_time
+ 14: optional string url
+ 15: optional string job_details
+ 16: optional string transaction_id
+ 17: optional string error_tablets
+ 18: optional string user
+ 19: optional string comment
+ 20: optional string first_error_msg
+}
+
+// Reply of FrontendService.fetchLoadJob. loadJobs may be left unset (e.g. while the FE
+// is not ready); the BE scanner treats an empty list as zero rows.
+struct TFetchLoadJobResult {
+ 1: optional list<TLoadJob> loadJobs
+}
+
struct TPlanNodeRuntimeStatsItem {
// node_id means PlanNodeId, add this field so that we can merge
RuntimeProfile of same node more easily
1: optional i32 node_id
@@ -1774,6 +1804,8 @@ service FrontendService {
TFetchRoutineLoadJobResult fetchRoutineLoadJob(1:
TFetchRoutineLoadJobRequest request)
+ TFetchLoadJobResult fetchLoadJob(1: TFetchLoadJobRequest request)
+
TGetEncryptionKeysResult getEncryptionKeys(1: TGetEncryptionKeysRequest
request)
TGetTableTDEInfoResult getTableTDEInfo(1: TGetTableTDEInfoRequest request)
diff --git
a/regression-test/suites/job_p0/job_system_table/test_load_job_info_system_table.groovy
b/regression-test/suites/job_p0/job_system_table/test_load_job_info_system_table.groovy
new file mode 100644
index 00000000000..05cad21b3ce
--- /dev/null
+++
b/regression-test/suites/job_p0/job_system_table/test_load_job_info_system_table.groovy
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Verifies that a labelled INSERT shows up in information_schema.load_jobs with its
+// columns wired to the right fields.
+suite("test_load_job_info_system_table", "p0") {
+    def tableName = "test_load_job_info_table"
+    def label = "test_load_job_label_" + UUID.randomUUID().toString().replace("-", "")
+    sql """
+        DROP TABLE IF EXISTS ${tableName}
+    """
+    sql """
+        CREATE TABLE ${tableName}
+        (
+            k1 INT NOT NULL,
+            k2 VARCHAR(50) NOT NULL,
+            k3 DATETIME NOT NULL,
+            v1 INT SUM DEFAULT '0'
+        )
+        AGGREGATE KEY(k1, k2, k3)
+        DISTRIBUTED BY HASH(k1) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    // The insert is expected to have completed once it returns, so its job should be FINISHED.
+    sql """
+        INSERT INTO ${tableName} with label ${label} select 1, 'test1', '2023-01-01 00:00:00', 10
+    """
+
+    def res = sql """
+        SELECT
+            JOB_ID,
+            LABEL,
+            STATE,
+            PROGRESS,
+            TYPE,
+            ETL_INFO,
+            TASK_INFO,
+            ERROR_MSG,
+            CREATE_TIME,
+            ETL_START_TIME,
+            ETL_FINISH_TIME,
+            LOAD_START_TIME,
+            LOAD_FINISH_TIME,
+            URL,
+            JOB_DETAILS,
+            TRANSACTION_ID,
+            ERROR_TABLETS,
+            USER,
+            COMMENT,
+            FIRST_ERROR_MSG
+        FROM
+            information_schema.load_jobs
+        WHERE
+            LABEL = '${label}'
+    """
+
+    log.info("Result size: ${res.size()}")
+    assertTrue(res.size() > 0, "Job should appear in load_jobs system table after ${label} insert")
+
+    // Pin several columns so a shifted column mapping between FE and BE is caught,
+    // not just the presence of a row.
+    def row = res[0]
+    assertTrue(row[0] != null && row[0].toString().length() > 0, "JOB_ID should not be empty")
+    assertEquals(label, row[1])
+    assertEquals("FINISHED", row[2])
+    assertEquals("INSERT", row[4])
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]