This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e8a03adb7c4 [Refactor](executor)Add workload schedule policy table (#33729)
e8a03adb7c4 is described below
commit e8a03adb7c443b8756d56b206e5a239a41a73985
Author: wangbo <[email protected]>
AuthorDate: Sat Apr 20 12:32:27 2024 +0800
[Refactor](executor)Add workload schedule policy table (#33729)
---
be/src/exec/schema_scanner.cpp | 50 ++++++++
be/src/exec/schema_scanner.h | 4 +
.../schema_workload_groups_scanner.cpp | 29 +----
.../schema_workload_sched_policy_scanner.cpp | 136 +++++++++++++++++++++
.../schema_workload_sched_policy_scanner.h | 52 ++++++++
fe/fe-core/src/main/cup/sql_parser.cup | 4 -
.../org/apache/doris/analysis/SchemaTableType.java | 6 +-
.../analysis/ShowWorkloadSchedPolicyStmt.java | 59 ---------
.../java/org/apache/doris/catalog/SchemaTable.java | 10 ++
.../java/org/apache/doris/qe/ShowExecutor.java | 9 --
.../doris/tablefunction/MetadataGenerator.java | 19 ++-
.../tablefunction/MetadataTableValuedFunction.java | 2 -
.../doris/tablefunction/TableValuedFunctionIf.java | 2 -
.../WorkloadSchedPolicyTableValuedFunction.java | 89 --------------
.../doris/datasource/RefreshCatalogTest.java | 4 +-
gensrc/thrift/Descriptors.thrift | 3 +-
gensrc/thrift/FrontendService.thrift | 1 +
.../jdbc/test_mariadb_jdbc_catalog.out | 1 +
.../jdbc/test_mysql_jdbc_catalog.out | 1 +
.../jdbc/test_mysql_jdbc_catalog_nereids.out | 1 +
.../jdbc/test_mysql_jdbc_driver5_catalog.out | 1 +
.../test_workload_sched_policy.out | 2 +-
.../workload_manager_p0/test_curd_wlg.groovy | 6 +-
.../test_workload_sched_policy.groovy | 6 +-
24 files changed, 290 insertions(+), 207 deletions(-)
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 6c1aac7d0d1..5250d8f1b01 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -48,6 +48,7 @@
#include "exec/schema_scanner/schema_variables_scanner.h"
#include "exec/schema_scanner/schema_views_scanner.h"
#include "exec/schema_scanner/schema_workload_groups_scanner.h"
+#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
#include "olap/hll.h"
#include "runtime/define_primitive_type.h"
#include "util/string_util.h"
@@ -167,6 +168,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaRoutinesScanner::create_unique();
case TSchemaTableType::SCH_USER:
return SchemaUserScanner::create_unique();
+ case TSchemaTableType::SCH_WORKLOAD_SCHEDULE_POLICY:
+ return SchemaWorkloadSchedulePolicyScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
@@ -339,4 +342,51 @@ std::string SchemaScanner::get_db_from_full_name(const std::string& full_name) {
return full_name;
}
+Status SchemaScanner::insert_block_column(TCell cell, int col_index,
vectorized::Block* block,
+ PrimitiveType type) {
+ vectorized::MutableColumnPtr mutable_col_ptr;
+ mutable_col_ptr =
std::move(*block->get_by_position(col_index).column).assume_mutable();
+ auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+ vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+
+ switch (type) {
+ case TYPE_BIGINT: {
+
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
+ cell.longVal);
+ nullable_column->get_null_map_data().emplace_back(0);
+ break;
+ }
+
+ case TYPE_INT: {
+
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(
+ cell.intVal);
+ nullable_column->get_null_map_data().emplace_back(0);
+ break;
+ }
+
+ case TYPE_BOOLEAN: {
+
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
+ cell.boolVal);
+ nullable_column->get_null_map_data().emplace_back(0);
+ break;
+ }
+
+ case TYPE_STRING:
+ case TYPE_VARCHAR:
+ case TYPE_CHAR: {
+
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(cell.stringVal.data(),
+
cell.stringVal.size());
+ nullable_column->get_null_map_data().emplace_back(0);
+ break;
+ }
+
+ default: {
+ std::stringstream ss;
+ ss << "unsupported column type:" << type;
+ return Status::InternalError(ss.str());
+ }
+ }
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h
index 50954999e2a..a23706ac6a4 100644
--- a/be/src/exec/schema_scanner.h
+++ b/be/src/exec/schema_scanner.h
@@ -17,6 +17,7 @@
#pragma once
+#include <gen_cpp/Data_types.h>
#include <gen_cpp/Descriptors_types.h>
#include <stddef.h>
#include <stdint.h>
@@ -105,6 +106,9 @@ protected:
Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector<void*>& datas);
+ Status insert_block_column(TCell cell, int col_index, vectorized::Block*
block,
+ PrimitiveType type);
+
// get dbname from catalogname.dbname
// if full_name does not have catalog part, just return origin name.
std::string get_db_from_full_name(const std::string& full_name);
diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
index 03bf9782dcd..55cdfe9cf35 100644
--- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
@@ -103,37 +103,12 @@ Status SchemaWorkloadGroupsScanner::_get_workload_groups_block_from_fe() {
}
}
- // todo(wb) reuse this callback function
- auto insert_string_value = [&](int col_index, std::string str_val,
vectorized::Block* block) {
- vectorized::MutableColumnPtr mutable_col_ptr;
- mutable_col_ptr =
std::move(*block->get_by_position(col_index).column).assume_mutable();
- auto* nullable_column =
-
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
- vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
-
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
-
str_val.size());
- nullable_column->get_null_map_data().emplace_back(0);
- };
- auto insert_int_value = [&](int col_index, int64_t int_val,
vectorized::Block* block) {
- vectorized::MutableColumnPtr mutable_col_ptr;
- mutable_col_ptr =
std::move(*block->get_by_position(col_index).column).assume_mutable();
- auto* nullable_column =
-
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
- vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
- int_val);
- nullable_column->get_null_map_data().emplace_back(0);
- };
-
for (int i = 0; i < result_data.size(); i++) {
TRow row = result_data[i];
for (int j = 0; j < _s_tbls_columns.size(); j++) {
- if (_s_tbls_columns[j].type == TYPE_BIGINT) {
- insert_int_value(j, row.column_value[j].longVal,
_workload_groups_block.get());
- } else {
- insert_string_value(j, row.column_value[j].stringVal,
_workload_groups_block.get());
- }
+ RETURN_IF_ERROR(insert_block_column(
+ row.column_value[j], j, _workload_groups_block.get(),
_s_tbls_columns[j].type));
}
}
return Status::OK();
diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
new file mode 100644
index 00000000000..725544ad5a5
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
+
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/thrift_rpc_helper.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+std::vector<SchemaScanner::ColumnDesc>
SchemaWorkloadSchedulePolicyScanner::_s_tbls_columns = {
+ {"ID", TYPE_BIGINT, sizeof(int64_t), true},
+ {"NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"CONDITION", TYPE_STRING, sizeof(StringRef), true},
+ {"ACTION", TYPE_STRING, sizeof(StringRef), true},
+ {"PRIORITY", TYPE_INT, sizeof(int32_t), true},
+ {"ENABLED", TYPE_BOOLEAN, sizeof(bool), true},
+ {"VERSION", TYPE_INT, sizeof(int32_t), true},
+};
+
+SchemaWorkloadSchedulePolicyScanner::SchemaWorkloadSchedulePolicyScanner()
+ : SchemaScanner(_s_tbls_columns,
TSchemaTableType::SCH_WORKLOAD_SCHEDULE_POLICY) {}
+
+SchemaWorkloadSchedulePolicyScanner::~SchemaWorkloadSchedulePolicyScanner() {}
+
+Status SchemaWorkloadSchedulePolicyScanner::start(RuntimeState* state) {
+ _block_rows_limit = state->batch_size();
+ _rpc_timeout = state->execution_timeout() * 1000;
+ return Status::OK();
+}
+
+Status
SchemaWorkloadSchedulePolicyScanner::_get_workload_schedule_policy_block_from_fe()
{
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+
+ TSchemaTableRequestParams schema_table_request_params;
+ for (int i = 0; i < _s_tbls_columns.size(); i++) {
+ schema_table_request_params.__isset.columns_name = true;
+
schema_table_request_params.columns_name.emplace_back(_s_tbls_columns[i].name);
+ }
+
schema_table_request_params.__set_current_user_ident(*_param->common_param->current_user_ident);
+
+ TFetchSchemaTableDataRequest request;
+
request.__set_schema_table_name(TSchemaTableName::WORKLOAD_SCHEDULE_POLICY);
+ request.__set_schema_table_params(schema_table_request_params);
+
+ TFetchSchemaTableDataResult result;
+
+ RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+ master_addr.hostname, master_addr.port,
+ [&request, &result](FrontendServiceConnection& client) {
+ client->fetchSchemaTableData(result, request);
+ },
+ _rpc_timeout));
+
+ Status status(Status::create(result.status));
+ if (!status.ok()) {
+ LOG(WARNING) << "fetch workload groups from FE failed, errmsg=" <<
status;
+ return status;
+ }
+ std::vector<TRow> result_data = result.data_batch;
+
+ _block = vectorized::Block::create_unique();
+ for (int i = 0; i < _s_tbls_columns.size(); ++i) {
+ TypeDescriptor descriptor(_s_tbls_columns[i].type);
+ auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
+
_block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(),
data_type,
+
_s_tbls_columns[i].name));
+ }
+
+ _block->reserve(_block_rows_limit);
+
+ if (result_data.size() > 0) {
+ int col_size = result_data[0].column_value.size();
+ if (col_size != _s_tbls_columns.size()) {
+ return Status::InternalError<false>(
+ "workload policy schema is not match for FE and BE");
+ }
+ }
+
+ for (int i = 0; i < result_data.size(); i++) {
+ TRow row = result_data[i];
+ for (int j = 0; j < _s_tbls_columns.size(); j++) {
+ RETURN_IF_ERROR(insert_block_column(row.column_value[j], j,
_block.get(),
+ _s_tbls_columns[j].type));
+ }
+ }
+ return Status::OK();
+}
+
+Status SchemaWorkloadSchedulePolicyScanner::get_next_block(vectorized::Block*
block, bool* eos) {
+ if (!_is_init) {
+ return Status::InternalError("Used before initialized.");
+ }
+
+ if (nullptr == block || nullptr == eos) {
+ return Status::InternalError("input pointer is nullptr.");
+ }
+
+ if (_block == nullptr) {
+ RETURN_IF_ERROR(_get_workload_schedule_policy_block_from_fe());
+ _total_rows = _block->rows();
+ }
+
+ if (_row_idx == _total_rows) {
+ *eos = true;
+ return Status::OK();
+ }
+
+ int current_batch_rows = std::min(_block_rows_limit, _total_rows -
_row_idx);
+ vectorized::MutableBlock mblock =
vectorized::MutableBlock::build_mutable_block(block);
+ mblock.add_rows(_block.get(), _row_idx, current_batch_rows);
+ _row_idx += current_batch_rows;
+
+ *eos = _row_idx == _total_rows;
+ return Status::OK();
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h
new file mode 100644
index 00000000000..5284975fe66
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+class SchemaWorkloadSchedulePolicyScanner : public SchemaScanner {
+ ENABLE_FACTORY_CREATOR(SchemaWorkloadSchedulePolicyScanner);
+
+public:
+ SchemaWorkloadSchedulePolicyScanner();
+ ~SchemaWorkloadSchedulePolicyScanner() override;
+
+ Status start(RuntimeState* state) override;
+ Status get_next_block(vectorized::Block* block, bool* eos) override;
+
+ static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+
+private:
+ Status _get_workload_schedule_policy_block_from_fe();
+
+ int _block_rows_limit = 4096;
+ int _row_idx = 0;
+ int _total_rows = 0;
+ std::unique_ptr<vectorized::Block> _block = nullptr;
+ int _rpc_timeout = 3000;
+};
+}; // namespace doris
\ No newline at end of file
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 3f3093969a0..f1c94aa4891 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -4302,10 +4302,6 @@ show_param ::=
{:
RESULT = new ShowWorkloadGroupsStmt(parser.wild, parser.where);
:}
- | KW_WORKLOAD KW_SCHEDULE KW_POLICY
- {:
- RESULT = new ShowWorkloadSchedPolicyStmt();
- :}
| KW_BACKENDS
{:
RESULT = new ShowBackendsStmt();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index 2c0901c1e50..93d6e3e55a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -74,7 +74,11 @@ public enum SchemaTableType {
SCH_ACTIVE_QUERIES("ACTIVE_QUERIES", "ACTIVE_QUERIES",
TSchemaTableType.SCH_ACTIVE_QUERIES),
SCH_WORKLOAD_GROUPS("WORKLOAD_GROUPS", "WORKLOAD_GROUPS",
TSchemaTableType.SCH_WORKLOAD_GROUPS),
SCHE_USER("user", "user", TSchemaTableType.SCH_USER),
- SCH_PROCS_PRIV("procs_priv", "procs_priv",
TSchemaTableType.SCH_PROCS_PRIV);
+ SCH_PROCS_PRIV("procs_priv", "procs_priv",
TSchemaTableType.SCH_PROCS_PRIV),
+
+ SCH_WORKLOAD_SCHEDULE_POLICY("WORKLOAD_SCHEDULE_POLICY",
"WORKLOAD_SCHEDULE_POLICY",
+ TSchemaTableType.SCH_WORKLOAD_SCHEDULE_POLICY);
+
private static final String dbName = "INFORMATION_SCHEMA";
private static SelectList fullSelectLists;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowWorkloadSchedPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowWorkloadSchedPolicyStmt.java
deleted file mode 100644
index a128ee3e8f7..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowWorkloadSchedPolicyStmt.java
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.analysis;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.common.UserException;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.ShowResultSetMetaData;
-import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
-
-public class ShowWorkloadSchedPolicyStmt extends ShowStmt {
-
- public ShowWorkloadSchedPolicyStmt() {
- }
-
- @Override
- public void analyze(Analyzer analyzer) throws UserException {
- super.analyze(analyzer);
- }
-
- @Override
- public String toSql() {
- return "SHOW WORKLOAD SCHEDULE POLICY";
- }
-
- @Override
- public ShowResultSetMetaData getMetaData() {
- ShowResultSetMetaData.Builder builder =
ShowResultSetMetaData.builder();
- for (String title :
WorkloadSchedPolicyMgr.WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES) {
- builder.addColumn(new Column(title,
ScalarType.createVarchar(1000)));
- }
- return builder.build();
- }
-
- @Override
- public RedirectStatus getRedirectStatus() {
- if (ConnectContext.get().getSessionVariable().getForwardToMaster()) {
- return RedirectStatus.FORWARD_NO_SYNC;
- } else {
- return RedirectStatus.NO_FORWARD;
- }
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 89d1c363c10..a8884c61a55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -499,6 +499,16 @@ public class SchemaTable extends Table {
.column("STATE", ScalarType.createVarchar(64))
.column("INFO",
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
.build()))
+ .put("workload_schedule_policy",
+ new SchemaTable(SystemIdGenerator.getNextId(),
"workload_schedule_policy", TableType.SCHEMA,
+ builder().column("ID",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("NAME",
ScalarType.createVarchar(256))
+ .column("CONDITION",
ScalarType.createStringType())
+ .column("ACTION",
ScalarType.createStringType())
+ .column("PRIORITY",
ScalarType.createType(PrimitiveType.INT))
+ .column("ENABLED",
ScalarType.createType(PrimitiveType.BOOLEAN))
+ .column("VERSION",
ScalarType.createType(PrimitiveType.INT))
+ .build()))
.build();
protected SchemaTable(long id, String name, TableType type, List<Column>
baseSchema) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 5b9215597b4..54d2f6d4ced 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -111,7 +111,6 @@ import org.apache.doris.analysis.ShowUserPropertyStmt;
import org.apache.doris.analysis.ShowVariablesStmt;
import org.apache.doris.analysis.ShowViewStmt;
import org.apache.doris.analysis.ShowWorkloadGroupsStmt;
-import org.apache.doris.analysis.ShowWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.backup.AbstractJob;
import org.apache.doris.backup.BackupJob;
@@ -371,8 +370,6 @@ public class ShowExecutor {
handleShowResources();
} else if (stmt instanceof ShowWorkloadGroupsStmt) {
handleShowWorkloadGroups();
- } else if (stmt instanceof ShowWorkloadSchedPolicyStmt) {
- handleShowWorkloadSchedPolicy();
} else if (stmt instanceof ShowExportStmt) {
handleShowExport();
} else if (stmt instanceof ShowBackendsStmt) {
@@ -2113,12 +2110,6 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showStmt.getMetaData(),
workloadGroupsInfos);
}
- private void handleShowWorkloadSchedPolicy() {
- ShowWorkloadSchedPolicyStmt showStmt = (ShowWorkloadSchedPolicyStmt)
stmt;
- List<List<String>> workloadSchedInfo =
Env.getCurrentEnv().getWorkloadSchedPolicyMgr().getShowPolicyInfo();
- resultSet = new ShowResultSet(showStmt.getMetaData(),
workloadSchedInfo);
- }
-
private void handleShowExport() throws AnalysisException {
ShowExportStmt showExportStmt = (ShowExportStmt) stmt;
Env env = Env.getCurrentEnv();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 60f3806dc9f..85d9ce94cdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -98,6 +98,8 @@ public class MetadataGenerator {
private static final ImmutableMap<String, Integer>
ROUTINE_INFO_COLUMN_TO_INDEX;
+ private static final ImmutableMap<String, Integer>
WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX;
+
static {
ImmutableMap.Builder<String, Integer> activeQueriesbuilder = new
ImmutableMap.Builder();
List<Column> activeQueriesColList =
SchemaTable.TABLE_MAP.get("active_queries").getFullSchema();
@@ -117,6 +119,14 @@ public class MetadataGenerator {
routineInfoBuilder.put(PlsqlManager.ROUTINE_INFO_TITLE_NAMES.get(i).toLowerCase(),
i);
}
ROUTINE_INFO_COLUMN_TO_INDEX = routineInfoBuilder.build();
+
+ ImmutableMap.Builder<String, Integer> policyBuilder = new
ImmutableMap.Builder();
+ List<Column> policyColList =
SchemaTable.TABLE_MAP.get("workload_schedule_policy").getFullSchema();
+ for (int i = 0; i < policyColList.size(); i++) {
+ policyBuilder.put(policyColList.get(i).getName().toLowerCase(), i);
+ }
+ WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX = policyBuilder.build();
+
}
public static TFetchSchemaTableDataResult
getMetadataTable(TFetchSchemaTableDataRequest request) throws TException {
@@ -156,9 +166,6 @@ public class MetadataGenerator {
case TASKS:
result = taskMetadataResult(params);
break;
- case WORKLOAD_SCHED_POLICY:
- result = workloadSchedPolicyMetadataResult(params);
- break;
default:
return errorResult("Metadata table params is not set.");
}
@@ -192,6 +199,10 @@ public class MetadataGenerator {
result = routineInfoMetadataResult(schemaTableParams);
columnIndex = ROUTINE_INFO_COLUMN_TO_INDEX;
break;
+ case WORKLOAD_SCHEDULE_POLICY:
+ result = workloadSchedPolicyMetadataResult(schemaTableParams);
+ columnIndex = WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX;
+ break;
default:
return errorResult("invalid schema table name.");
}
@@ -469,7 +480,7 @@ public class MetadataGenerator {
return result;
}
- private static TFetchSchemaTableDataResult
workloadSchedPolicyMetadataResult(TMetadataTableRequestParams params) {
+ private static TFetchSchemaTableDataResult
workloadSchedPolicyMetadataResult(TSchemaTableRequestParams params) {
if (!params.isSetCurrentUserIdent()) {
return errorResult("current user ident is not set.");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 56e769cc8b3..af37bcd10e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -48,8 +48,6 @@ public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf
return
JobsTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case TASKS:
return
TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
- case WORKLOAD_SCHED_POLICY:
- return
WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName);
default:
throw new AnalysisException("Unknown Metadata
TableValuedFunction type");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index 64e794757d1..41ed6e14cb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -72,8 +72,6 @@ public abstract class TableValuedFunctionIf {
return new TasksTableValuedFunction(params);
case GroupCommitTableValuedFunction.NAME:
return new GroupCommitTableValuedFunction(params);
- case WorkloadSchedPolicyTableValuedFunction.NAME:
- return new WorkloadSchedPolicyTableValuedFunction(params);
default:
throw new AnalysisException("Could not find table function " +
funcName);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
deleted file mode 100644
index 0bf2fa7e5d1..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.tablefunction;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.thrift.TMetaScanRange;
-import org.apache.doris.thrift.TMetadataType;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import java.util.List;
-import java.util.Map;
-
-public class WorkloadSchedPolicyTableValuedFunction extends
MetadataTableValuedFunction {
-
- public static final String NAME = "workload_schedule_policy";
-
- private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
- new Column("Id", ScalarType.createType(PrimitiveType.BIGINT)),
- new Column("Name", ScalarType.createStringType()),
- new Column("Condition",
ScalarType.createType(PrimitiveType.STRING)),
- new Column("Action", ScalarType.createType(PrimitiveType.STRING)),
- new Column("Priority", ScalarType.createType(PrimitiveType.INT)),
- new Column("Enabled",
ScalarType.createType(PrimitiveType.BOOLEAN)),
- new Column("Version", ScalarType.createType(PrimitiveType.INT)),
- new Column("WorkloadGroup",
ScalarType.createType(PrimitiveType.STRING)));
-
- private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
-
- static {
- ImmutableMap.Builder<String, Integer> builder = new
ImmutableMap.Builder();
- for (int i = 0; i < SCHEMA.size(); i++) {
- builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
- }
- COLUMN_TO_INDEX = builder.build();
- }
-
- public static Integer getColumnIndexFromColumnName(String columnName) {
- return COLUMN_TO_INDEX.get(columnName.toLowerCase());
- }
-
- public WorkloadSchedPolicyTableValuedFunction(Map<String, String> params) {
- if (params.size() > 0) {
- throw new org.apache.doris.nereids.exceptions.AnalysisException(
- "workload schedule policy table-valued-function does not
support any params");
- }
- }
-
- @Override
- public TMetadataType getMetadataType() {
- return TMetadataType.WORKLOAD_SCHED_POLICY;
- }
-
- @Override
- public TMetaScanRange getMetaScanRange() {
- TMetaScanRange metaScanRange = new TMetaScanRange();
- metaScanRange.setMetadataType(TMetadataType.WORKLOAD_SCHED_POLICY);
- return metaScanRange;
- }
-
- @Override
- public String getTableName() {
- return "WorkloadSchedPolicyTableValuedFunction";
- }
-
- @Override
- public List<Column> getTableColumns() throws AnalysisException {
- return SCHEMA;
- }
-}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
index 26c1f5d7664..45a46d09121 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
@@ -103,7 +103,7 @@ public class RefreshCatalogTest extends TestWithFeService {
List<String> dbNames2 = test1.getDbNames();
Assertions.assertEquals(5, dbNames2.size());
ExternalInfoSchemaDatabase infoDb = (ExternalInfoSchemaDatabase)
test1.getDb(InfoSchemaDb.DATABASE_NAME).get();
- Assertions.assertEquals(31, infoDb.getTables().size());
+ Assertions.assertEquals(32, infoDb.getTables().size());
TestExternalDatabase testDb = (TestExternalDatabase)
test1.getDb("db1").get();
Assertions.assertEquals(2, testDb.getTables().size());
ExternalMysqlDatabase mysqlDb = (ExternalMysqlDatabase)
test1.getDb(MysqlDb.DATABASE_NAME).get();
@@ -114,7 +114,7 @@ public class RefreshCatalogTest extends TestWithFeService {
CatalogMgr mgr2 = GsonUtils.GSON.fromJson(json, CatalogMgr.class);
test1 = mgr2.getCatalog("test1");
infoDb = (ExternalInfoSchemaDatabase)
test1.getDb(InfoSchemaDb.DATABASE_NAME).get();
- Assertions.assertEquals(31, infoDb.getTables().size());
+ Assertions.assertEquals(32, infoDb.getTables().size());
testDb = (TestExternalDatabase) test1.getDb("db1").get();
Assertions.assertEquals(2, testDb.getTables().size());
mysqlDb = (ExternalMysqlDatabase)
test1.getDb(MysqlDb.DATABASE_NAME).get();
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index cbb73959a0a..5bcec8ed6e2 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -130,7 +130,8 @@ enum TSchemaTableType {
SCH_ACTIVE_QUERIES,
SCH_WORKLOAD_GROUPS,
SCH_USER,
- SCH_PROCS_PRIV;
+ SCH_PROCS_PRIV,
+ SCH_WORKLOAD_SCHEDULE_POLICY;
}
enum THdfsCompression {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 09cd2bf0be2..39fc7f56563 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -942,6 +942,7 @@ enum TSchemaTableName {
ACTIVE_QUERIES = 2, // db information_schema's table
WORKLOAD_GROUPS = 3, // db information_schema's table
ROUTINES_INFO = 4, // db information_schema's table
+ WORKLOAD_SCHEDULE_POLICY = 5,
}
struct TMetadataTableRequestParams {
diff --git a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out
index 01ba13f742a..b7113c06cbd 100644
--- a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out
@@ -59,6 +59,7 @@ triggers
user_privileges
views
workload_groups
+workload_schedule_policy
-- !auto_default_t --
0
diff --git
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
index 4586d38228a..300c28b25d9 100644
--- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
@@ -223,6 +223,7 @@ triggers
user_privileges
views
workload_groups
+workload_schedule_policy
-- !dt --
2023-06-17T10:00 2023-06-17T10:00:01.100 2023-06-17T10:00:02.220
2023-06-17T10:00:03.333 2023-06-17T10:00:04.444400
2023-06-17T10:00:05.555550 2023-06-17T10:00:06.666666
diff --git
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
index 09714901c14..7a6399f87a2 100644
---
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
+++
b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
@@ -191,6 +191,7 @@ triggers
user_privileges
views
workload_groups
+workload_schedule_policy
-- !test_insert1 --
doris1 18
diff --git
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out
b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out
index 35c805ab896..6dee70583c1 100644
---
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out
+++
b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out
@@ -233,6 +233,7 @@ triggers
user_privileges
views
workload_groups
+workload_schedule_policy
-- !dt --
2023-06-17T10:00 2023-06-17T10:00:01 2023-06-17T10:00:02
2023-06-17T10:00:03 2023-06-17T10:00:04 2023-06-17T10:00:05
2023-06-17T10:00:06
diff --git
a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
index d32fff321e4..65b4c1901b6 100644
--- a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
+++ b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
@@ -2,7 +2,7 @@
-- !select_policy_tvf --
be_policy query_time > 10 cancel_query 10 false 0
fe_policy username = root set_session_variable "workload_group=normal"
10 false 0
-set_action_policy username = root set_session_variable
"workload_group=normal" 0 true 0
+set_action_policy username = root set_session_variable
"workload_group=normal" 0 false 0
test_cancel_policy query_time > 10 cancel_query 0 false 0
-- !select_policy_tvf_after_drop --
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 8acfc8cb4ac..875eeb668e2 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -301,21 +301,21 @@ suite("test_crud_wlg") {
sql "alter workload group test_group properties ( 'max_queue_size'='0' );"
Thread.sleep(10000)
test {
- sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from
${table_name};"
+ sql "select /*+SET_VAR(workload_group=test_group)*/ * from
${table_name};"
exception "query waiting queue is full"
}
// test insert into select will go to queue
test {
- sql "insert into ${table_name2} select
/*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from ${table_name};"
+ sql "insert into ${table_name2} select
/*+SET_VAR(workload_group=test_group)*/ * from ${table_name};"
exception "query waiting queue is full"
}
// test create table as select will go to queue
test {
- sql "create table ${table_name3} PROPERTIES('replication_num' = '1')
as select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from
${table_name};"
+ sql "create table ${table_name3} PROPERTIES('replication_num' = '1')
as select /*+SET_VAR(workload_group=test_group)*/ * from ${table_name};"
exception "query waiting queue is full"
}
diff --git
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
index 8531b3cf34a..d8ab2611094 100644
---
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
+++
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
@@ -32,7 +32,7 @@ suite("test_workload_sched_policy") {
// 2 create set policy
sql "create workload schedule policy set_action_policy " +
"conditions(username='root') " +
- "actions(set_session_variable 'workload_group=normal');"
+ "actions(set_session_variable 'workload_group=normal')
properties('enabled'='false');"
// 3 create policy run in fe
sql "create workload schedule policy fe_policy " +
@@ -52,7 +52,7 @@ suite("test_workload_sched_policy") {
"'priority'='10' " +
");"
- qt_select_policy_tvf "select
name,condition,action,priority,enabled,version from workload_schedule_policy()
where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy')
order by name;"
+ qt_select_policy_tvf "select
name,condition,action,priority,enabled,version from
information_schema.workload_schedule_policy where name
in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by
name;"
// test_alter
sql "alter workload schedule policy fe_policy properties('priority'='2',
'enabled'='false');"
@@ -112,7 +112,7 @@ suite("test_workload_sched_policy") {
sql "drop workload schedule policy fe_policy;"
sql "drop workload schedule policy be_policy;"
- qt_select_policy_tvf_after_drop "select
name,condition,action,priority,enabled,version from workload_schedule_policy()
where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy')
order by name;"
+ qt_select_policy_tvf_after_drop "select
name,condition,action,priority,enabled,version from
information_schema.workload_schedule_policy where name
in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by
name;"
// test workload schedule policy
sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' =
'500');"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]