This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0af0a47324f Fix alter policy failed (#33910)
0af0a47324f is described below
commit 0af0a47324f42a24ace24e648e2b58501bd08e67
Author: wangbo <[email protected]>
AuthorDate: Mon Apr 22 12:52:23 2024 +0800
Fix alter policy failed (#33910)
---
.../schema_workload_sched_policy_scanner.cpp | 1 +
.../java/org/apache/doris/catalog/SchemaTable.java | 1 +
.../workloadschedpolicy/WorkloadSchedPolicy.java | 8 ++++++--
.../WorkloadSchedPolicyMgr.java | 2 +-
.../test_workload_sched_policy.out | 15 ++++++++++++++
.../test_workload_sched_policy.groovy | 24 +++++++++++++++++++++-
6 files changed, 47 insertions(+), 4 deletions(-)
diff --git
a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
index 725544ad5a5..3dae2714f02 100644
--- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
@@ -34,6 +34,7 @@ std::vector<SchemaScanner::ColumnDesc>
SchemaWorkloadSchedulePolicyScanner::_s_t
{"PRIORITY", TYPE_INT, sizeof(int32_t), true},
{"ENABLED", TYPE_BOOLEAN, sizeof(bool), true},
{"VERSION", TYPE_INT, sizeof(int32_t), true},
+ {"WORKLOAD_GROUP", TYPE_STRING, sizeof(StringRef), true},
};
SchemaWorkloadSchedulePolicyScanner::SchemaWorkloadSchedulePolicyScanner()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index a8884c61a55..d0b828fdba2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -508,6 +508,7 @@ public class SchemaTable extends Table {
.column("PRIORITY",
ScalarType.createType(PrimitiveType.INT))
.column("ENABLED",
ScalarType.createType(PrimitiveType.BOOLEAN))
.column("VERSION",
ScalarType.createType(PrimitiveType.INT))
+ .column("WORKLOAD_GROUP",
ScalarType.createStringType())
.build()))
.build();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
index 2f8706c574b..55759e90972 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
@@ -172,7 +172,7 @@ public class WorkloadSchedPolicy implements Writable,
GsonPostProcessable {
return retType;
}
- public void updateProperty(Map<String, String> property, List<Long>
wgIdList) {
+ public void updatePropertyIfNotNull(Map<String, String> property,
List<Long> wgIdList) {
String enabledStr = property.get(ENABLED);
if (enabledStr != null) {
this.enabled = Boolean.parseBoolean(enabledStr);
@@ -183,7 +183,11 @@ public class WorkloadSchedPolicy implements Writable,
GsonPostProcessable {
this.priority = Integer.parseInt(priorityStr);
}
- if (wgIdList.size() > 0) {
+ String workloadGroupIdStr = property.get(WORKLOAD_GROUP);
+ // workloadGroupIdStr != null means the user set the workload group property,
+ // so we should overwrite the policy's workloadGroupIdList.
+ // If workloadGroupIdStr.length == 0, the policy should match all queries.
+ if (workloadGroupIdStr != null) {
this.workloadGroupIdList = wgIdList;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 9e6eb33ffbf..ee74d4a506f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -435,7 +435,7 @@ public class WorkloadSchedPolicyMgr implements Writable,
GsonPostProcessable {
Map<String, String> properties = alterStmt.getProperties();
List<Long> wgIdList = new ArrayList<>();
checkProperties(properties, wgIdList);
- policy.updateProperty(properties, wgIdList);
+ policy.updatePropertyIfNotNull(properties, wgIdList);
policy.incrementVersion();
Env.getCurrentEnv().getEditLog().logAlterWorkloadSchedPolicy(policy);
} finally {
diff --git
a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
index 65b4c1901b6..3152367e9a1 100644
--- a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
+++ b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
@@ -7,3 +7,18 @@ test_cancel_policy query_time > 10 cancel_query 0
false 0
-- !select_policy_tvf_after_drop --
+-- !select_alter_1 --
+test_alter_policy username = test_alter_policy_user
set_session_variable "parallel_pipeline_task_num=0" 0 true 0
normal
+
+-- !select_alter_2 --
+test_alter_policy username = test_alter_policy_user
set_session_variable "parallel_pipeline_task_num=0" 0 true 1
+
+-- !select_alter_3 --
+test_alter_policy username = test_alter_policy_user
set_session_variable "parallel_pipeline_task_num=0" 0 false 2
+
+-- !select_alter_4 --
+test_alter_policy username = test_alter_policy_user
set_session_variable "parallel_pipeline_task_num=0" 9 false 3
+
+-- !select_alter_5 --
+test_alter_policy username = test_alter_policy_user
set_session_variable "parallel_pipeline_task_num=0" 9 false 4
normal
+
diff --git
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
index d8ab2611094..776209fa11e 100644
---
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
+++
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
@@ -149,7 +149,7 @@ suite("test_workload_sched_policy") {
}
assertEquals("parallel_pipeline_task_num", result3[0][0])
assertEquals("33", result3[0][1])
-
+
sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' =
'10000');"
sql "drop workload schedule policy if exists test_set_var_policy;"
@@ -180,6 +180,28 @@ suite("test_workload_sched_policy") {
exception "unknown workload group"
}
+ // test alter policy property
+ sql "drop user if exists test_alter_policy_user"
+ sql "CREATE USER 'test_alter_policy_user'@'%' IDENTIFIED BY '12345';"
+ sql "drop workload schedule policy if exists test_alter_policy;"
+ sql "create workload schedule policy test_alter_policy
conditions(username='test_alter_policy_user') actions(set_session_variable
'parallel_pipeline_task_num=0') properties('workload_group'='normal');"
+ qt_select_alter_1 "select
name,condition,action,PRIORITY,ENABLED,VERSION,WORKLOAD_GROUP from
information_schema.workload_schedule_policy where name='test_alter_policy'"
+
+ sql "alter workload schedule policy test_alter_policy
properties('workload_group'='');"
+ qt_select_alter_2 "select
name,condition,action,PRIORITY,ENABLED,VERSION,WORKLOAD_GROUP from
information_schema.workload_schedule_policy where name='test_alter_policy'"
+
+ sql "alter workload schedule policy test_alter_policy
properties('enabled'='false');"
+ qt_select_alter_3 "select
name,condition,action,PRIORITY,ENABLED,VERSION,WORKLOAD_GROUP from
information_schema.workload_schedule_policy where name='test_alter_policy'"
+
+ sql "alter workload schedule policy test_alter_policy
properties('priority'='9');"
+ qt_select_alter_4 "select
name,condition,action,PRIORITY,ENABLED,VERSION,WORKLOAD_GROUP from
information_schema.workload_schedule_policy where name='test_alter_policy'"
+
+ sql "alter workload schedule policy test_alter_policy
properties('workload_group'='normal');"
+ qt_select_alter_5 "select
name,condition,action,PRIORITY,ENABLED,VERSION,WORKLOAD_GROUP from
information_schema.workload_schedule_policy where name='test_alter_policy'"
+
+ sql "drop user test_alter_policy_user"
+ sql "drop workload schedule policy test_alter_policy"
+
// daemon thread alter test
def thread1 = new Thread({
def startTime = System.currentTimeMillis()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]