This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 055624ee6b3c1c6f96c687e0d54e1ba846118b63 Author: wangbo <[email protected]> AuthorDate: Wed Apr 17 21:22:36 2024 +0800 Add workload group id in workload policy's property (#33483) --- be/src/agent/workload_sched_policy_listener.cpp | 8 ++- be/src/runtime/fragment_mgr.cpp | 2 + .../workload_management/workload_query_info.h | 1 + .../workload_management/workload_sched_policy.cpp | 11 +++- .../workload_management/workload_sched_policy.h | 2 + .../workload_sched_policy_mgr.cpp | 1 - .../resource/workloadgroup/WorkloadGroupMgr.java | 24 +++++++ .../workloadschedpolicy/WorkloadSchedPolicy.java | 41 +++++++++--- .../WorkloadSchedPolicyMgr.java | 50 ++++++++++++-- .../doris/tablefunction/MetadataGenerator.java | 1 + .../WorkloadSchedPolicyTableValuedFunction.java | 3 +- gensrc/thrift/BackendService.thrift | 1 + .../test_workload_sched_policy.groovy | 77 +++++++++++++++++++++- 13 files changed, 202 insertions(+), 20 deletions(-) diff --git a/be/src/agent/workload_sched_policy_listener.cpp b/be/src/agent/workload_sched_policy_listener.cpp index 689a600367a..fc7c8ba8c18 100644 --- a/be/src/agent/workload_sched_policy_listener.cpp +++ b/be/src/agent/workload_sched_policy_listener.cpp @@ -63,9 +63,15 @@ void WorkloadschedPolicyListener::handle_topic_info(const std::vector<TopicInfo> continue; } + std::set<int64_t> wg_id_set; + for (int64_t wg_id : tpolicy.wg_id_list) { + wg_id_set.insert(wg_id); + } + std::shared_ptr<WorkloadSchedPolicy> policy_ptr = std::make_shared<WorkloadSchedPolicy>(); policy_ptr->init(tpolicy.id, tpolicy.name, tpolicy.version, tpolicy.enabled, - tpolicy.priority, std::move(cond_ptr_list), std::move(action_ptr_list)); + tpolicy.priority, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); policy_map.emplace(tpolicy.id, std::move(policy_ptr)); } size_t new_policy_size = policy_map.size(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 3cde0778f62..3a1d722e805 100644 --- 
a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1566,6 +1566,8 @@ void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_i WorkloadQueryInfo workload_query_info; workload_query_info.query_id = print_id(q.first); workload_query_info.tquery_id = q.first; + workload_query_info.wg_id = + q.second->workload_group() == nullptr ? -1 : q.second->workload_group()->id(); query_info_list->push_back(workload_query_info); } } diff --git a/be/src/runtime/workload_management/workload_query_info.h b/be/src/runtime/workload_management/workload_query_info.h index 9c24e9dee8a..f2da31b6196 100644 --- a/be/src/runtime/workload_management/workload_query_info.h +++ b/be/src/runtime/workload_management/workload_query_info.h @@ -28,6 +28,7 @@ public: std::map<WorkloadMetricType, std::string> metric_map; TUniqueId tquery_id; std::string query_id; + int64_t wg_id; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp b/be/src/runtime/workload_management/workload_sched_policy.cpp index bc543a7d770..b97eb85c068 100644 --- a/be/src/runtime/workload_management/workload_sched_policy.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy.cpp @@ -20,7 +20,7 @@ namespace doris { void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool enabled, - int priority, + int priority, std::set<int64_t> wg_id_set, std::vector<std::unique_ptr<WorkloadCondition>> condition_list, std::vector<std::unique_ptr<WorkloadAction>> action_list) { _id = id; @@ -30,6 +30,7 @@ void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool e _priority = priority; _condition_list = std::move(condition_list); _action_list = std::move(action_list); + _wg_id_set = wg_id_set; _first_action_type = _action_list[0]->get_action_type(); if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP && @@ -50,6 +51,14 @@ bool 
WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) { if (!_enabled) { return false; } + + // 1 when policy has no group (_wg_id_set.size() == 0), it should match all queries + // 2 when policy has a group, it can only match queries which have the same group + if (_wg_id_set.size() > 0 && (query_info_ptr->wg_id <= 0 || + _wg_id_set.find(query_info_ptr->wg_id) == _wg_id_set.end())) { + return false; + } + auto& metric_val_map = query_info_ptr->metric_map; for (auto& cond : _condition_list) { if (metric_val_map.find(cond->get_workload_metric_type()) == metric_val_map.end()) { diff --git a/be/src/runtime/workload_management/workload_sched_policy.h b/be/src/runtime/workload_management/workload_sched_policy.h index 82f42b9a0b4..6554634d9af 100644 --- a/be/src/runtime/workload_management/workload_sched_policy.h +++ b/be/src/runtime/workload_management/workload_sched_policy.h @@ -30,6 +30,7 @@ public: ~WorkloadSchedPolicy() = default; void init(int64_t id, std::string name, int version, bool enabled, int priority, + std::set<int64_t> wg_id_set, std::vector<std::unique_ptr<WorkloadCondition>> condition_list, std::vector<std::unique_ptr<WorkloadAction>> action_list); @@ -50,6 +51,7 @@ private: int _version; bool _enabled; int _priority; + std::set<int64_t> _wg_id_set; std::vector<std::unique_ptr<WorkloadCondition>> _condition_list; std::vector<std::unique_ptr<WorkloadAction>> _action_list; diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp index c41a9e723e3..4690ed1d4f2 100644 --- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp @@ -77,7 +77,6 @@ void WorkloadSchedPolicyMgr::_schedule_workload() { while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) { // 1 get query info std::vector<WorkloadQueryInfo> list; - //todo(wb) maybe we can get runtime queryinfo from 
RuntimeQueryStatiticsMgr directly _exec_env->fragment_mgr()->get_runtime_query_info(&list); // todo: add timer if (list.size() == 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 7796a385eee..bd2aaf932c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -401,6 +401,17 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { throw new DdlException("workload group " + workloadGroupName + " is set for user " + ret.second); } + // A group with related policies should not be deleted. + Long wgId = getWorkloadGroupIdByName(workloadGroupName); + if (wgId != null) { + boolean groupHasPolicy = Env.getCurrentEnv().getWorkloadSchedPolicyMgr() + .checkWhetherGroupHasPolicy(wgId.longValue()); + if (groupHasPolicy) { + throw new DdlException( + "workload group " + workloadGroupName + " can't be dropped, because it has related policy"); + } + } + writeLock(); try { if (!nameToWorkloadGroup.containsKey(workloadGroupName)) { @@ -493,6 +504,19 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { } } + public Map<Long, String> getIdToNameMap() { + Map<Long, String> ret = Maps.newHashMap(); + readLock(); + try { + for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) { + ret.put(entry.getKey(), entry.getValue().getName()); + } + return ret; + } finally { + readUnlock(); + } + } + public String getWorkloadGroupNameById(Long id) { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java index 8b028e3e75f..2f8706c574b 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java @@ -46,8 +46,10 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { public static final String ENABLED = "enabled"; public static final String PRIORITY = "priority"; + public static final String WORKLOAD_GROUP = "workload_group"; + public static final ImmutableSet<String> POLICY_PROPERTIES = new ImmutableSet.Builder<String>() - .add(ENABLED).add(PRIORITY).build(); + .add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build(); // used for convert fe type to thrift type private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP @@ -80,6 +82,9 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { @SerializedName(value = "priority") private volatile int priority; + @SerializedName(value = "wgIdList") + private List<Long> workloadGroupIdList = new ArrayList<>(); + @SerializedName(value = "conditionMetaList") List<WorkloadConditionMeta> conditionMetaList; // we regard action as a command, map's key is command, map's value is args, so it's a command list @@ -101,13 +106,18 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { } public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition> workloadConditionList, - List<WorkloadAction> workloadActionList, Map<String, String> properties) throws UserException { + List<WorkloadAction> workloadActionList, Map<String, String> properties, List<Long> wgIdList) { this.id = id; this.name = name; this.workloadConditionList = workloadConditionList; this.workloadActionList = workloadActionList; - // set enable and priority - parseAndSetProperties(properties); + + String enabledStr = properties.get(ENABLED); + this.enabled = enabledStr == null ? 
true : Boolean.parseBoolean(enabledStr); + + String priorityStr = properties.get(PRIORITY); + this.priority = priorityStr == null ? 0 : Integer.parseInt(priorityStr); + this.workloadGroupIdList = wgIdList; this.version = 0; } @@ -162,12 +172,20 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { return retType; } - public void parseAndSetProperties(Map<String, String> properties) throws UserException { - String enabledStr = properties.get(ENABLED); - this.enabled = enabledStr == null ? true : Boolean.parseBoolean(enabledStr); + public void updateProperty(Map<String, String> property, List<Long> wgIdList) { + String enabledStr = property.get(ENABLED); + if (enabledStr != null) { + this.enabled = Boolean.parseBoolean(enabledStr); + } - String priorityStr = properties.get(PRIORITY); - this.priority = priorityStr == null ? 0 : Integer.parseInt(priorityStr); + String priorityStr = property.get(PRIORITY); + if (priorityStr != null) { + this.priority = Integer.parseInt(priorityStr); + } + + if (wgIdList.size() > 0) { + this.workloadGroupIdList = wgIdList; + } } void incrementVersion() { @@ -194,6 +212,10 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { return version; } + public List<Long> getWorkloadGroupIdList() { + return this.workloadGroupIdList; + } + public List<WorkloadConditionMeta> getConditionMetaList() { return conditionMetaList; } @@ -224,6 +246,7 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { tPolicy.setVersion(version); tPolicy.setPriority(priority); tPolicy.setEnabled(enabled); + tPolicy.setWgIdList(workloadGroupIdList); List<TWorkloadCondition> condList = new ArrayList(); for (WorkloadConditionMeta cond : conditionMetaList) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 5c35d1ee095..9e6eb33ffbf 
100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -72,6 +72,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { public static final ImmutableList<String> WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES = new ImmutableList.Builder<String>() .add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version") + .add("WorkloadGroup") .build(); public static final ImmutableSet<WorkloadActionType> FE_ACTION_SET @@ -185,8 +186,9 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { if (propMap == null) { propMap = new HashMap<>(); } + List<Long> wgIdList = new ArrayList<>(); if (propMap.size() != 0) { - checkProperties(propMap); + checkProperties(propMap, wgIdList); } writeLock(); try { @@ -203,7 +205,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { } long id = Env.getCurrentEnv().getNextId(); WorkloadSchedPolicy policy = new WorkloadSchedPolicy(id, policyName, - policyConditionList, policyActionList, propMap); + policyConditionList, policyActionList, propMap, wgIdList); policy.setConditionMeta(originConditions); policy.setActionMeta(originActions); Env.getCurrentEnv().getEditLog().logCreateWorkloadSchedPolicy(policy); @@ -382,7 +384,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { return ret; } - private void checkProperties(Map<String, String> properties) throws UserException { + private void checkProperties(Map<String, String> properties, List<Long> wgIdList) throws UserException { Set<String> allInputPropKeySet = new HashSet<>(); allInputPropKeySet.addAll(properties.keySet()); @@ -410,6 +412,15 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { "invalid priority property value, it must be a number, input value=" + 
priorityStr); } } + + String workloadGroupNameStr = properties.get(WorkloadSchedPolicy.WORKLOAD_GROUP); + if (workloadGroupNameStr != null && !workloadGroupNameStr.isEmpty()) { + Long wgId = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroupIdByName(workloadGroupNameStr); + if (wgId == null) { + throw new UserException("unknown workload group:" + workloadGroupNameStr); + } + wgIdList.add(wgId); + } } public void alterWorkloadSchedPolicy(AlterWorkloadSchedPolicyStmt alterStmt) throws UserException { @@ -422,8 +433,9 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { } Map<String, String> properties = alterStmt.getProperties(); - checkProperties(properties); - policy.parseAndSetProperties(properties); + List<Long> wgIdList = new ArrayList<>(); + checkProperties(properties, wgIdList); + policy.updateProperty(properties, wgIdList); policy.incrementVersion(); Env.getCurrentEnv().getEditLog().logAlterWorkloadSchedPolicy(policy); } finally { @@ -530,10 +542,25 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { return policyProcNode.fetchResult(currentUserIdentity).getRows(); } + public boolean checkWhetherGroupHasPolicy(long wgId) { + readLock(); + try { + for (Map.Entry<Long, WorkloadSchedPolicy> entry : idToPolicy.entrySet()) { + if (entry.getValue().getWorkloadGroupIdList().contains(wgId)) { + return true; + } + } + } finally { + readUnlock(); + } + return false; + } + public class PolicyProcNode { public ProcResult fetchResult(UserIdentity currentUserIdentity) { BaseProcResult result = new BaseProcResult(); result.setNames(WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES); + Map<Long, String> idToNameMap = Env.getCurrentEnv().getWorkloadGroupMgr().getIdToNameMap(); readLock(); try { for (WorkloadSchedPolicy policy : idToPolicy.values()) { @@ -566,6 +593,19 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { row.add(String.valueOf(policy.getPriority())); 
row.add(String.valueOf(policy.isEnabled())); row.add(String.valueOf(policy.getVersion())); + + List<Long> wgIdList = policy.getWorkloadGroupIdList(); + if (wgIdList.size() == 0) { + row.add(""); + } else { + Long wgId = wgIdList.get(0); + String wgName = idToNameMap.get(wgId); + if (wgName == null) { + row.add("null"); + } else { + row.add(wgName); + } + } result.addRow(row); } } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index ac4a8558887..60f3806dc9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -488,6 +488,7 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority trow.addToColumnValue(new TCell().setBoolVal(Boolean.valueOf(policyRow.get(5)))); // enabled trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(6)))); // version + trow.addToColumnValue(new TCell().setStringVal(policyRow.get(7))); // workload group id dataBatch.add(trow); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java index b4795b21058..0bf2fa7e5d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java @@ -41,7 +41,8 @@ public class WorkloadSchedPolicyTableValuedFunction extends MetadataTableValuedF new Column("Action", ScalarType.createType(PrimitiveType.STRING)), new Column("Priority", ScalarType.createType(PrimitiveType.INT)), new Column("Enabled", ScalarType.createType(PrimitiveType.BOOLEAN)), - new 
Column("Version", ScalarType.createType(PrimitiveType.INT))); + new Column("Version", ScalarType.createType(PrimitiveType.INT)), + new Column("WorkloadGroup", ScalarType.createType(PrimitiveType.STRING))); private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX; diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index f42aa41ab75..a8504fd4f15 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -221,6 +221,7 @@ struct TWorkloadSchedPolicy { 5: optional bool enabled 6: optional list<TWorkloadCondition> condition_list 7: optional list<TWorkloadAction> action_list + 8: optional list<i64> wg_id_list } struct TopicInfo { diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy index 603bbdf520e..8531b3cf34a 100644 --- a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy +++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy @@ -52,7 +52,7 @@ suite("test_workload_sched_policy") { "'priority'='10' " + ");" - qt_select_policy_tvf "select name,condition,action,priority,enabled,version from workload_schedule_policy() order by name;" + qt_select_policy_tvf "select name,condition,action,priority,enabled,version from workload_schedule_policy() where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by name;" // test_alter sql "alter workload schedule policy fe_policy properties('priority'='2', 'enabled'='false');" @@ -112,7 +112,7 @@ suite("test_workload_sched_policy") { sql "drop workload schedule policy fe_policy;" sql "drop workload schedule policy be_policy;" - qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from workload_schedule_policy() order by name;" + qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from 
workload_schedule_policy() where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by name;" // test workload schedule policy sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '500');" @@ -155,4 +155,77 @@ suite("test_workload_sched_policy") { sql "drop workload schedule policy if exists test_set_var_policy;" sql "drop workload schedule policy if exists test_set_var_policy2;" + sql "drop user if exists test_policy_user" + sql "drop workload schedule policy if exists test_cancel_query_policy" + sql "drop workload schedule policy if exists test_cancel_query_policy2" + sql "drop workload schedule policy if exists test_set_session" + sql "drop workload group if exists policy_group;" + sql "CREATE USER 'test_policy_user'@'%' IDENTIFIED BY '12345';" + sql """grant SELECT_PRIV on *.*.* to test_policy_user;""" + sql "create workload group if not exists policy_group properties ('cpu_share'='1024');" + sql "create workload group if not exists policy_group2 properties ('cpu_share'='1024');" + sql "GRANT USAGE_PRIV ON WORKLOAD GROUP 'policy_group' TO 'test_policy_user'@'%';" + sql "GRANT USAGE_PRIV ON WORKLOAD GROUP 'policy_group2' TO 'test_policy_user'@'%';" + sql "create workload schedule policy test_cancel_query_policy conditions(query_time > 1000) actions(cancel_query) properties('workload_group'='policy_group')" + sql "create workload schedule policy test_cancel_query_policy2 conditions(query_time > 0, be_scan_rows>1) actions(cancel_query) properties('workload_group'='policy_group')" + sql "create workload schedule policy test_set_session conditions(username='test_policy_user') actions(set_session_variable 'parallel_pipeline_task_num=1')" + + test { + sql "drop workload group policy_group;" + exception "because it has related policy" + } + + test { + sql "alter workload schedule policy test_cancel_query_policy properties('workload_group'='invalid_gorup');" + exception "unknown workload group" + } + + // daemon thread alter 
test + def thread1 = new Thread({ + def startTime = System.currentTimeMillis() + def curTime = System.currentTimeMillis() + def totalTime = 30 * 60 * 1000 // 30min + + connect(user = 'test_policy_user', password = '12345', url = context.config.jdbcUrl) { + sql "set workload_group=policy_group" + boolean flag = false + long lastTime = System.currentTimeMillis() + + while (curTime - startTime <= totalTime) { + if (curTime - lastTime > 20000) { + if (flag) { + connect(user = 'root', password = '', url = context.config.jdbcUrl) { + sql "alter workload schedule policy test_cancel_query_policy properties('workload_group'='policy_group2');" + sql "alter workload schedule policy test_cancel_query_policy2 properties('workload_group'='policy_group');" + } + flag = false + } else { + connect(user = 'root', password = '', url = context.config.jdbcUrl) { + sql "alter workload schedule policy test_cancel_query_policy properties('workload_group'='policy_group');" + sql "alter workload schedule policy test_cancel_query_policy2 properties('workload_group'='policy_group2');" + } + flag = true + } + lastTime = System.currentTimeMillis() + } + try { + sql "select k0,k1,k2,k3,k4,k5,k6,count(distinct k13) from regression_test_load_p0_insert.baseall group by k0,k1,k2,k3,k4,k5,k6" + } catch (Exception e) { + assertTrue(e.getMessage().contains("query canceled by workload scheduler")) + } + + try { + sql "select count(1) from regression_test_load_p0_insert.baseall" + } catch (Exception e) { + assertTrue(e.getMessage().contains("query canceled by workload scheduler")) + } + + Thread.sleep(1000) + curTime = System.currentTimeMillis() + } + } + }) + thread1.setDaemon(true) + thread1.start() + } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
