This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 055624ee6b3c1c6f96c687e0d54e1ba846118b63
Author: wangbo <[email protected]>
AuthorDate: Wed Apr 17 21:22:36 2024 +0800

    Add workload group id in workload policy's property (#33483)
---
 be/src/agent/workload_sched_policy_listener.cpp    |  8 ++-
 be/src/runtime/fragment_mgr.cpp                    |  2 +
 .../workload_management/workload_query_info.h      |  1 +
 .../workload_management/workload_sched_policy.cpp  | 11 +++-
 .../workload_management/workload_sched_policy.h    |  2 +
 .../workload_sched_policy_mgr.cpp                  |  1 -
 .../resource/workloadgroup/WorkloadGroupMgr.java   | 24 +++++++
 .../workloadschedpolicy/WorkloadSchedPolicy.java   | 41 +++++++++---
 .../WorkloadSchedPolicyMgr.java                    | 50 ++++++++++++--
 .../doris/tablefunction/MetadataGenerator.java     |  1 +
 .../WorkloadSchedPolicyTableValuedFunction.java    |  3 +-
 gensrc/thrift/BackendService.thrift                |  1 +
 .../test_workload_sched_policy.groovy              | 77 +++++++++++++++++++++-
 13 files changed, 202 insertions(+), 20 deletions(-)

diff --git a/be/src/agent/workload_sched_policy_listener.cpp 
b/be/src/agent/workload_sched_policy_listener.cpp
index 689a600367a..fc7c8ba8c18 100644
--- a/be/src/agent/workload_sched_policy_listener.cpp
+++ b/be/src/agent/workload_sched_policy_listener.cpp
@@ -63,9 +63,15 @@ void WorkloadschedPolicyListener::handle_topic_info(const 
std::vector<TopicInfo>
             continue;
         }
 
+        std::set<int64_t> wg_id_set;
+        for (int64_t wg_id : tpolicy.wg_id_list) {
+            wg_id_set.insert(wg_id);
+        }
+
         std::shared_ptr<WorkloadSchedPolicy> policy_ptr = 
std::make_shared<WorkloadSchedPolicy>();
         policy_ptr->init(tpolicy.id, tpolicy.name, tpolicy.version, 
tpolicy.enabled,
-                         tpolicy.priority, std::move(cond_ptr_list), 
std::move(action_ptr_list));
+                         tpolicy.priority, wg_id_set, std::move(cond_ptr_list),
+                         std::move(action_ptr_list));
         policy_map.emplace(tpolicy.id, std::move(policy_ptr));
     }
     size_t new_policy_size = policy_map.size();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3cde0778f62..3a1d722e805 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1566,6 +1566,8 @@ void 
FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_i
             WorkloadQueryInfo workload_query_info;
             workload_query_info.query_id = print_id(q.first);
             workload_query_info.tquery_id = q.first;
+            workload_query_info.wg_id =
+                    q.second->workload_group() == nullptr ? -1 : 
q.second->workload_group()->id();
             query_info_list->push_back(workload_query_info);
         }
     }
diff --git a/be/src/runtime/workload_management/workload_query_info.h 
b/be/src/runtime/workload_management/workload_query_info.h
index 9c24e9dee8a..f2da31b6196 100644
--- a/be/src/runtime/workload_management/workload_query_info.h
+++ b/be/src/runtime/workload_management/workload_query_info.h
@@ -28,6 +28,7 @@ public:
     std::map<WorkloadMetricType, std::string> metric_map;
     TUniqueId tquery_id;
     std::string query_id;
+    int64_t wg_id;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp 
b/be/src/runtime/workload_management/workload_sched_policy.cpp
index bc543a7d770..b97eb85c068 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -20,7 +20,7 @@
 namespace doris {
 
 void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool 
enabled,
-                               int priority,
+                               int priority, std::set<int64_t> wg_id_set,
                                std::vector<std::unique_ptr<WorkloadCondition>> 
condition_list,
                                std::vector<std::unique_ptr<WorkloadAction>> 
action_list) {
     _id = id;
@@ -30,6 +30,7 @@ void WorkloadSchedPolicy::init(int64_t id, std::string name, 
int version, bool e
     _priority = priority;
     _condition_list = std::move(condition_list);
     _action_list = std::move(action_list);
+    _wg_id_set = wg_id_set;
 
     _first_action_type = _action_list[0]->get_action_type();
     if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP &&
@@ -50,6 +51,14 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* 
query_info_ptr) {
     if (!_enabled) {
         return false;
     }
+
+    // 1 when policy has no group(_wg_id_set.size() < 0), it should match all 
query
+    // 2 when policy has group, it can only match the query which has the same 
group
+    if (_wg_id_set.size() > 0 && (query_info_ptr->wg_id <= 0 ||
+                                  _wg_id_set.find(query_info_ptr->wg_id) == 
_wg_id_set.end())) {
+        return false;
+    }
+
     auto& metric_val_map = query_info_ptr->metric_map;
     for (auto& cond : _condition_list) {
         if (metric_val_map.find(cond->get_workload_metric_type()) == 
metric_val_map.end()) {
diff --git a/be/src/runtime/workload_management/workload_sched_policy.h 
b/be/src/runtime/workload_management/workload_sched_policy.h
index 82f42b9a0b4..6554634d9af 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.h
+++ b/be/src/runtime/workload_management/workload_sched_policy.h
@@ -30,6 +30,7 @@ public:
     ~WorkloadSchedPolicy() = default;
 
     void init(int64_t id, std::string name, int version, bool enabled, int 
priority,
+              std::set<int64_t> wg_id_set,
               std::vector<std::unique_ptr<WorkloadCondition>> condition_list,
               std::vector<std::unique_ptr<WorkloadAction>> action_list);
 
@@ -50,6 +51,7 @@ private:
     int _version;
     bool _enabled;
     int _priority;
+    std::set<int64_t> _wg_id_set;
 
     std::vector<std::unique_ptr<WorkloadCondition>> _condition_list;
     std::vector<std::unique_ptr<WorkloadAction>> _action_list;
diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp 
b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
index c41a9e723e3..4690ed1d4f2 100644
--- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
@@ -77,7 +77,6 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
     while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) {
         // 1 get query info
         std::vector<WorkloadQueryInfo> list;
-        //todo(wb) maybe we can get runtime queryinfo from 
RuntimeQueryStatiticsMgr directly
         _exec_env->fragment_mgr()->get_runtime_query_info(&list);
         // todo: add timer
         if (list.size() == 0) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 7796a385eee..bd2aaf932c9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -401,6 +401,17 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
             throw new DdlException("workload group " + workloadGroupName + " 
is set for user " + ret.second);
         }
 
+        // A group with related policies should not be deleted.
+        Long wgId = getWorkloadGroupIdByName(workloadGroupName);
+        if (wgId != null) {
+            boolean groupHasPolicy = 
Env.getCurrentEnv().getWorkloadSchedPolicyMgr()
+                    .checkWhetherGroupHasPolicy(wgId.longValue());
+            if (groupHasPolicy) {
+                throw new DdlException(
+                        "workload group " + workloadGroupName + " can't be 
dropped, because it has related policy");
+            }
+        }
+
         writeLock();
         try {
             if (!nameToWorkloadGroup.containsKey(workloadGroupName)) {
@@ -493,6 +504,19 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
         }
     }
 
+    public Map<Long, String> getIdToNameMap() {
+        Map<Long, String> ret = Maps.newHashMap();
+        readLock();
+        try {
+            for (Map.Entry<Long, WorkloadGroup> entry : 
idToWorkloadGroup.entrySet()) {
+                ret.put(entry.getKey(), entry.getValue().getName());
+            }
+            return ret;
+        } finally {
+            readUnlock();
+        }
+    }
+
     public String getWorkloadGroupNameById(Long id) {
         readLock();
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
index 8b028e3e75f..2f8706c574b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
@@ -46,8 +46,10 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
 
     public static final String ENABLED = "enabled";
     public static final String PRIORITY = "priority";
+    public static final String WORKLOAD_GROUP = "workload_group";
+
     public static final ImmutableSet<String> POLICY_PROPERTIES = new 
ImmutableSet.Builder<String>()
-            .add(ENABLED).add(PRIORITY).build();
+            .add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build();
 
     // used for convert fe type to thrift type
     private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> 
METRIC_MAP
@@ -80,6 +82,9 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
     @SerializedName(value = "priority")
     private volatile int priority;
 
+    @SerializedName(value = "wgIdList")
+    private List<Long> workloadGroupIdList = new ArrayList<>();
+
     @SerializedName(value = "conditionMetaList")
     List<WorkloadConditionMeta> conditionMetaList;
     // we regard action as a command, map's key is command, map's value is 
args, so it's a command list
@@ -101,13 +106,18 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
     }
 
     public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition> 
workloadConditionList,
-            List<WorkloadAction> workloadActionList, Map<String, String> 
properties) throws UserException {
+            List<WorkloadAction> workloadActionList, Map<String, String> 
properties, List<Long> wgIdList) {
         this.id = id;
         this.name = name;
         this.workloadConditionList = workloadConditionList;
         this.workloadActionList = workloadActionList;
-        // set enable and priority
-        parseAndSetProperties(properties);
+
+        String enabledStr = properties.get(ENABLED);
+        this.enabled = enabledStr == null ? true : 
Boolean.parseBoolean(enabledStr);
+
+        String priorityStr = properties.get(PRIORITY);
+        this.priority = priorityStr == null ? 0 : 
Integer.parseInt(priorityStr);
+        this.workloadGroupIdList = wgIdList;
         this.version = 0;
     }
 
@@ -162,12 +172,20 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
         return retType;
     }
 
-    public void parseAndSetProperties(Map<String, String> properties) throws 
UserException {
-        String enabledStr = properties.get(ENABLED);
-        this.enabled = enabledStr == null ? true : 
Boolean.parseBoolean(enabledStr);
+    public void updateProperty(Map<String, String> property, List<Long> 
wgIdList) {
+        String enabledStr = property.get(ENABLED);
+        if (enabledStr != null) {
+            this.enabled = Boolean.parseBoolean(enabledStr);
+        }
 
-        String priorityStr = properties.get(PRIORITY);
-        this.priority = priorityStr == null ? 0 : 
Integer.parseInt(priorityStr);
+        String priorityStr = property.get(PRIORITY);
+        if (priorityStr != null) {
+            this.priority = Integer.parseInt(priorityStr);
+        }
+
+        if (wgIdList.size() > 0) {
+            this.workloadGroupIdList = wgIdList;
+        }
     }
 
     void incrementVersion() {
@@ -194,6 +212,10 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
         return version;
     }
 
+    public List<Long> getWorkloadGroupIdList() {
+        return this.workloadGroupIdList;
+    }
+
     public List<WorkloadConditionMeta> getConditionMetaList() {
         return conditionMetaList;
     }
@@ -224,6 +246,7 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
         tPolicy.setVersion(version);
         tPolicy.setPriority(priority);
         tPolicy.setEnabled(enabled);
+        tPolicy.setWgIdList(workloadGroupIdList);
 
         List<TWorkloadCondition> condList = new ArrayList();
         for (WorkloadConditionMeta cond : conditionMetaList) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 5c35d1ee095..9e6eb33ffbf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -72,6 +72,7 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
     public static final ImmutableList<String> 
WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES
             = new ImmutableList.Builder<String>()
             
.add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version")
+            .add("WorkloadGroup")
             .build();
 
     public static final ImmutableSet<WorkloadActionType> FE_ACTION_SET
@@ -185,8 +186,9 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
         if (propMap == null) {
             propMap = new HashMap<>();
         }
+        List<Long> wgIdList = new ArrayList<>();
         if (propMap.size() != 0) {
-            checkProperties(propMap);
+            checkProperties(propMap, wgIdList);
         }
         writeLock();
         try {
@@ -203,7 +205,7 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
             }
             long id = Env.getCurrentEnv().getNextId();
             WorkloadSchedPolicy policy = new WorkloadSchedPolicy(id, 
policyName,
-                    policyConditionList, policyActionList, propMap);
+                    policyConditionList, policyActionList, propMap, wgIdList);
             policy.setConditionMeta(originConditions);
             policy.setActionMeta(originActions);
             
Env.getCurrentEnv().getEditLog().logCreateWorkloadSchedPolicy(policy);
@@ -382,7 +384,7 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
         return ret;
     }
 
-    private void checkProperties(Map<String, String> properties) throws 
UserException {
+    private void checkProperties(Map<String, String> properties, List<Long> 
wgIdList) throws UserException {
         Set<String> allInputPropKeySet = new HashSet<>();
         allInputPropKeySet.addAll(properties.keySet());
 
@@ -410,6 +412,15 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
                         "invalid priority property value, it must be a number, 
input value=" + priorityStr);
             }
         }
+
+        String workloadGroupNameStr = 
properties.get(WorkloadSchedPolicy.WORKLOAD_GROUP);
+        if (workloadGroupNameStr != null && !workloadGroupNameStr.isEmpty()) {
+            Long wgId = 
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroupIdByName(workloadGroupNameStr);
+            if (wgId == null) {
+                throw new UserException("unknown workload group:" + 
workloadGroupNameStr);
+            }
+            wgIdList.add(wgId);
+        }
     }
 
     public void alterWorkloadSchedPolicy(AlterWorkloadSchedPolicyStmt 
alterStmt) throws UserException {
@@ -422,8 +433,9 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
             }
 
             Map<String, String> properties = alterStmt.getProperties();
-            checkProperties(properties);
-            policy.parseAndSetProperties(properties);
+            List<Long> wgIdList = new ArrayList<>();
+            checkProperties(properties, wgIdList);
+            policy.updateProperty(properties, wgIdList);
             policy.incrementVersion();
             
Env.getCurrentEnv().getEditLog().logAlterWorkloadSchedPolicy(policy);
         } finally {
@@ -530,10 +542,25 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
         return policyProcNode.fetchResult(currentUserIdentity).getRows();
     }
 
+    public boolean checkWhetherGroupHasPolicy(long wgId) {
+        readLock();
+        try {
+            for (Map.Entry<Long, WorkloadSchedPolicy> entry : 
idToPolicy.entrySet()) {
+                if (entry.getValue().getWorkloadGroupIdList().contains(wgId)) {
+                    return true;
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+        return false;
+    }
+
     public class PolicyProcNode {
         public ProcResult fetchResult(UserIdentity currentUserIdentity) {
             BaseProcResult result = new BaseProcResult();
             result.setNames(WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES);
+            Map<Long, String> idToNameMap = 
Env.getCurrentEnv().getWorkloadGroupMgr().getIdToNameMap();
             readLock();
             try {
                 for (WorkloadSchedPolicy policy : idToPolicy.values()) {
@@ -566,6 +593,19 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
                     row.add(String.valueOf(policy.getPriority()));
                     row.add(String.valueOf(policy.isEnabled()));
                     row.add(String.valueOf(policy.getVersion()));
+
+                    List<Long> wgIdList = policy.getWorkloadGroupIdList();
+                    if (wgIdList.size() == 0) {
+                        row.add("");
+                    } else {
+                        Long wgId = wgIdList.get(0);
+                        String wgName = idToNameMap.get(wgId);
+                        if (wgName == null) {
+                            row.add("null");
+                        } else {
+                            row.add(wgName);
+                        }
+                    }
                     result.addRow(row);
                 }
             } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index ac4a8558887..60f3806dc9f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -488,6 +488,7 @@ public class MetadataGenerator {
             trow.addToColumnValue(new 
TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority
             trow.addToColumnValue(new 
TCell().setBoolVal(Boolean.valueOf(policyRow.get(5)))); // enabled
             trow.addToColumnValue(new 
TCell().setIntVal(Integer.valueOf(policyRow.get(6)))); // version
+            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(7))); 
// workload group id
             dataBatch.add(trow);
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
index b4795b21058..0bf2fa7e5d1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
@@ -41,7 +41,8 @@ public class WorkloadSchedPolicyTableValuedFunction extends 
MetadataTableValuedF
             new Column("Action", ScalarType.createType(PrimitiveType.STRING)),
             new Column("Priority", ScalarType.createType(PrimitiveType.INT)),
             new Column("Enabled", 
ScalarType.createType(PrimitiveType.BOOLEAN)),
-            new Column("Version", ScalarType.createType(PrimitiveType.INT)));
+            new Column("Version", ScalarType.createType(PrimitiveType.INT)),
+            new Column("WorkloadGroup", 
ScalarType.createType(PrimitiveType.STRING)));
 
     private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index f42aa41ab75..a8504fd4f15 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -221,6 +221,7 @@ struct TWorkloadSchedPolicy {
     5: optional bool enabled
     6: optional list<TWorkloadCondition> condition_list
     7: optional list<TWorkloadAction> action_list
+    8: optional list<i64> wg_id_list
 }
 
 struct TopicInfo {
diff --git 
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy 
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
index 603bbdf520e..8531b3cf34a 100644
--- 
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
+++ 
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
@@ -52,7 +52,7 @@ suite("test_workload_sched_policy") {
             "'priority'='10' " +
             ");"
 
-    qt_select_policy_tvf "select 
name,condition,action,priority,enabled,version from workload_schedule_policy() 
order by name;"
+    qt_select_policy_tvf "select 
name,condition,action,priority,enabled,version from workload_schedule_policy() 
where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') 
order by name;"
 
     // test_alter
     sql "alter workload schedule policy fe_policy properties('priority'='2', 
'enabled'='false');"
@@ -112,7 +112,7 @@ suite("test_workload_sched_policy") {
     sql "drop workload schedule policy fe_policy;"
     sql "drop workload schedule policy be_policy;"
 
-    qt_select_policy_tvf_after_drop "select 
name,condition,action,priority,enabled,version from workload_schedule_policy() 
order by name;"
+    qt_select_policy_tvf_after_drop "select 
name,condition,action,priority,enabled,version from workload_schedule_policy() 
where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') 
order by name;"
 
     // test workload schedule policy
     sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = 
'500');"
@@ -155,4 +155,77 @@ suite("test_workload_sched_policy") {
     sql "drop workload schedule policy if exists test_set_var_policy;"
     sql "drop workload schedule policy if exists test_set_var_policy2;"
 
+    sql "drop user if exists test_policy_user"
+    sql "drop workload schedule policy if exists test_cancel_query_policy"
+    sql "drop workload schedule policy if exists test_cancel_query_policy2"
+    sql "drop workload schedule policy if exists test_set_session"
+    sql "drop workload group if exists policy_group;"
+    sql "CREATE USER 'test_policy_user'@'%' IDENTIFIED BY '12345';"
+    sql """grant SELECT_PRIV on *.*.* to test_policy_user;"""
+    sql "create workload group if not exists policy_group properties 
('cpu_share'='1024');"
+    sql "create workload group if not exists policy_group2 properties 
('cpu_share'='1024');"
+    sql "GRANT USAGE_PRIV ON WORKLOAD GROUP 'policy_group' TO 
'test_policy_user'@'%';"
+    sql "GRANT USAGE_PRIV ON WORKLOAD GROUP 'policy_group2' TO 
'test_policy_user'@'%';"
+    sql "create workload schedule policy test_cancel_query_policy 
conditions(query_time > 1000) actions(cancel_query) 
properties('workload_group'='policy_group')"
+    sql "create workload schedule policy test_cancel_query_policy2 
conditions(query_time > 0, be_scan_rows>1) actions(cancel_query) 
properties('workload_group'='policy_group')"
+    sql "create workload schedule policy test_set_session 
conditions(username='test_policy_user') actions(set_session_variable 
'parallel_pipeline_task_num=1')"
+
+    test {
+        sql "drop workload group policy_group;"
+        exception "because it has related policy"
+    }
+
+    test {
+        sql "alter workload schedule policy test_cancel_query_policy 
properties('workload_group'='invalid_gorup');"
+        exception "unknown workload group"
+    }
+
+    // daemon thread alter test
+    def thread1 = new Thread({
+        def startTime = System.currentTimeMillis()
+        def curTime = System.currentTimeMillis()
+        def totalTime = 30 * 60 * 1000 // 30min
+
+        connect(user = 'test_policy_user', password = '12345', url = 
context.config.jdbcUrl) {
+            sql "set workload_group=policy_group"
+            boolean flag = false
+            long lastTime = System.currentTimeMillis()
+
+            while (curTime - startTime <= totalTime) {
+                if (curTime - lastTime > 20000) {
+                    if (flag) {
+                        connect(user = 'root', password = '', url = 
context.config.jdbcUrl) {
+                            sql "alter workload schedule policy 
test_cancel_query_policy properties('workload_group'='policy_group2');"
+                            sql "alter workload schedule policy 
test_cancel_query_policy2 properties('workload_group'='policy_group');"
+                        }
+                        flag = false
+                    } else {
+                        connect(user = 'root', password = '', url = 
context.config.jdbcUrl) {
+                            sql "alter workload schedule policy 
test_cancel_query_policy properties('workload_group'='policy_group');"
+                            sql "alter workload schedule policy 
test_cancel_query_policy2 properties('workload_group'='policy_group2');"
+                        }
+                        flag = true
+                    }
+                    lastTime = System.currentTimeMillis()
+                }
+                try {
+                    sql "select k0,k1,k2,k3,k4,k5,k6,count(distinct k13) from 
regression_test_load_p0_insert.baseall group by k0,k1,k2,k3,k4,k5,k6"
+                } catch (Exception e) {
+                    assertTrue(e.getMessage().contains("query canceled by 
workload scheduler"))
+                }
+
+                try {
+                    sql "select count(1) from 
regression_test_load_p0_insert.baseall"
+                } catch (Exception e) {
+                    assertTrue(e.getMessage().contains("query canceled by 
workload scheduler"))
+                }
+
+                Thread.sleep(1000)
+                curTime = System.currentTimeMillis()
+            }
+        }
+    })
+    thread1.setDaemon(true)
+    thread1.start()
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to