This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 012e66729a5 [improvement](executor) Add tvf and regression test for
Workload Scheduler (#28733)
012e66729a5 is described below
commit 012e66729a5cd465c6c4b2cb34ed00e5c86aebf0
Author: wangbo <[email protected]>
AuthorDate: Fri Dec 22 12:09:51 2023 +0800
[improvement](executor) Add tvf and regression test for Workload Scheduler
(#28733)
1 Add select workload schedule policy tvf
2 Add reg test
---
be/src/vec/exec/scan/vmeta_scanner.cpp | 20 +++
be/src/vec/exec/scan/vmeta_scanner.h | 2 +
.../analysis/CreateWorkloadSchedPolicyStmt.java | 34 ++++
.../resource/workloadgroup/WorkloadGroupMgr.java | 13 ++
.../workloadschedpolicy/WorkloadActionMeta.java | 16 ++
.../WorkloadConditionCompareUtils.java | 18 ++
.../workloadschedpolicy/WorkloadConditionMeta.java | 2 +-
.../workloadschedpolicy/WorkloadQueryInfo.java | 2 +-
.../workloadschedpolicy/WorkloadSchedPolicy.java | 11 +-
.../WorkloadSchedPolicyMgr.java | 67 +++----
.../doris/tablefunction/MetadataGenerator.java | 30 ++++
.../tablefunction/MetadataTableValuedFunction.java | 2 +
.../doris/tablefunction/TableValuedFunctionIf.java | 2 +
.../WorkloadSchedPolicyTableValuedFunction.java | 88 +++++++++
.../apache/doris/resource/WorkloadSchedTest.java | 197 +++++++++++++++++++++
gensrc/thrift/Types.thrift | 1 +
.../test_workload_sched_policy.out | 9 +
.../test_workload_sched_policy.groovy | 168 ++++++++++++++++++
18 files changed, 637 insertions(+), 45 deletions(-)
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 1f4dcd8593d..22545fa4dce 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -238,6 +238,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange&
meta_scan_range) {
case TMetadataType::WORKLOAD_GROUPS:
RETURN_IF_ERROR(_build_workload_groups_metadata_request(meta_scan_range,
&request));
break;
+ case TMetadataType::WORKLOAD_SCHED_POLICY:
+
RETURN_IF_ERROR(_build_workload_sched_policy_metadata_request(meta_scan_range,
&request));
+ break;
case TMetadataType::CATALOGS:
RETURN_IF_ERROR(_build_catalogs_metadata_request(meta_scan_range,
&request));
break;
@@ -379,6 +382,23 @@ Status
VMetaScanner::_build_workload_groups_metadata_request(
return Status::OK();
}
+// Populates `request` for a fetch of the WORKLOAD_SCHED_POLICY metadata table.
+// `meta_scan_range` is part of the signature shared with the other
+// _build_*_metadata_request helpers but is not read by this one.
+Status VMetaScanner::_build_workload_sched_policy_metadata_request(
+        const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) {
+    VLOG_CRITICAL << "VMetaScanner::_build_workload_sched_policy_metadata_request";
+
+    // Metadata-table parameters: which table is wanted and the identity stored on this scanner.
+    TMetadataTableRequestParams policy_params;
+    policy_params.__set_current_user_ident(_user_identity);
+    policy_params.__set_metadata_type(TMetadataType::WORKLOAD_SCHED_POLICY);
+
+    // Outgoing request envelope.
+    request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
+    request->__set_cluster_name("");
+    request->__set_metada_table_params(policy_params);
+    return Status::OK();
+}
+
Status VMetaScanner::_build_catalogs_metadata_request(const TMetaScanRange&
meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_catalogs_metadata_request";
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h
b/be/src/vec/exec/scan/vmeta_scanner.h
index 7c4a1f2b2de..59bd55dc2d8 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -79,6 +79,8 @@ private:
TFetchSchemaTableDataRequest* request);
Status _build_workload_groups_metadata_request(const TMetaScanRange&
meta_scan_range,
TFetchSchemaTableDataRequest* request);
+ Status _build_workload_sched_policy_metadata_request(const TMetaScanRange&
meta_scan_range,
+
TFetchSchemaTableDataRequest* request);
Status _build_catalogs_metadata_request(const TMetaScanRange&
meta_scan_range,
TFetchSchemaTableDataRequest*
request);
Status _build_materialized_views_metadata_request(const TMetaScanRange&
meta_scan_range,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java
index ee82b57822f..001068476d4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java
@@ -88,4 +88,38 @@ public class CreateWorkloadSchedPolicyStmt extends DdlStmt {
public Map<String, String> getProperties() {
return properties;
}
+
+    /**
+     * Renders this statement back into SQL text of the form
+     * {@code CREATE WORKLOAD SCHEDULE POLICY <name> CONDITIONS( c1,c2) ACTIONS( a1,a2) PROPERTIES("k"="v")}.
+     *
+     * <p>Each list is joined with "," and no trailing separator is emitted. A null or empty
+     * conditions/actions/properties collection renders as an empty pair of parentheses, so an
+     * empty list can never strip the opening parenthesis of its clause.
+     *
+     * @return the SQL text for this statement
+     */
+    @Override
+    public String toSql() {
+        StringBuilder sql = new StringBuilder("CREATE WORKLOAD SCHEDULE POLICY ");
+        sql.append(policyName).append(" ");
+
+        // The first element is preceded by a single space, later ones by a comma.
+        sql.append(" CONDITIONS(");
+        if (conditions != null) {
+            String separator = " ";
+            for (WorkloadConditionMeta wcm : conditions) {
+                sql.append(separator).append(wcm.toString());
+                separator = ",";
+            }
+        }
+        sql.append(")");
+
+        sql.append(" ACTIONS(");
+        if (actions != null) {
+            String separator = " ";
+            for (WorkloadActionMeta wam : actions) {
+                sql.append(separator).append(wam.toString());
+                separator = ",";
+            }
+        }
+        sql.append(")");
+
+        sql.append(" PROPERTIES(");
+        if (properties != null) {
+            String separator = "";
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                sql.append(separator)
+                        .append("\"").append(entry.getKey()).append("\"")
+                        .append("=")
+                        .append("\"").append(entry.getValue()).append("\"");
+                separator = ",";
+            }
+        }
+        sql.append(")");
+
+        return sql.toString();
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 2285db603b0..c8b49a5ebaa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -445,6 +445,19 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
}
}
+    /**
+     * Looks up a workload group's name by id, between readLock() and readUnlock().
+     *
+     * @param id key looked up in {@code idToWorkloadGroup}
+     * @return the group's name, or {@code null} when no group is stored under {@code id}
+     */
+    public String getWorkloadGroupNameById(Long id) {
+        readLock();
+        try {
+            WorkloadGroup group = idToWorkloadGroup.get(id);
+            return group != null ? group.getName() : null;
+        } finally {
+            readUnlock();
+        }
+    }
+
// for ut
public Map<String, WorkloadGroup> getNameToWorkloadGroup() {
return nameToWorkloadGroup;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
index 776c0bccfdc..57f6ba37993 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
@@ -17,9 +17,11 @@
package org.apache.doris.resource.workloadschedpolicy;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
public class WorkloadActionMeta {
@@ -44,4 +46,18 @@ public class WorkloadActionMeta {
}
throw new UserException("invalid action type " + strType);
}
+
+    /**
+     * Renders the action as {@code ACTION} when there are no arguments, otherwise as
+     * {@code ACTION "args"}.
+     *
+     * <p>For MOVE_QUERY_TO_GROUP the argument is parsed as a workload group id and replaced by
+     * the name returned from {@code WorkloadGroupMgr#getWorkloadGroupNameById}. When the id
+     * cannot be parsed or no group is found, {@code "-1"} is shown instead; this method does
+     * not throw for a malformed argument.
+     */
+    @Override
+    public String toString() {
+        if (StringUtils.isEmpty(actionArgs)) {
+            return action.toString();
+        }
+
+        String displayArgs = actionArgs;
+        if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(action)) {
+            try {
+                displayArgs = Env.getCurrentEnv().getWorkloadGroupMgr()
+                        .getWorkloadGroupNameById(Long.valueOf(actionArgs));
+            } catch (NumberFormatException e) {
+                // A non-numeric id is treated like an unknown group rather than failing toString().
+                displayArgs = null;
+            }
+        }
+        if (displayArgs == null) {
+            displayArgs = "-1";
+        }
+        return action + " \"" + displayArgs + "\"";
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java
index 8aa53a6f340..ac4c51acdff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java
@@ -39,6 +39,24 @@ public class WorkloadConditionCompareUtils {
}
}
+ // used for select tvf
+ static String getOperatorStr(WorkloadConditionOperator op) {
+ switch (op) {
+ case EQUAL:
+ return "=";
+ case GREATER:
+ return ">";
+ case GREATER_EQUAL:
+ return ">=";
+ case LESS:
+ return "<";
+ case LESS_EQUAl:
+ return "<=";
+ default:
+ throw new RuntimeException("unexpected compare operator " +
op);
+ }
+ }
+
static boolean compareInteger(WorkloadConditionOperator operator, long
firstArgs, long secondArgs) {
switch (operator) {
case EQUAL:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
index c6bfb526b9b..d5d2f922f3f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
@@ -49,6 +49,6 @@ public class WorkloadConditionMeta {
}
public String toString() {
- return metricName + " " + op + " " + value;
+ return metricName + " " +
WorkloadConditionCompareUtils.getOperatorStr(op) + " " + value;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java
index 27d821c32c0..b6a98633c58 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java
@@ -26,5 +26,5 @@ public class WorkloadQueryInfo {
String queryId = null;
TUniqueId tUniqueId = null;
ConnectContext context = null;
- Map<WorkloadMetricType, String> metricMap;
+ public Map<WorkloadMetricType, String> metricMap;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
index 7186d4409a5..827c2367133 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
@@ -62,6 +62,15 @@ public class WorkloadSchedPolicy implements Writable,
GsonPostProcessable {
private List<WorkloadCondition> workloadConditionList;
private List<WorkloadAction> workloadActionList;
+ // for ut
+ public WorkloadSchedPolicy() {
+ }
+
+ // for ut
+ public void setWorkloadConditionList(List<WorkloadCondition>
workloadConditionList) {
+ this.workloadConditionList = workloadConditionList;
+ }
+
public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition>
workloadConditionList,
List<WorkloadAction> workloadActionList, Map<String, String>
properties) throws UserException {
this.id = id;
@@ -77,7 +86,7 @@ public class WorkloadSchedPolicy implements Writable,
GsonPostProcessable {
// return false,
// 1 metric not match
// 2 condition value not match query info's value
- boolean isMatch(WorkloadQueryInfo queryInfo) {
+ public boolean isMatch(WorkloadQueryInfo queryInfo) {
for (WorkloadCondition condition : workloadConditionList) {
WorkloadMetricType metricType = condition.getMetricType();
String value = queryInfo.metricMap.get(metricType);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 9e2e4cd91af..346e34796c7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -34,12 +34,12 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
+import org.apache.doris.thrift.TUserIdentity;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
-import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -69,7 +69,7 @@ public class WorkloadSchedPolicyMgr implements Writable,
GsonPostProcessable {
public static final ImmutableList<String>
WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES
= new ImmutableList.Builder<String>()
- .add("Id").add("Name").add("ItemName").add("ItemValue")
+
.add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version")
.build();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -349,7 +349,8 @@ public class WorkloadSchedPolicyMgr implements Writable,
GsonPostProcessable {
throw new UserException("policy's priority can only
between 0 ~ 100");
}
} catch (NumberFormatException e) {
- throw new UserException("policy's priority must be a number,
input value=" + priorityStr);
+ throw new UserException(
+ "invalid priority property value, it must be a number,
input value=" + priorityStr);
}
}
}
@@ -448,6 +449,11 @@ public class WorkloadSchedPolicyMgr implements Writable,
GsonPostProcessable {
return policyProcNode.fetchResult(currentUserIdentity).getRows();
}
+    /**
+     * Converts the thrift user identity to a {@code UserIdentity} and returns the rows that
+     * {@code policyProcNode.fetchResult} produces for it.
+     *
+     * @param tcurrentUserIdentity thrift form of the requesting user's identity
+     * @return one list of column strings per row of the proc result
+     */
+    public List<List<String>> getWorkloadSchedPolicyTvfInfo(TUserIdentity tcurrentUserIdentity) {
+        return policyProcNode.fetchResult(UserIdentity.fromThrift(tcurrentUserIdentity)).getRows();
+    }
+
public class PolicyProcNode {
public ProcResult fetchResult(UserIdentity currentUserIdentity) {
BaseProcResult result = new BaseProcResult();
@@ -460,54 +466,31 @@ public class WorkloadSchedPolicyMgr implements Writable,
GsonPostProcessable {
continue;
}
- String pId = String.valueOf(policy.getId());
+ List<String> row = new ArrayList<>();
String pName = policy.getName();
+ row.add(String.valueOf(policy.getId()));
+ row.add(pName);
List<WorkloadConditionMeta> conditionList =
policy.getConditionMetaList();
+ StringBuilder cmStr = new StringBuilder();
for (WorkloadConditionMeta cm : conditionList) {
- List<String> condRow = new ArrayList<>();
- condRow.add(pId);
- condRow.add(pName);
- condRow.add("condition");
- condRow.add(cm.toString());
- result.addRow(condRow);
+ cmStr.append(cm.toString()).append(";");
}
+ String retStr = cmStr.toString().toLowerCase();
+ row.add(retStr.substring(0, retStr.length() - 1));
List<WorkloadActionMeta> actionList =
policy.getActionMetaList();
- for (WorkloadActionMeta workloadActionMeta : actionList) {
- List<String> actionRow = new ArrayList<>();
- actionRow.add(pId);
- actionRow.add(pName);
- actionRow.add("action");
- if
(StringUtils.isEmpty(workloadActionMeta.actionArgs)) {
-
actionRow.add(workloadActionMeta.action.toString());
- } else {
- actionRow.add(workloadActionMeta.action + " " +
workloadActionMeta.actionArgs);
- }
- result.addRow(actionRow);
+ StringBuilder actionStr = new StringBuilder();
+ for (WorkloadActionMeta am : actionList) {
+ actionStr.append(am.toString()).append(";");
}
+ String retStr2 = actionStr.toString().toLowerCase();
+ row.add(retStr2.substring(0, retStr2.length() - 1));
- List<String> prioRow = new ArrayList<>();
- prioRow.add(pId);
- prioRow.add(pName);
- prioRow.add("priority");
- prioRow.add(String.valueOf(policy.getPriority()));
- result.addRow(prioRow);
-
- List<String> enabledRow = new ArrayList<>();
- enabledRow.add(pId);
- enabledRow.add(pName);
- enabledRow.add("enabled");
- enabledRow.add(String.valueOf(policy.isEnabled()));
- result.addRow(enabledRow);
-
-
- List<String> versionRow = new ArrayList<>();
- versionRow.add(pId);
- versionRow.add(pName);
- versionRow.add("version");
- versionRow.add(String.valueOf(policy.getVersion()));
- result.addRow(versionRow);
+ row.add(String.valueOf(policy.getPriority()));
+ row.add(String.valueOf(policy.isEnabled()));
+ row.add(String.valueOf(policy.getVersion()));
+ result.addRow(row);
}
} finally {
readUnlock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index e8620b105b6..9c773b37dca 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -116,6 +116,9 @@ public class MetadataGenerator {
case QUERIES:
result = queriesMetadataResult(params, request);
break;
+ case WORKLOAD_SCHED_POLICY:
+ result = workloadSchedPolicyMetadataResult(params);
+ break;
default:
return errorResult("Metadata table params is not set.");
}
@@ -383,6 +386,33 @@ public class MetadataGenerator {
return result;
}
+    /**
+     * Builds the fetch result for the WORKLOAD_SCHED_POLICY metadata table.
+     *
+     * <p>Every row returned by {@code getWorkloadSchedPolicyTvfInfo} is converted into a TRow of
+     * typed cells in the order id, name, condition, action, priority, enabled, version.
+     *
+     * @param params request parameters; the current user identity must be set
+     * @return an OK result carrying the rows, or an error result when the identity is not set
+     */
+    private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(
+            TMetadataTableRequestParams params) {
+        if (!params.isSetCurrentUserIdent()) {
+            return errorResult("current user ident is not set.");
+        }
+
+        List<List<String>> policyRows = Env.getCurrentEnv().getWorkloadSchedPolicyMgr()
+                .getWorkloadSchedPolicyTvfInfo(params.getCurrentUserIdent());
+
+        List<TRow> rows = Lists.newArrayList();
+        for (List<String> fields : policyRows) {
+            TRow row = new TRow();
+            row.addToColumnValue(new TCell().setLongVal(Long.parseLong(fields.get(0)))); // id
+            row.addToColumnValue(new TCell().setStringVal(fields.get(1))); // name
+            row.addToColumnValue(new TCell().setStringVal(fields.get(2))); // condition
+            row.addToColumnValue(new TCell().setStringVal(fields.get(3))); // action
+            row.addToColumnValue(new TCell().setIntVal(Integer.parseInt(fields.get(4)))); // priority
+            row.addToColumnValue(new TCell().setBoolVal(Boolean.parseBoolean(fields.get(5)))); // enabled
+            row.addToColumnValue(new TCell().setIntVal(Integer.parseInt(fields.get(6)))); // version
+            rows.add(row);
+        }
+
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        result.setStatus(new TStatus(TStatusCode.OK));
+        result.setDataBatch(rows);
+        return result;
+    }
+
private static TFetchSchemaTableDataResult
queriesMetadataResult(TMetadataTableRequestParams params,
TFetchSchemaTableDataRequest parentRequest) {
if (!params.isSetQueriesMetadataParams()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index d2c3278314e..53a0b7ee5b8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -51,6 +51,8 @@ public abstract class MetadataTableValuedFunction extends
TableValuedFunctionIf
return
TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case QUERIES:
return
QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName);
+ case WORKLOAD_SCHED_POLICY:
+ return
WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName);
default:
throw new AnalysisException("Unknown Metadata
TableValuedFunction type");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index b14a09769cb..c9547c91bd2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -76,6 +76,8 @@ public abstract class TableValuedFunctionIf {
return new GroupCommitTableValuedFunction(params);
case QueriesTableValuedFunction.NAME:
return new QueriesTableValuedFunction(params);
+ case WorkloadSchedPolicyTableValuedFunction.NAME:
+ return new WorkloadSchedPolicyTableValuedFunction(params);
default:
throw new AnalysisException("Could not find table function " +
funcName);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
new file mode 100644
index 00000000000..b4795b21058
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table-valued function registered under the name {@code workload_schedule_policy}.
+ * Its rows follow {@code SCHEMA}: Id, Name, Condition, Action, Priority, Enabled, Version.
+ * The function accepts no parameters.
+ */
+public class WorkloadSchedPolicyTableValuedFunction extends MetadataTableValuedFunction {
+
+    public static final String NAME = "workload_schedule_policy";
+
+    private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
+            new Column("Id", ScalarType.createType(PrimitiveType.BIGINT)),
+            new Column("Name", ScalarType.createStringType()),
+            new Column("Condition", ScalarType.createType(PrimitiveType.STRING)),
+            new Column("Action", ScalarType.createType(PrimitiveType.STRING)),
+            new Column("Priority", ScalarType.createType(PrimitiveType.INT)),
+            new Column("Enabled", ScalarType.createType(PrimitiveType.BOOLEAN)),
+            new Column("Version", ScalarType.createType(PrimitiveType.INT)));
+
+    // Lower-cased column name -> position of that column in SCHEMA.
+    private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
+
+    static {
+        // ImmutableMap.builder() infers the type arguments, avoiding the raw Builder type.
+        ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
+        for (int i = 0; i < SCHEMA.size(); i++) {
+            builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
+        }
+        COLUMN_TO_INDEX = builder.build();
+    }
+
+    /**
+     * Case-insensitive lookup of a column's position in the schema.
+     *
+     * @param columnName column name in any letter case
+     * @return the zero-based index, or {@code null} when the name is not a column of this function
+     */
+    public static Integer getColumnIndexFromColumnName(String columnName) {
+        return COLUMN_TO_INDEX.get(columnName.toLowerCase());
+    }
+
+    /**
+     * @param params parameters given to the function call; must be empty
+     * @throws org.apache.doris.nereids.exceptions.AnalysisException when any parameter is supplied
+     */
+    public WorkloadSchedPolicyTableValuedFunction(Map<String, String> params) {
+        if (!params.isEmpty()) {
+            throw new org.apache.doris.nereids.exceptions.AnalysisException(
+                    "workload schedule policy table-valued-function does not support any params");
+        }
+    }
+
+    @Override
+    public TMetadataType getMetadataType() {
+        return TMetadataType.WORKLOAD_SCHED_POLICY;
+    }
+
+    /** Returns a scan range that carries only the WORKLOAD_SCHED_POLICY metadata type. */
+    @Override
+    public TMetaScanRange getMetaScanRange() {
+        TMetaScanRange metaScanRange = new TMetaScanRange();
+        metaScanRange.setMetadataType(TMetadataType.WORKLOAD_SCHED_POLICY);
+        return metaScanRange;
+    }
+
+    @Override
+    public String getTableName() {
+        return "WorkloadSchedPolicyTableValuedFunction";
+    }
+
+    @Override
+    public List<Column> getTableColumns() throws AnalysisException {
+        return SCHEMA;
+    }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/resource/WorkloadSchedTest.java
b/fe/fe-core/src/test/java/org/apache/doris/resource/WorkloadSchedTest.java
new file mode 100644
index 00000000000..11c00eca234
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/WorkloadSchedTest.java
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.resource.workloadschedpolicy.WorkloadCondition;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionOperator;
+import
org.apache.doris.resource.workloadschedpolicy.WorkloadConditionQueryTime;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionUsername;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadMetricType;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadQueryInfo;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Unit test for {@code WorkloadSchedPolicy#isMatch}: every compare operator on an integer
+ * metric, equality on a string metric, and a policy that combines both kinds of condition.
+ */
+public class WorkloadSchedTest {
+
+    // Builds a policy that only has its condition list set (ut-only constructor and setter).
+    private static WorkloadSchedPolicy policyWith(WorkloadCondition... conditions) {
+        List<WorkloadCondition> conditionList = new ArrayList<>();
+        for (WorkloadCondition condition : conditions) {
+            conditionList.add(condition);
+        }
+        WorkloadSchedPolicy policy = new WorkloadSchedPolicy();
+        policy.setWorkloadConditionList(conditionList);
+        return policy;
+    }
+
+    // Creates a query info whose metric map is empty and mutable.
+    private static WorkloadQueryInfo emptyQueryInfo() {
+        WorkloadQueryInfo queryInfo = new WorkloadQueryInfo();
+        queryInfo.metricMap = new HashMap<>();
+        return queryInfo;
+    }
+
+    // Checks a "query time <op> 100" policy against one matching and one non-matching value.
+    private static void checkQueryTime(WorkloadConditionOperator op, String matching, String nonMatching) {
+        WorkloadSchedPolicy policy = policyWith(new WorkloadConditionQueryTime(op, 100));
+        WorkloadQueryInfo queryInfo = emptyQueryInfo();
+
+        // match
+        queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, matching);
+        Assert.assertTrue(policy.isMatch(queryInfo));
+
+        // not match
+        queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, nonMatching);
+        Assert.assertFalse(policy.isMatch(queryInfo));
+    }
+
+    @Test
+    public void testPolicyCondition() {
+        // 1 test compare operator (threshold is always 100)
+        // 1.1 >
+        checkQueryTime(WorkloadConditionOperator.GREATER, "101", "100");
+        // 1.2 >=
+        checkQueryTime(WorkloadConditionOperator.GREATER_EQUAL, "100", "10");
+        // 1.3 =
+        checkQueryTime(WorkloadConditionOperator.EQUAL, "100", "10");
+        // 1.4 <
+        checkQueryTime(WorkloadConditionOperator.LESS, "99", "100");
+        // 1.5 <=
+        checkQueryTime(WorkloadConditionOperator.LESS_EQUAl, "100", "101");
+
+        // 2 string compare
+        {
+            WorkloadSchedPolicy policy = policyWith(
+                    new WorkloadConditionUsername(WorkloadConditionOperator.EQUAL, "root"));
+            WorkloadQueryInfo queryInfo = emptyQueryInfo();
+
+            // match
+            queryInfo.metricMap.put(WorkloadMetricType.USERNAME, "root");
+            Assert.assertTrue(policy.isMatch(queryInfo));
+
+            // not match
+            queryInfo.metricMap.put(WorkloadMetricType.USERNAME, "abc");
+            Assert.assertFalse(policy.isMatch(queryInfo));
+        }
+
+        // 3 mixed condition
+        {
+            WorkloadSchedPolicy policy = policyWith(
+                    new WorkloadConditionUsername(WorkloadConditionOperator.EQUAL, "root"),
+                    new WorkloadConditionQueryTime(WorkloadConditionOperator.EQUAL, 100));
+            WorkloadQueryInfo queryInfo = emptyQueryInfo();
+            queryInfo.metricMap.put(WorkloadMetricType.USERNAME, "root");
+            queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "100");
+
+            // match
+            Assert.assertTrue(policy.isMatch(queryInfo));
+
+            // not match 1: the username metric is absent
+            queryInfo.metricMap.remove(WorkloadMetricType.USERNAME);
+            Assert.assertFalse(policy.isMatch(queryInfo));
+
+            // not match 2: the username metric has a different value
+            queryInfo.metricMap.put(WorkloadMetricType.USERNAME, "abc");
+            Assert.assertFalse(policy.isMatch(queryInfo));
+        }
+    }
+
+}
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 4f101f1177e..2d0f380dbce 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -702,6 +702,7 @@ enum TMetadataType {
JOBS,
TASKS,
QUERIES,
+ WORKLOAD_SCHED_POLICY
}
enum TIcebergQueryType {
diff --git
a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
new file mode 100644
index 00000000000..4e8482384c4
--- /dev/null
+++ b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_policy_tvf --
+full_policy_policy query_time > 10;username = root set_session_variable
"workload_group=normal" 10 false 0
+move_action_policy username = root move_query_to_group "normal" 0
true 0
+set_action_policy query_time > 10;username = root set_session_variable
"workload_group=normal" 0 true 0
+test_cancel_policy query_time > 10 cancel_query 0 false 0
+
+-- !select_policy_tvf_after_drop --
+
diff --git
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
new file mode 100644
index 00000000000..be4d7411e3d
--- /dev/null
+++
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_workload_sched_policy") {
+
+ sql "set experimental_enable_nereids_planner = false;"
+
+ sql "drop workload schedule policy if exists full_policy_policy;"
+ sql "drop workload schedule policy if exists set_action_policy;"
+ sql "drop workload schedule policy if exists move_action_policy;"
+ sql "drop workload schedule policy if exists test_cancel_policy;"
+ sql "drop workload schedule policy if exists test_set_var_policy;"
+ sql "drop workload schedule policy if exists test_set_var_policy2;"
+
+ // 1 create cancel policy
+ sql "create workload schedule policy test_cancel_policy " +
+ " conditions(query_time > 10) " +
+ " actions(cancel_query) properties('enabled'='false'); "
+
+ // 2 create move policy
+ sql "create workload schedule policy move_action_policy " +
+ "conditions(username='root') " +
+ "actions(move_query_to_group 'normal');"
+
+ // 3 create set policy
+ sql "create workload schedule policy set_action_policy " +
+ "conditions(query_time > 10, username='root') " +
+ "actions(set_session_variable 'workload_group=normal');"
+
+ // 4 create policy with property
+ sql "create workload schedule policy full_policy_policy " +
+ "conditions(query_time > 10, username='root') " +
+ "actions(set_session_variable 'workload_group=normal') " +
+ "properties( " +
+ "'enabled' = 'false', " +
+ "'priority'='10' " +
+ ");"
+
+ qt_select_policy_tvf "select
name,condition,action,priority,enabled,version from workload_schedule_policy()
order by name;"
+
+ // test_alter
+ sql "alter workload schedule policy full_policy_policy
properties('priority'='2', 'enabled'='false');"
+
+ // check that invalid create/alter statements are rejected
+ try {
+ sql "create workload schedule policy failed_policy " +
+ "conditions(abc > 123) actions(cancel_query);"
+ } catch(Exception e) {
+ assertTrue(e.getMessage().contains("invalid metric name"))
+ }
+
+ try {
+ sql "create workload schedule policy failed_policy " +
+ "conditions(query_time > 123) actions(abc);"
+ } catch(Exception e) {
+ assertTrue(e.getMessage().contains("invalid action type"))
+ }
+
+ try {
+ sql "alter workload schedule policy full_policy_policy
properties('priority'='abc');"
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("invalid priority property value"))
+ }
+
+ try {
+ sql "alter workload schedule policy full_policy_policy
properties('enabled'='abc');"
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("invalid enabled property value"))
+ }
+
+ try {
+ sql "alter workload schedule policy full_policy_policy
properties('priority'='10000');"
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("priority can only between"))
+ }
+
+ try {
+ sql "create workload schedule policy conflict_policy " +
+ "conditions (username = 'root')" +
+ "actions(cancel_query, move_query_to_group 'normal');"
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("can not exist in one policy at
same time"))
+ }
+
+ try {
+ sql "create workload schedule policy conflict_policy " +
+ "conditions (username = 'root') " +
+ "actions(cancel_query, cancel_query);"
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("duplicate action in one policy"))
+ }
+
+ try {
+ sql "create workload schedule policy conflict_policy " +
+ "conditions (username = 'root') " +
+ "actions(set_session_variable 'workload_group=normal',
set_session_variable 'workload_group=abc');"
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("duplicate set_session_variable
action args one policy"))
+ }
+
+ // drop
+ sql "drop workload schedule policy full_policy_policy;"
+ sql "drop workload schedule policy set_action_policy;"
+ sql "drop workload schedule policy move_action_policy;"
+ sql "drop workload schedule policy test_cancel_policy;"
+
+ qt_select_policy_tvf_after_drop "select
name,condition,action,priority,enabled,version from workload_schedule_policy()
order by name;"
+
+ // test workload schedule policy
+ sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' =
'500');"
+ sql """drop user if exists test_workload_sched_user"""
+ sql """create user test_workload_sched_user identified by '12345'"""
+ sql """grant ADMIN_PRIV on *.*.* to test_workload_sched_user"""
+
+ // 1 create test_set_var_policy
+ sql "create workload schedule policy test_set_var_policy
conditions(username='test_workload_sched_user')" +
+ "actions(set_session_variable 'parallel_pipeline_task_num=33');"
+ def result1 = connect(user = 'test_workload_sched_user', password =
'12345', url = context.config.jdbcUrl) {
+ logger.info("begin sleep 15s to wait")
+ Thread.sleep(15000)
+ sql "show variables like '%parallel_pipeline_task_num%';"
+ }
+ assertEquals("parallel_pipeline_task_num", result1[0][0])
+ assertEquals("33", result1[0][1])
+
+ // 2 create test_set_var_policy2 with higher priority
+ sql "create workload schedule policy test_set_var_policy2
conditions(username='test_workload_sched_user') " +
+ "actions(set_session_variable 'parallel_pipeline_task_num=22')
properties('priority'='10');"
+ def result2 = connect(user = 'test_workload_sched_user', password =
'12345', url = context.config.jdbcUrl) {
+ Thread.sleep(3000)
+ sql "show variables like '%parallel_pipeline_task_num%';"
+ }
+ assertEquals("parallel_pipeline_task_num", result2[0][0])
+ assertEquals("22", result2[0][1])
+
+ // 3 disable test_set_var_policy2, so test_set_var_policy (33) applies again
+ sql "alter workload schedule policy test_set_var_policy2
properties('enabled'='false');"
+ def result3 = connect(user = 'test_workload_sched_user', password =
'12345', url = context.config.jdbcUrl) {
+ Thread.sleep(3000)
+ sql "show variables like '%parallel_pipeline_task_num%';"
+ }
+ assertEquals("parallel_pipeline_task_num", result3[0][0])
+ assertEquals("33", result3[0][1])
+
+ sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' =
'10000');"
+
+ sql "drop workload schedule policy if exists full_policy_policy;"
+ sql "drop workload schedule policy if exists set_action_policy;"
+ sql "drop workload schedule policy if exists move_action_policy;"
+ sql "drop workload schedule policy if exists test_cancel_policy;"
+ sql "drop workload schedule policy if exists test_set_var_policy;"
+ sql "drop workload schedule policy if exists test_set_var_policy2;"
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]