This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e18ea534d9a [improve](information schema) introduce routine load job system table (#48963)
e18ea534d9a is described below
commit e18ea534d9a009bc3c128515fa56c07d9c538b74
Author: hui lai <[email protected]>
AuthorDate: Mon Mar 17 11:06:53 2025 +0800
[improve](information schema) introduce routine load job system table (#48963)
### What problem does this PR solve?
Part IV of https://github.com/apache/doris/issues/48511
Doc: https://github.com/apache/doris-website/pull/2196
**Introduce a routine load job statistics system table:**
```
mysql> show create table information_schema.routine_load_job\G
*************************** 1. row ***************************
Table: routine_load_job
Create Table: CREATE TABLE `routine_load_job` (
`JOB_ID` text NULL,
`JOB_NAME` text NULL,
`CREATE_TIME` text NULL,
`PAUSE_TIME` text NULL,
`END_TIME` text NULL,
`DB_NAME` text NULL,
`TABLE_NAME` text NULL,
`STATE` text NULL,
`CURRENT_TASK_NUM` text NULL,
`JOB_PROPERTIES` text NULL,
`DATA_SOURCE_PROPERTIES` text NULL,
`CUSTOM_PROPERTIES` text NULL,
`STATISTIC` text NULL,
`PROGRESS` text NULL,
`LAG` text NULL,
`REASON_OF_STATE_CHANGED` text NULL,
`ERROR_LOG_URLS` text NULL,
`USER_NAME` text NULL,
`CURRENT_ABORT_TASK_NUM` int NULL,
`IS_ABNORMAL_PAUSE` boolean NULL
) ENGINE=SCHEMA;
1 row in set (0.00 sec)
```
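Several of these columns (`JOB_PROPERTIES`, `DATA_SOURCE_PROPERTIES`, `STATISTIC`, `PROGRESS`, `LAG`) hold JSON text, so Doris JSON functions can unpack them. A minimal sketch, assuming the `STATISTIC` JSON exposes an `errorRows` key (the key name is taken from the job statistic summary and is an assumption here, not shown in this patch):
```
-- Illustrative only: extract one field from the STATISTIC JSON per running job.
SELECT JOB_NAME,
       json_extract(STATISTIC, '$.errorRows') AS error_rows
FROM information_schema.routine_load_job
WHERE STATE = 'RUNNING';
```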
**Giving jobs SQL query capability over their statistical information has several benefits:**
- It can be used together with the metrics added in
https://github.com/apache/doris/pull/48209 to roughly locate abnormal
jobs when Grafana raises an alarm, using the following SQL:
```
SELECT JOB_NAME
FROM information_schema.routine_load_job
WHERE CURRENT_ABORT_TASK_NUM > 0
OR IS_ABNORMAL_PAUSE = TRUE;
```
- Users can run `select * from information_schema.routine_load_job`
instead of `show routine load`. The advantage is that `show routine
load` can only look up jobs by name, while SQL can locate jobs very
flexibly, as the sketch below shows.
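For instance, a hypothetical lookup combining several columns (the database name and time filter are illustrative, not part of this PR; the string comparison on `CREATE_TIME` works because the column holds formatted datetime text):
```
SELECT JOB_NAME, STATE, LAG
FROM information_schema.routine_load_job
WHERE DB_NAME = 'demo_db'
  AND STATE != 'RUNNING'
  AND CREATE_TIME > '2025-03-01 00:00:00';
```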
---
be/src/exec/schema_scanner.cpp | 3 +
be/src/exec/schema_scanner/schema_helper.cpp | 9 +
be/src/exec/schema_scanner/schema_helper.h | 6 +
.../schema_routine_load_job_scanner.cpp | 199 +++++++++++++++++++++
.../schema_routine_load_job_scanner.h | 50 ++++++
.../org/apache/doris/analysis/SchemaTableType.java | 4 +-
.../java/org/apache/doris/catalog/SchemaTable.java | 24 +++
.../load/routineload/KafkaRoutineLoadJob.java | 12 +-
.../doris/load/routineload/RoutineLoadJob.java | 63 +++++--
.../doris/load/routineload/RoutineLoadManager.java | 4 +
.../load/routineload/RoutineLoadProgress.java | 2 +-
.../load/routineload/RoutineLoadStatistic.java | 1 +
.../apache/doris/service/FrontendServiceImpl.java | 56 ++++++
gensrc/thrift/Descriptors.thrift | 3 +-
gensrc/thrift/FrontendService.thrift | 32 ++++
.../test_routine_load_job_info_system_table.groovy | 142 +++++++++++++++
16 files changed, 584 insertions(+), 26 deletions(-)
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 3c3e57a8522..40b0616c9a5 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -40,6 +40,7 @@
#include "exec/schema_scanner/schema_partitions_scanner.h"
#include "exec/schema_scanner/schema_processlist_scanner.h"
#include "exec/schema_scanner/schema_profiling_scanner.h"
+#include "exec/schema_scanner/schema_routine_load_job_scanner.h"
#include "exec/schema_scanner/schema_routine_scanner.h"
#include "exec/schema_scanner/schema_rowsets_scanner.h"
#include "exec/schema_scanner/schema_schema_privileges_scanner.h"
@@ -228,6 +229,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaCatalogMetaCacheStatsScanner::create_unique();
case TSchemaTableType::SCH_BACKEND_KERBEROS_TICKET_CACHE:
return SchemaBackendKerberosTicketCacheScanner::create_unique();
+ case TSchemaTableType::SCH_ROUTINE_LOAD_JOB:
+ return SchemaRoutineLoadJobScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp
index 7cf95187b02..13670444e53 100644
--- a/be/src/exec/schema_scanner/schema_helper.cpp
+++ b/be/src/exec/schema_scanner/schema_helper.cpp
@@ -142,4 +142,13 @@ Status SchemaHelper::show_user(const std::string& ip, const int32_t port,
});
}
+Status SchemaHelper::fetch_routine_load_job(const std::string& ip, const int32_t port,
+                                            const TFetchRoutineLoadJobRequest& request,
+                                            TFetchRoutineLoadJobResult* result) {
+    return ThriftRpcHelper::rpc<FrontendServiceClient>(
+            ip, port, [&request, &result](FrontendServiceConnection& client) {
+                client->fetchRoutineLoadJob(*result, request);
+            });
+}
+
} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_helper.h b/be/src/exec/schema_scanner/schema_helper.h
index bc794093128..53d37389371 100644
--- a/be/src/exec/schema_scanner/schema_helper.h
+++ b/be/src/exec/schema_scanner/schema_helper.h
@@ -26,6 +26,8 @@
namespace doris {
class TDescribeTablesParams;
class TDescribeTablesResult;
+class TFetchRoutineLoadJobRequest;
+class TFetchRoutineLoadJobResult;
class TGetDbsParams;
class TGetDbsResult;
class TGetTablesParams;
@@ -84,6 +86,10 @@ public:
                                      TShowProcessListResult* result);
     static Status show_user(const std::string& ip, const int32_t port,
                             const TShowUserRequest& request, TShowUserResult* result);
+
+    static Status fetch_routine_load_job(const std::string& ip, const int32_t port,
+                                         const TFetchRoutineLoadJobRequest& request,
+                                         TFetchRoutineLoadJobResult* result);
};
} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
new file mode 100644
index 00000000000..e061ab790cf
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_routine_load_job_scanner.h"
+
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/FrontendService_types.h>
+
+#include <string>
+
+#include "exec/schema_scanner/schema_helper.h"
+#include "runtime/runtime_state.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+std::vector<SchemaScanner::ColumnDesc> SchemaRoutineLoadJobScanner::_s_tbls_columns = {
+ // name, type, size, is_null
+ {"JOB_ID", TYPE_STRING, sizeof(StringRef), true},
+ {"JOB_NAME", TYPE_STRING, sizeof(StringRef), true},
+ {"CREATE_TIME", TYPE_STRING, sizeof(StringRef), true},
+ {"PAUSE_TIME", TYPE_STRING, sizeof(StringRef), true},
+ {"END_TIME", TYPE_STRING, sizeof(StringRef), true},
+ {"DB_NAME", TYPE_STRING, sizeof(StringRef), true},
+ {"TABLE_NAME", TYPE_STRING, sizeof(StringRef), true},
+ {"STATE", TYPE_STRING, sizeof(StringRef), true},
+ {"CURRENT_TASK_NUM", TYPE_STRING, sizeof(StringRef), true},
+ {"JOB_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
+ {"DATA_SOURCE_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
+ {"CUSTOM_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
+ {"STATISTIC", TYPE_STRING, sizeof(StringRef), true},
+ {"PROGRESS", TYPE_STRING, sizeof(StringRef), true},
+ {"LAG", TYPE_STRING, sizeof(StringRef), true},
+ {"REASON_OF_STATE_CHANGED", TYPE_STRING, sizeof(StringRef), true},
+ {"ERROR_LOG_URLS", TYPE_STRING, sizeof(StringRef), true},
+ {"USER_NAME", TYPE_STRING, sizeof(StringRef), true},
+ {"CURRENT_ABORT_TASK_NUM", TYPE_INT, sizeof(int32_t), true},
+ {"IS_ABNORMAL_PAUSE", TYPE_BOOLEAN, sizeof(int8_t), true},
+};
+
+SchemaRoutineLoadJobScanner::SchemaRoutineLoadJobScanner()
+        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_ROUTINE_LOAD_JOB) {}
+
+SchemaRoutineLoadJobScanner::~SchemaRoutineLoadJobScanner() {}
+
+Status SchemaRoutineLoadJobScanner::start(RuntimeState* state) {
+ if (!_is_init) {
+ return Status::InternalError("used before initialized.");
+ }
+ TFetchRoutineLoadJobRequest request;
+ RETURN_IF_ERROR(SchemaHelper::fetch_routine_load_job(
+            *(_param->common_param->ip), _param->common_param->port, request, &_result));
+ return Status::OK();
+}
+
+Status SchemaRoutineLoadJobScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
+ if (!_is_init) {
+ return Status::InternalError("call this before initial.");
+ }
+ if (block == nullptr || eos == nullptr) {
+ return Status::InternalError("invalid parameter.");
+ }
+
+ *eos = true;
+ if (_result.routineLoadJobs.empty()) {
+ return Status::OK();
+ }
+
+ return _fill_block_impl(block);
+}
+
+Status SchemaRoutineLoadJobScanner::_fill_block_impl(vectorized::Block* block) {
+ SCOPED_TIMER(_fill_block_timer);
+
+ const auto& jobs_info = _result.routineLoadJobs;
+ size_t row_num = jobs_info.size();
+ if (row_num == 0) {
+ return Status::OK();
+ }
+
+ for (size_t col_idx = 0; col_idx < _s_tbls_columns.size(); ++col_idx) {
+ const auto& col_desc = _s_tbls_columns[col_idx];
+
+ std::vector<StringRef> str_refs(row_num);
+ std::vector<int32_t> int_vals(row_num);
+ std::vector<int8_t> bool_vals(row_num);
+ std::vector<void*> datas(row_num);
+ std::vector<std::string> column_values(row_num);
+
+ for (size_t row_idx = 0; row_idx < row_num; ++row_idx) {
+ const auto& job_info = jobs_info[row_idx];
+ std::string& column_value = column_values[row_idx];
+
+            if (col_desc.type == TYPE_STRING) {
+                switch (col_idx) {
+                case 0: // JOB_ID
+                    column_value = job_info.__isset.job_id ? job_info.job_id : "";
+                    break;
+                case 1: // JOB_NAME
+                    column_value = job_info.__isset.job_name ? job_info.job_name : "";
+                    break;
+                case 2: // CREATE_TIME
+                    column_value = job_info.__isset.create_time ? job_info.create_time : "";
+                    break;
+                case 3: // PAUSE_TIME
+                    column_value = job_info.__isset.pause_time ? job_info.pause_time : "";
+                    break;
+                case 4: // END_TIME
+                    column_value = job_info.__isset.end_time ? job_info.end_time : "";
+                    break;
+                case 5: // DB_NAME
+                    column_value = job_info.__isset.db_name ? job_info.db_name : "";
+                    break;
+                case 6: // TABLE_NAME
+                    column_value = job_info.__isset.table_name ? job_info.table_name : "";
+                    break;
+                case 7: // STATE
+                    column_value = job_info.__isset.state ? job_info.state : "";
+                    break;
+                case 8: // CURRENT_TASK_NUM
+                    column_value =
+                            job_info.__isset.current_task_num ? job_info.current_task_num : "";
+                    break;
+                case 9: // JOB_PROPERTIES
+                    column_value = job_info.__isset.job_properties ? job_info.job_properties : "";
+                    break;
+                case 10: // DATA_SOURCE_PROPERTIES
+                    column_value = job_info.__isset.data_source_properties
+                                           ? job_info.data_source_properties
+                                           : "";
+                    break;
+                case 11: // CUSTOM_PROPERTIES
+                    column_value =
+                            job_info.__isset.custom_properties ? job_info.custom_properties : "";
+                    break;
+                case 12: // STATISTIC
+                    column_value = job_info.__isset.statistic ? job_info.statistic : "";
+                    break;
+                case 13: // PROGRESS
+                    column_value = job_info.__isset.progress ? job_info.progress : "";
+                    break;
+                case 14: // LAG
+                    column_value = job_info.__isset.lag ? job_info.lag : "";
+                    break;
+                case 15: // REASON_OF_STATE_CHANGED
+                    column_value = job_info.__isset.reason_of_state_changed
+                                           ? job_info.reason_of_state_changed
+                                           : "";
+                    break;
+                case 16: // ERROR_LOG_URLS
+                    column_value = job_info.__isset.error_log_urls ? job_info.error_log_urls : "";
+                    break;
+                case 17: // USER_NAME
+                    column_value = job_info.__isset.user_name ? job_info.user_name : "";
+                    break;
+                }
+
+                str_refs[row_idx] =
+                        StringRef(column_values[row_idx].data(), column_values[row_idx].size());
+                datas[row_idx] = &str_refs[row_idx];
+            } else if (col_desc.type == TYPE_INT) {
+                int_vals[row_idx] = job_info.__isset.current_abort_task_num
+                                            ? job_info.current_abort_task_num
+                                            : 0;
+                datas[row_idx] = &int_vals[row_idx];
+            } else if (col_desc.type == TYPE_BOOLEAN) {
+                bool_vals[row_idx] =
+                        job_info.__isset.is_abnormal_pause ? job_info.is_abnormal_pause : false;
+                datas[row_idx] = &bool_vals[row_idx];
+            }
+ }
+
+ RETURN_IF_ERROR(fill_dest_column_for_range(block, col_idx, datas));
+ }
+
+ return Status::OK();
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.h b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.h
new file mode 100644
index 00000000000..1105328776a
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/FrontendService_types.h>
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+class SchemaRoutineLoadJobScanner : public SchemaScanner {
+ ENABLE_FACTORY_CREATOR(SchemaRoutineLoadJobScanner);
+
+public:
+ SchemaRoutineLoadJobScanner();
+ ~SchemaRoutineLoadJobScanner() override;
+
+ Status start(RuntimeState* state) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
+
+private:
+ Status _fill_block_impl(vectorized::Block* block);
+
+ TFetchRoutineLoadJobResult _result;
+ static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index 7ec9e5cab37..5c5bdeb832c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -90,7 +90,9 @@ public enum SchemaTableType {
     SCH_CATALOG_META_CACHE_STATISTICS("CATALOG_META_CACHE_STATISTICS", "CATALOG_META_CACHE_STATISTICS",
             TSchemaTableType.SCH_CATALOG_META_CACHE_STATISTICS),
     SCH_BACKEND_KERBEROS_TICKET_CACHE("BACKEND_KERBEROS_TICKET_CACHE", "BACKEND_KERBEROS_TICKET_CACHE",
- TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE);
+ TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE),
+ SCH_ROUTINE_LOAD_JOB("ROUTINE_LOAD_JOB", "ROUTINE_LOAD_JOB",
+ TSchemaTableType.SCH_ROUTINE_LOAD_JOB);
private static final String dbName = "INFORMATION_SCHEMA";
private static SelectList fullSelectLists;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 75072390821..36992833639 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -597,6 +597,30 @@ public class SchemaTable extends Table {
.column("REFRESH_INTERVAL_SECOND",
ScalarType.createType(PrimitiveType.BIGINT))
.build())
)
+ .put("routine_load_job",
+                    new SchemaTable(SystemIdGenerator.getNextId(), "routine_load_job", TableType.SCHEMA,
+                            builder().column("JOB_ID", ScalarType.createStringType())
+                                    .column("JOB_NAME", ScalarType.createStringType())
+                                    .column("CREATE_TIME", ScalarType.createStringType())
+                                    .column("PAUSE_TIME", ScalarType.createStringType())
+                                    .column("END_TIME", ScalarType.createStringType())
+                                    .column("DB_NAME", ScalarType.createStringType())
+                                    .column("TABLE_NAME", ScalarType.createStringType())
+                                    .column("STATE", ScalarType.createStringType())
+                                    .column("CURRENT_TASK_NUM", ScalarType.createStringType())
+                                    .column("JOB_PROPERTIES", ScalarType.createStringType())
+                                    .column("DATA_SOURCE_PROPERTIES", ScalarType.createStringType())
+                                    .column("CUSTOM_PROPERTIES", ScalarType.createStringType())
+                                    .column("STATISTIC", ScalarType.createStringType())
+                                    .column("PROGRESS", ScalarType.createStringType())
+                                    .column("LAG", ScalarType.createStringType())
+                                    .column("REASON_OF_STATE_CHANGED", ScalarType.createStringType())
+                                    .column("ERROR_LOG_URLS", ScalarType.createStringType())
+                                    .column("USER_NAME", ScalarType.createStringType())
+                                    .column("CURRENT_ABORT_TASK_NUM", ScalarType.createType(PrimitiveType.INT))
+                                    .column("IS_ABNORMAL_PAUSE", ScalarType.createType(PrimitiveType.BOOLEAN))
+ .build())
+ )
.build();
private boolean fetchAllFe = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 4904b59b5b6..bbf34b93258 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -474,7 +474,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
@Override
- protected String getStatistic() {
+ public String getStatistic() {
Map<String, Object> summary = this.jobStatistic.summary();
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(summary);
@@ -635,7 +635,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
@Override
- protected String dataSourcePropertiesJsonToString() {
+ public String dataSourcePropertiesJsonToString() {
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put("brokerList", brokerList);
dataSourceProperties.put("topic", topic);
@@ -647,13 +647,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
@Override
- protected String customPropertiesJsonToString() {
+ public String customPropertiesJsonToString() {
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(customProperties);
}
@Override
- protected Map<String, String> getDataSourceProperties() {
+ public Map<String, String> getDataSourceProperties() {
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put("kafka_broker_list", brokerList);
dataSourceProperties.put("kafka_topic", topic);
@@ -661,7 +661,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
@Override
- protected Map<String, String> getCustomProperties() {
+ public Map<String, String> getCustomProperties() {
Map<String, String> ret = new HashMap<>();
customProperties.forEach((k, v) -> ret.put("property." + k, v));
return ret;
@@ -910,7 +910,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
@Override
- protected String getLag() {
+ public String getLag() {
         Map<Integer, Long> partitionIdToOffsetLag = ((KafkaProgress) progress).getLag(cachedPartitionWithLatestOffsets);
Gson gson = new Gson();
return gson.toJson(partitionIdToOffsetLag);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 5dbca308f6c..859d5df43e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -502,6 +502,18 @@ public abstract class RoutineLoadJob
return dbId;
}
+ public String getCreateTimestampString() {
+ return TimeUtils.longToTimeString(createTimestamp);
+ }
+
+ public String getPauseTimestampString() {
+ return TimeUtils.longToTimeString(pauseTimestamp);
+ }
+
+ public String getEndTimestampString() {
+ return TimeUtils.longToTimeString(endTimestamp);
+ }
+
public void setOtherMsg(String otherMsg) {
writeLock();
try {
@@ -555,6 +567,10 @@ public abstract class RoutineLoadJob
return endTimestamp;
}
+ public RoutineLoadStatistic getJobStatistic() {
+ return jobStatistic;
+ }
+
public PartitionNames getPartitions() {
return partitions;
}
@@ -790,6 +806,10 @@ public abstract class RoutineLoadJob
}
}
+ public Queue<String> getErrorLogUrls() {
+ return errorLogUrls;
+ }
+
     // RoutineLoadScheduler will run this method at fixed interval, and renew the timeout tasks
public void processTimeoutTasks() {
writeLock();
@@ -845,6 +865,11 @@ public abstract class RoutineLoadJob
}
}
+ public boolean isAbnormalPause() {
+ return this.state == JobState.PAUSED && this.pauseReason != null
+                && this.pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR;
+ }
+
// All of private method could not be call without lock
     private void checkStateTransform(RoutineLoadJob.JobState desireState) throws UserException {
switch (state) {
@@ -939,6 +964,7 @@ public abstract class RoutineLoadJob
this.jobStatistic.currentErrorRows = 0;
this.jobStatistic.currentTotalRows = 0;
this.otherMsg = "";
+ this.jobStatistic.currentAbortedTaskNum = 0;
} else if (this.jobStatistic.currentErrorRows > maxErrorNum
|| (this.jobStatistic.currentTotalRows > 0
&& ((double) this.jobStatistic.currentErrorRows
@@ -1246,6 +1272,7 @@ public abstract class RoutineLoadJob
.build());
}
++this.jobStatistic.abortedTaskNum;
+ ++this.jobStatistic.currentAbortedTaskNum;
             TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
if (txnStatusChangeReasonString != null) {
txnStatusChangeReason =
@@ -1615,9 +1642,20 @@ public abstract class RoutineLoadJob
TransactionState txnState,
TransactionState.TxnStatusChangeReason txnStatusChangeReason);
- protected abstract String getStatistic();
+ public abstract String getStatistic();
+
+ public abstract String getLag();
- protected abstract String getLag();
+ public String getStateReason() {
+ switch (state) {
+ case PAUSED:
+ return pauseReason == null ? "" : pauseReason.toString();
+ case CANCELLED:
+ return cancelReason == null ? "" : cancelReason.toString();
+ default:
+ return "";
+ }
+ }
public List<String> getShowInfo() {
         Optional<Database> database = Env.getCurrentInternalCatalog().getDb(dbId);
@@ -1647,16 +1685,7 @@ public abstract class RoutineLoadJob
row.add(getStatistic());
row.add(getProgress().toJsonString());
row.add(getLag());
- switch (state) {
- case PAUSED:
- row.add(pauseReason == null ? "" : pauseReason.toString());
- break;
- case CANCELLED:
-                row.add(cancelReason == null ? "" : cancelReason.toString());
- break;
- default:
- row.add("");
- }
+ row.add(getStateReason());
row.add(Joiner.on(", ").join(errorLogUrls));
row.add(otherMsg);
row.add(userIdentity.getQualifiedUser());
@@ -1819,7 +1848,7 @@ public abstract class RoutineLoadJob
}
}
- private String jobPropertiesToJsonString() {
+ public String jobPropertiesToJsonString() {
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put("partitions", partitions == null
                 ? STAR_STRING : Joiner.on(",").join(partitions.getPartitionNames()));
@@ -1853,13 +1882,13 @@ public abstract class RoutineLoadJob
return gson.toJson(jobProperties);
}
- abstract String dataSourcePropertiesJsonToString();
+ public abstract String dataSourcePropertiesJsonToString();
- abstract String customPropertiesJsonToString();
+ public abstract String customPropertiesJsonToString();
- abstract Map<String, String> getDataSourceProperties();
+ public abstract Map<String, String> getDataSourceProperties();
- abstract Map<String, String> getCustomProperties();
+ public abstract Map<String, String> getCustomProperties();
public boolean isExpired() {
if (!isFinal()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index a984a672d34..bfe42ad7695 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -110,6 +110,10 @@ public class RoutineLoadManager implements Writable {
public RoutineLoadManager() {
}
+ public List<RoutineLoadJob> getAllRoutineLoadJobs() {
+ return new ArrayList<>(idToRoutineLoadJob.values());
+ }
+
public List<RoutineLoadJob> getActiveRoutineLoadJobs() {
return idToRoutineLoadJob.values().stream()
.filter(job -> !job.state.isFinalState())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
index bb3b3e88daa..cf16b45fa13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -39,7 +39,7 @@ public abstract class RoutineLoadProgress {
abstract void update(RLTaskTxnCommitAttachment attachment);
- abstract String toJsonString();
+ public abstract String toJsonString();
public static RoutineLoadProgress read(DataInput in) throws IOException {
RoutineLoadProgress progress = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
index ad10367f982..e5b01c50e26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
@@ -61,6 +61,7 @@ public class RoutineLoadStatistic {
public long committedTaskNum = 0;
@SerializedName(value = "abortedTaskNum")
public long abortedTaskNum = 0;
+ public int currentAbortedTaskNum = 0;
// Save all transactions current running. Including PREPARE, COMMITTED.
// No need to persist, only for tracing txn of routine load job.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 2928d84e549..9c9b2eedc75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -85,6 +85,7 @@ import org.apache.doris.load.StreamLoadHandler;
import org.apache.doris.load.routineload.ErrorReason;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
+import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.master.MasterImpl;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -147,6 +148,8 @@ import org.apache.doris.thrift.TDropPlsqlPackageRequest;
import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest;
import org.apache.doris.thrift.TFeResult;
import org.apache.doris.thrift.TFetchResourceResult;
+import org.apache.doris.thrift.TFetchRoutineLoadJobRequest;
+import org.apache.doris.thrift.TFetchRoutineLoadJobResult;
import org.apache.doris.thrift.TFetchRunningQueriesRequest;
import org.apache.doris.thrift.TFetchRunningQueriesResult;
import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
@@ -228,6 +231,7 @@ import org.apache.doris.thrift.TRestoreSnapshotRequest;
import org.apache.doris.thrift.TRestoreSnapshotResult;
import org.apache.doris.thrift.TRollbackTxnRequest;
import org.apache.doris.thrift.TRollbackTxnResult;
+import org.apache.doris.thrift.TRoutineLoadJob;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TSchemaTableName;
import org.apache.doris.thrift.TShowProcessListRequest;
@@ -266,6 +270,7 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.transaction.TxnCommitAttachment;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -4211,4 +4216,55 @@ public class FrontendServiceImpl implements FrontendService.Iface {
result.setRunningQueries(runningQueries);
return result;
}
+
+ @Override
+    public TFetchRoutineLoadJobResult fetchRoutineLoadJob(TFetchRoutineLoadJobRequest request) {
+ TFetchRoutineLoadJobResult result = new TFetchRoutineLoadJobResult();
+
+ if (!Env.getCurrentEnv().isReady()) {
+ return result;
+ }
+
+        RoutineLoadManager routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager();
+        List<TRoutineLoadJob> jobInfos = Lists.newArrayList();
+        List<RoutineLoadJob> routineLoadJobs = routineLoadManager.getAllRoutineLoadJobs();
+ for (RoutineLoadJob job : routineLoadJobs) {
+ TRoutineLoadJob jobInfo = new TRoutineLoadJob();
+ jobInfo.setJobId(String.valueOf(job.getId()));
+ jobInfo.setJobName(job.getName());
+ jobInfo.setCreateTime(job.getCreateTimestampString());
+ jobInfo.setPauseTime(job.getPauseTimestampString());
+ jobInfo.setEndTime(job.getEndTimestampString());
+ String dbName = "";
+ String tableName = "";
+ try {
+ dbName = job.getDbFullName();
+ tableName = job.getTableName();
+ } catch (MetaNotFoundException e) {
+                LOG.warn("Failed to get db or table name for routine load job: {}", job.getId(), e);
+ }
+ jobInfo.setDbName(dbName);
+ jobInfo.setTableName(tableName);
+ jobInfo.setState(job.getState().name());
+            jobInfo.setCurrentTaskNum(String.valueOf(job.getSizeOfRoutineLoadTaskInfoList()));
+            jobInfo.setJobProperties(job.jobPropertiesToJsonString());
+            jobInfo.setDataSourceProperties(job.dataSourcePropertiesJsonToString());
+            jobInfo.setCustomProperties(job.customPropertiesJsonToString());
+            jobInfo.setStatistic(job.getStatistic());
+            jobInfo.setProgress(job.getProgress().toJsonString());
+            jobInfo.setLag(job.getLag());
+            jobInfo.setReasonOfStateChanged(job.getStateReason());
+            jobInfo.setErrorLogUrls(Joiner.on(", ").join(job.getErrorLogUrls()));
+            jobInfo.setUserName(job.getUserIdentity().getQualifiedUser());
+            jobInfo.setCurrentAbortTaskNum(job.getJobStatistic().currentAbortedTaskNum);
+ jobInfo.setIsAbnormalPause(job.isAbnormalPause());
+ jobInfos.add(jobInfo);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("routine load job infos: {}", jobInfos);
+ }
+ result.setRoutineLoadJobs(jobInfos);
+
+ return result;
+ }
}
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index f4070114617..502246052da 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -139,7 +139,8 @@ enum TSchemaTableType {
SCH_TABLE_PROPERTIES = 50,
SCH_FILE_CACHE_STATISTICS = 51,
SCH_CATALOG_META_CACHE_STATISTICS = 52,
- SCH_BACKEND_KERBEROS_TICKET_CACHE = 53;
+ SCH_BACKEND_KERBEROS_TICKET_CACHE = 53,
+ SCH_ROUTINE_LOAD_JOB = 54;
}
enum THdfsCompression {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 7a13bd504cb..83774eda5de 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1557,6 +1557,36 @@ struct TFetchRunningQueriesResult {
struct TFetchRunningQueriesRequest {
}
+struct TFetchRoutineLoadJobRequest {
+}
+
+struct TRoutineLoadJob {
+ 1: optional string job_id
+ 2: optional string job_name
+ 3: optional string create_time
+ 4: optional string pause_time
+ 5: optional string end_time
+ 6: optional string db_name
+ 7: optional string table_name
+ 8: optional string state
+ 9: optional string current_task_num
+ 10: optional string job_properties
+ 11: optional string data_source_properties
+ 12: optional string custom_properties
+ 13: optional string statistic
+ 14: optional string progress
+ 15: optional string lag
+ 16: optional string reason_of_state_changed
+ 17: optional string error_log_urls
+ 18: optional string user_name
+ 19: optional i32 current_abort_task_num
+ 20: optional bool is_abnormal_pause
+}
+
+struct TFetchRoutineLoadJobResult {
+ 1: optional list<TRoutineLoadJob> routineLoadJobs
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1655,4 +1685,6 @@ service FrontendService {
     Status.TStatus updatePartitionStatsCache(1: TUpdateFollowerPartitionStatsCacheRequest request)
     TFetchRunningQueriesResult fetchRunningQueries(1: TFetchRunningQueriesRequest request)
+
+    TFetchRoutineLoadJobResult fetchRoutineLoadJob(1: TFetchRoutineLoadJobRequest request)
}
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
new file mode 100644
index 00000000000..3fa360bf281
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+import groovy.json.JsonSlurper
+
+suite("test_routine_load_job_info_system_table","p0") {
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+ def jobName = "test_job_info_system_table_invaild"
+ def tableName = "test_job_info_system_table"
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL,
+ kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
+ kd02 TINYINT NOT NULL DEFAULT "1",
+ kd03 SMALLINT NOT NULL DEFAULT "2",
+ kd04 INT NOT NULL DEFAULT "3",
+ kd05 BIGINT NOT NULL DEFAULT "4",
+ kd06 LARGEINT NOT NULL DEFAULT "5",
+ kd07 FLOAT NOT NULL DEFAULT "6.0",
+ kd08 DOUBLE NOT NULL DEFAULT "7.0",
+ kd09 DECIMAL NOT NULL DEFAULT "888888888",
+ kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
+ kd11 DATE NOT NULL DEFAULT "2023-08-24",
+ kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
+ kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd18 JSON NULL,
+
+ INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+ INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+ INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+ INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+                INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+ INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+ )
+ DUPLICATE KEY(k00)
+ PARTITION BY RANGE(k01)
+ (
+ PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+ PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+ PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+ )
+ DISTRIBUTED BY HASH(k00) BUCKETS 32
+ PROPERTIES (
+ "bloom_filter_columns"="k05",
+ "replication_num" = "1"
+ );
+ """
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
+            COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+ COLUMNS TERMINATED BY "|"
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "test_job_info_system_table_invaild",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ def count = 0
+ while (true) {
+ def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+ if (state[0][8] == "PAUSED") {
+ break
+ }
+ if (count >= 30) {
+ assertEquals(1, 2)
+ break
+ }
+ sleep(1000)
+ count++
+ }
+            def res = sql "SELECT JOB_NAME FROM information_schema.routine_load_job WHERE CURRENT_ABORT_TASK_NUM > 0 OR IS_ABNORMAL_PAUSE = TRUE"
+ log.info("res: ${res}".toString())
+ assertTrue(res.toString().contains("${jobName}"))
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]