This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit fbbaf74b7782aa0182c59392933fdf8c51dbc928
Author: wangbo <[email protected]>
AuthorDate: Mon Feb 26 19:01:35 2024 +0800

    [Improment](executor)Add scanbytes/scanrows condition (#31364)
    
    * Add scanbytes/scanrows condition
    
    * fix reg
---
 be/src/agent/workload_sched_policy_listener.cpp    |   4 +-
 be/src/runtime/fragment_mgr.cpp                    |   7 --
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  19 ++++
 be/src/runtime/runtime_query_statistics_mgr.h      |   8 ++
 .../runtime/workload_management/workload_action.h  |   4 +-
 .../workload_management/workload_condition.h       |   4 +-
 .../workload_sched_policy_mgr.cpp                  |   5 +
 .../main/java/org/apache/doris/common/Config.java  |   3 -
 .../main/java/org/apache/doris/catalog/Env.java    |   6 -
 .../publish/WorkloadActionPublishThread.java       | 123 ---------------------
 .../workloadschedpolicy/WorkloadAction.java        |   2 -
 .../WorkloadActionMoveQueryToGroup.java            |  67 -----------
 .../workloadschedpolicy/WorkloadCondition.java     |   4 +
 ...tion.java => WorkloadConditionBeScanBytes.java} |  32 ++++--
 ...ition.java => WorkloadConditionBeScanRows.java} |  32 ++++--
 .../workloadschedpolicy/WorkloadConditionMeta.java |   4 +
 .../workloadschedpolicy/WorkloadMetricType.java    |   2 +-
 .../workloadschedpolicy/WorkloadSchedPolicy.java   |   4 +-
 .../WorkloadSchedPolicyMgr.java                    |  22 +---
 gensrc/thrift/BackendService.thrift                |  12 +-
 .../test_workload_sched_policy.out                 |   1 -
 .../test_workload_sched_policy.groovy              |  21 +---
 22 files changed, 101 insertions(+), 285 deletions(-)

diff --git a/be/src/agent/workload_sched_policy_listener.cpp 
b/be/src/agent/workload_sched_policy_listener.cpp
index 461fd2cbb0f..689a600367a 100644
--- a/be/src/agent/workload_sched_policy_listener.cpp
+++ b/be/src/agent/workload_sched_policy_listener.cpp
@@ -69,9 +69,7 @@ void WorkloadschedPolicyListener::handle_topic_info(const 
std::vector<TopicInfo>
         policy_map.emplace(tpolicy.id, std::move(policy_ptr));
     }
     size_t new_policy_size = policy_map.size();
-    if (new_policy_size > 0) {
-        
_exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map));
-    }
+    
_exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map));
     LOG(INFO) << "[workload_schedule]finish update workload schedule policy, 
size="
               << new_policy_size;
 }
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f6985538854..e52b71d277b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1506,17 +1506,10 @@ void 
FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag
 void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* 
query_info_list) {
     {
         std::lock_guard<std::mutex> lock(_lock);
-        // todo: use monotonic time
-        VecDateTimeValue now = VecDateTimeValue::local_time();
         for (const auto& q : _query_ctx_map) {
             WorkloadQueryInfo workload_query_info;
             workload_query_info.query_id = print_id(q.first);
             workload_query_info.tquery_id = q.first;
-
-            uint64_t query_time_millisecond = q.second->query_time(now) * 1000;
-            
workload_query_info.metric_map.emplace(WorkloadMetricType::QUERY_TIME,
-                                                   
std::to_string(query_time_millisecond));
-            // todo, add scan rows, scan bytes
             query_info_list->push_back(workload_query_info);
         }
     }
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index a658e527f61..c6f70643f13 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -172,6 +172,25 @@ std::shared_ptr<QueryStatistics> 
RuntimeQueryStatiticsMgr::get_runtime_query_sta
     return qs_ptr;
 }
 
+void RuntimeQueryStatiticsMgr::get_metric_map(
+        std::string query_id, std::map<WorkloadMetricType, std::string>& 
metric_map) {
+    QueryStatistics ret_qs;
+    int64_t query_time_ms = 0;
+    {
+        std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
+        if (_query_statistics_ctx_map.find(query_id) != 
_query_statistics_ctx_map.end()) {
+            for (auto const& qs : 
_query_statistics_ctx_map[query_id]->_qs_list) {
+                ret_qs.merge(*qs);
+            }
+            query_time_ms =
+                    MonotonicMillis() - 
_query_statistics_ctx_map.at(query_id)->_query_start_time;
+        }
+    }
+    metric_map.emplace(WorkloadMetricType::QUERY_TIME, 
std::to_string(query_time_ms));
+    metric_map.emplace(WorkloadMetricType::SCAN_ROWS, 
std::to_string(ret_qs.get_scan_rows()));
+    metric_map.emplace(WorkloadMetricType::SCAN_BYTES, 
std::to_string(ret_qs.get_scan_bytes()));
+}
+
 void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, 
int64_t wg_id) {
     // wg id just need eventual consistency, read lock is ok
     std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h 
b/be/src/runtime/runtime_query_statistics_mgr.h
index 98d4f554728..69b283b6d14 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -21,6 +21,8 @@
 #include <string>
 
 #include "runtime/query_statistics.h"
+#include "runtime/workload_management/workload_condition.h"
+#include "util/time.h"
 
 namespace doris {
 
@@ -29,6 +31,7 @@ public:
     QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) {
         this->_is_query_finished = false;
         this->_wg_id = -1;
+        this->_query_start_time = MonotonicMillis();
     }
     ~QueryStatisticsCtx() = default;
 
@@ -40,6 +43,7 @@ public:
     TNetworkAddress _fe_addr;
     int64_t _query_finish_time;
     int64_t _wg_id;
+    int64_t _query_start_time;
 };
 
 class RuntimeQueryStatiticsMgr {
@@ -58,6 +62,10 @@ public:
 
     void set_workload_group_id(std::string query_id, int64_t wg_id);
 
+    // used for workload scheduler policy
+    void get_metric_map(std::string query_id,
+                        std::map<WorkloadMetricType, std::string>& metric_map);
+
 private:
     std::shared_mutex _qs_ctx_map_lock;
     std::map<std::string, std::unique_ptr<QueryStatisticsCtx>> 
_query_statistics_ctx_map;
diff --git a/be/src/runtime/workload_management/workload_action.h 
b/be/src/runtime/workload_management/workload_action.h
index 29c01320b78..785acc73c3a 100644
--- a/be/src/runtime/workload_management/workload_action.h
+++ b/be/src/runtime/workload_management/workload_action.h
@@ -55,9 +55,7 @@ private:
 class WorkloadActionFactory {
 public:
     static std::unique_ptr<WorkloadAction> 
create_workload_action(TWorkloadAction* action) {
-        if (TWorkloadActionType::type::MOVE_QUERY_TO_GROUP == action->action) {
-            return 
std::make_unique<WorkloadActionMoveQuery>(action->action_args);
-        } else if (TWorkloadActionType::type::CANCEL_QUERY == action->action) {
+        if (TWorkloadActionType::type::CANCEL_QUERY == action->action) {
             return std::make_unique<WorkloadActionCancelQuery>();
         }
         LOG(ERROR) << "not find a action " << action->action;
diff --git a/be/src/runtime/workload_management/workload_condition.h 
b/be/src/runtime/workload_management/workload_condition.h
index 3486742c968..96387a2af41 100644
--- a/be/src/runtime/workload_management/workload_condition.h
+++ b/be/src/runtime/workload_management/workload_condition.h
@@ -84,9 +84,9 @@ public:
         TWorkloadMetricType::type metric_name = t_cond->metric_name;
         if (TWorkloadMetricType::type::QUERY_TIME == metric_name) {
             return std::make_unique<WorkloadConditionQueryTime>(op, str_val);
-        } else if (TWorkloadMetricType::type::SCAN_ROWS == metric_name) {
+        } else if (TWorkloadMetricType::type::BE_SCAN_ROWS == metric_name) {
             return std::make_unique<WorkloadConditionScanRows>(op, str_val);
-        } else if (TWorkloadMetricType::type::SCAN_BYTES == metric_name) {
+        } else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) {
             return std::make_unique<WorkloadConditionScanBytes>(op, str_val);
         }
         LOG(ERROR) << "not find a metric name " << metric_name;
diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp 
b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
index 731dd0c8661..c41a9e723e3 100644
--- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
@@ -18,6 +18,7 @@
 #include "runtime/workload_management/workload_sched_policy_mgr.h"
 
 #include "runtime/fragment_mgr.h"
+#include "runtime/runtime_query_statistics_mgr.h"
 
 namespace doris {
 
@@ -76,6 +77,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
     while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) {
         // 1 get query info
         std::vector<WorkloadQueryInfo> list;
+        //todo(wb) maybe we can get runtime queryinfo from 
RuntimeQueryStatiticsMgr directly
         _exec_env->fragment_mgr()->get_runtime_query_info(&list);
         // todo: add timer
         if (list.size() == 0) {
@@ -84,6 +86,9 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
 
         for (int i = 0; i < list.size(); i++) {
             WorkloadQueryInfo* query_info_ptr = &(list[i]);
+            
_exec_env->runtime_query_statistics_mgr()->get_metric_map(query_info_ptr->query_id,
+                                                                      
query_info_ptr->metric_map);
+
             // 2 get matched policy
             std::map<WorkloadActionType, std::shared_ptr<WorkloadSchedPolicy>> 
matched_policy_map;
             {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 541592f2095..0d89becbb28 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2391,9 +2391,6 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static int workload_sched_policy_interval_ms = 10000; // 10s
 
-    @ConfField(mutable = true)
-    public static int workload_action_interval_ms = 10000; // 10s
-
     @ConfField(mutable = true, masterOnly = true)
     public static int workload_max_policy_num = 25;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 8c65a71dafa..76ff70589b9 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -111,7 +111,6 @@ import org.apache.doris.common.io.CountingDataOutputStream;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.publish.TopicPublisher;
 import org.apache.doris.common.publish.TopicPublisherThread;
-import org.apache.doris.common.publish.WorkloadActionPublishThread;
 import org.apache.doris.common.publish.WorkloadGroupPublisher;
 import org.apache.doris.common.util.Daemon;
 import org.apache.doris.common.util.DynamicPartitionUtil;
@@ -522,8 +521,6 @@ public class Env {
 
     private TopicPublisherThread topicPublisherThread;
 
-    private WorkloadActionPublishThread workloadActionPublisherThread;
-
     private MTMVService mtmvService;
 
     private InsertOverwriteManager insertOverwriteManager;
@@ -761,8 +758,6 @@ public class Env {
         this.queryCancelWorker = new QueryCancelWorker(systemInfo);
         this.topicPublisherThread = new TopicPublisherThread(
                 "TopicPublisher", Config.publish_topic_info_interval_ms, 
systemInfo);
-        this.workloadActionPublisherThread = new 
WorkloadActionPublishThread("WorkloadActionPublisher",
-                Config.workload_action_interval_ms, systemInfo);
         this.mtmvService = new MTMVService();
         this.insertOverwriteManager = new InsertOverwriteManager();
     }
@@ -1042,7 +1037,6 @@ public class Env {
 
         workloadGroupMgr.startUpdateThread();
         workloadSchedPolicyMgr.start();
-        workloadActionPublisherThread.start();
         workloadRuntimeStatusMgr.start();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadActionPublishThread.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadActionPublishThread.java
deleted file mode 100644
index cacd7a9da9f..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadActionPublishThread.java
+++ /dev/null
@@ -1,123 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.publish;
-
-import org.apache.doris.common.ClientPool;
-import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.common.util.Daemon;
-import org.apache.doris.system.Backend;
-import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.BackendService;
-import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TTopicInfoType;
-import org.apache.doris.thrift.TopicInfo;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-public class WorkloadActionPublishThread extends Daemon {
-
-    private ExecutorService executor = ThreadPoolManager
-            .newDaemonFixedThreadPool(4, 256, 
"workload-action-publish-thread", true);
-
-    private static final Logger LOG = 
LogManager.getLogger(WorkloadActionPublishThread.class);
-
-    public static Map<TTopicInfoType, List<TopicInfo>> 
workloadActionToplicInfoMap
-            = new HashMap<TTopicInfoType, List<TopicInfo>>();
-
-    public static synchronized void putWorkloadAction(TTopicInfoType type, 
TopicInfo topicInfo) {
-        List<TopicInfo> list = workloadActionToplicInfoMap.get(type);
-        if (list == null) {
-            list = new ArrayList<TopicInfo>();
-            workloadActionToplicInfoMap.put(type, list);
-        }
-        list.add(topicInfo);
-    }
-
-    public static synchronized Map<TTopicInfoType, List<TopicInfo>> 
getCurrentWorkloadActionMap() {
-        Map<TTopicInfoType, List<TopicInfo>> retMap = 
workloadActionToplicInfoMap;
-        workloadActionToplicInfoMap = new HashMap<TTopicInfoType, 
List<TopicInfo>>();
-        return retMap;
-    }
-
-    private SystemInfoService clusterInfoService;
-
-    public WorkloadActionPublishThread(String name, long intervalMs,
-            SystemInfoService clusterInfoService) {
-        super(name, intervalMs);
-        this.clusterInfoService = clusterInfoService;
-    }
-
-    @Override
-    protected final void runOneCycle() {
-        Map<TTopicInfoType, List<TopicInfo>> actionMap
-                = WorkloadActionPublishThread.getCurrentWorkloadActionMap();
-        if (actionMap.size() == 0) {
-            LOG.info("no workload action found, skip publish");
-            return;
-        }
-        Collection<Backend> currentBeToPublish = 
clusterInfoService.getIdToBackend().values();
-        AckResponseHandler handler = new 
AckResponseHandler(currentBeToPublish);
-        TPublishTopicRequest request = new TPublishTopicRequest();
-        request.setTopicMap(actionMap);
-        for (Backend be : currentBeToPublish) {
-            executor.submit(new WorkloadMoveActionTask(request, be, handler));
-        }
-    }
-
-    public class WorkloadMoveActionTask implements Runnable {
-
-        private TPublishTopicRequest request;
-
-        private Backend be;
-
-        private ResponseHandler handler;
-
-        public WorkloadMoveActionTask(TPublishTopicRequest request, Backend be,
-                ResponseHandler handler) {
-            this.request = request;
-            this.be = be;
-            this.handler = handler;
-        }
-
-        @Override
-        public void run() {
-            long beginTime = System.currentTimeMillis();
-            try {
-                TNetworkAddress addr = new TNetworkAddress(be.getHost(), 
be.getBePort());
-                BackendService.Client client = 
ClientPool.backendPool.borrowObject(addr);
-                client.publishTopicInfo(request);
-                LOG.info("publish move action topic to be {} success, time 
cost={} ms",
-                        be.getHost(), (System.currentTimeMillis() - 
beginTime));
-            } catch (Exception e) {
-                LOG.warn("publish move action topic to be {} error happens: , 
time cost={} ms",
-                        be.getHost(), (System.currentTimeMillis() - 
beginTime), e);
-            } finally {
-                handler.onResponse(be);
-            }
-        }
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java
index d9298781476..661ea6a45fa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java
@@ -30,8 +30,6 @@ public interface WorkloadAction {
             throws UserException {
         if (WorkloadActionType.CANCEL_QUERY.equals(workloadActionMeta.action)) 
{
             return WorkloadActionCancelQuery.createWorkloadAction();
-        } else if 
(WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(workloadActionMeta.action)) {
-            return 
WorkloadActionMoveQueryToGroup.createWorkloadAction(workloadActionMeta.actionArgs);
         } else if 
(WorkloadActionType.SET_SESSION_VARIABLE.equals(workloadActionMeta.action)) {
             return 
WorkloadActionSetSessionVar.createWorkloadAction(workloadActionMeta.actionArgs);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMoveQueryToGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMoveQueryToGroup.java
deleted file mode 100644
index 59e09345fa9..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMoveQueryToGroup.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.resource.workloadschedpolicy;
-
-import org.apache.doris.common.publish.WorkloadActionPublishThread;
-import org.apache.doris.qe.QeProcessorImpl;
-import org.apache.doris.thrift.TTopicInfoType;
-import org.apache.doris.thrift.TWorkloadMoveQueryToGroupAction;
-import org.apache.doris.thrift.TopicInfo;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class WorkloadActionMoveQueryToGroup implements WorkloadAction {
-
-    private static final Logger LOG = 
LogManager.getLogger(WorkloadActionMoveQueryToGroup.class);
-
-    private long dstWgId;
-
-    public WorkloadActionMoveQueryToGroup(long dstWgId) {
-        this.dstWgId = dstWgId;
-    }
-
-    @Override
-    public void exec(WorkloadQueryInfo queryInfo) {
-        if (queryInfo.context != null && !queryInfo.context.isKilled()
-                && queryInfo.tUniqueId != null
-                && 
QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) {
-            LOG.info("try move query {} to group {}", queryInfo.queryId, 
dstWgId);
-
-            TWorkloadMoveQueryToGroupAction moveQueryToGroupAction = new 
TWorkloadMoveQueryToGroupAction();
-            moveQueryToGroupAction.setQueryId(queryInfo.tUniqueId);
-            moveQueryToGroupAction.setWorkloadGroupId(dstWgId);
-
-            TopicInfo topicInfo = new TopicInfo();
-            topicInfo.setMoveAction(moveQueryToGroupAction);
-
-            
WorkloadActionPublishThread.putWorkloadAction(TTopicInfoType.MOVE_QUERY_TO_GROUP,
 topicInfo);
-        }
-    }
-
-    @Override
-    public WorkloadActionType getWorkloadActionType() {
-        return WorkloadActionType.MOVE_QUERY_TO_GROUP;
-    }
-
-    public static WorkloadActionMoveQueryToGroup createWorkloadAction(String 
groupId) {
-        long wgId = Long.parseLong(groupId);
-        return new WorkloadActionMoveQueryToGroup(wgId);
-    }
-
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
index 1f75d81794f..5d89d2afae9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
@@ -33,6 +33,10 @@ public interface WorkloadCondition {
             return WorkloadConditionUsername.createWorkloadCondition(cm.op, 
cm.value);
         } else if (WorkloadMetricType.QUERY_TIME.equals(cm.metricName)) {
             return WorkloadConditionQueryTime.createWorkloadCondition(cm.op, 
cm.value);
+        } else if (WorkloadMetricType.BE_SCAN_ROWS.equals(cm.metricName)) {
+            return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, 
cm.value);
+        } else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) {
+            return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, 
cm.value);
         }
         throw new UserException("invalid metric name:" + cm.metricName);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
similarity index 51%
copy from 
fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
index 1f75d81794f..7431f2e0c4f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
@@ -19,22 +19,34 @@ package org.apache.doris.resource.workloadschedpolicy;
 
 import org.apache.doris.common.UserException;
 
+public class WorkloadConditionBeScanBytes implements WorkloadCondition {
 
-public interface WorkloadCondition {
+    private long value;
 
-    boolean eval(String strValue);
+    private WorkloadConditionOperator op;
 
-    WorkloadMetricType getMetricType();
+    public WorkloadConditionBeScanBytes(WorkloadConditionOperator op, long 
value) {
+        this.op = op;
+        this.value = value;
+    }
+
+    @Override
+    public boolean eval(String strValue) {
+        // currently not support run in fe, so this condition never match
+        return false;
+    }
 
-    // NOTE(wb) currently createPolicyCondition is also used when replay meta, 
it better not contains heavy check
-    static WorkloadCondition createWorkloadCondition(WorkloadConditionMeta cm)
+    public static WorkloadConditionBeScanBytes 
createWorkloadCondition(WorkloadConditionOperator op, String value)
             throws UserException {
-        if (WorkloadMetricType.USERNAME.equals(cm.metricName)) {
-            return WorkloadConditionUsername.createWorkloadCondition(cm.op, 
cm.value);
-        } else if (WorkloadMetricType.QUERY_TIME.equals(cm.metricName)) {
-            return WorkloadConditionQueryTime.createWorkloadCondition(cm.op, 
cm.value);
+        long longValue = Long.parseLong(value);
+        if (longValue < 0) {
+            throw new UserException("invalid scan bytes value, " + longValue + 
", it requires >= 0");
         }
-        throw new UserException("invalid metric name:" + cm.metricName);
+        return new WorkloadConditionBeScanBytes(op, longValue);
     }
 
+    @Override
+    public WorkloadMetricType getMetricType() {
+        return WorkloadMetricType.BE_SCAN_BYTES;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
similarity index 51%
copy from 
fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
index 1f75d81794f..c2fb638e082 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
@@ -19,22 +19,34 @@ package org.apache.doris.resource.workloadschedpolicy;
 
 import org.apache.doris.common.UserException;
 
+public class WorkloadConditionBeScanRows implements WorkloadCondition {
 
-public interface WorkloadCondition {
+    private long value;
 
-    boolean eval(String strValue);
+    private WorkloadConditionOperator op;
 
-    WorkloadMetricType getMetricType();
+    public WorkloadConditionBeScanRows(WorkloadConditionOperator op, long 
value) {
+        this.op = op;
+        this.value = value;
+    }
+
+    @Override
+    public boolean eval(String strValue) {
+        // currently not support run in fe, so this condition never match
+        return false;
+    }
 
-    // NOTE(wb) currently createPolicyCondition is also used when replay meta, 
it better not contains heavy check
-    static WorkloadCondition createWorkloadCondition(WorkloadConditionMeta cm)
+    public static WorkloadConditionBeScanRows 
createWorkloadCondition(WorkloadConditionOperator op, String value)
             throws UserException {
-        if (WorkloadMetricType.USERNAME.equals(cm.metricName)) {
-            return WorkloadConditionUsername.createWorkloadCondition(cm.op, 
cm.value);
-        } else if (WorkloadMetricType.QUERY_TIME.equals(cm.metricName)) {
-            return WorkloadConditionQueryTime.createWorkloadCondition(cm.op, 
cm.value);
+        long longValue = Long.parseLong(value);
+        if (longValue < 0) {
+            throw new UserException("invalid scan rows value, " + longValue + 
", it requires >= 0");
         }
-        throw new UserException("invalid metric name:" + cm.metricName);
+        return new WorkloadConditionBeScanRows(op, longValue);
     }
 
+    @Override
+    public WorkloadMetricType getMetricType() {
+        return WorkloadMetricType.BE_SCAN_ROWS;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
index d5d2f922f3f..52f50f924fc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
@@ -44,6 +44,10 @@ public class WorkloadConditionMeta {
             return WorkloadMetricType.USERNAME;
         } else if 
(WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) {
             return WorkloadMetricType.QUERY_TIME;
+        } else if 
(WorkloadMetricType.BE_SCAN_ROWS.toString().equalsIgnoreCase(metricStr)) {
+            return WorkloadMetricType.BE_SCAN_ROWS;
+        } else if 
(WorkloadMetricType.BE_SCAN_BYTES.toString().equalsIgnoreCase(metricStr)) {
+            return WorkloadMetricType.BE_SCAN_BYTES;
         }
         throw new UserException("invalid metric name:" + metricStr);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
index f81d75c675f..ed17414ec45 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
@@ -18,5 +18,5 @@
 package org.apache.doris.resource.workloadschedpolicy;
 
 public enum WorkloadMetricType {
-    USERNAME, QUERY_TIME, SCAN_ROWS, SCAN_BYTES
+    USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
index 13637b5b8be..8b028e3e75f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
@@ -52,7 +52,9 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
     // used for convert fe type to thrift type
     private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> 
METRIC_MAP
             = new ImmutableMap.Builder<WorkloadMetricType, 
TWorkloadMetricType>()
-            .put(WorkloadMetricType.QUERY_TIME, 
TWorkloadMetricType.QUERY_TIME).build();
+            .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
+            .put(WorkloadMetricType.BE_SCAN_ROWS, 
TWorkloadMetricType.BE_SCAN_ROWS)
+            .put(WorkloadMetricType.BE_SCAN_BYTES, 
TWorkloadMetricType.BE_SCAN_BYTES).build();
     private static ImmutableMap<WorkloadActionType, TWorkloadActionType> 
ACTION_MAP
             = new ImmutableMap.Builder<WorkloadActionType, 
TWorkloadActionType>()
             .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, 
TWorkloadActionType.MOVE_QUERY_TO_GROUP)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 0074020735e..5c35d1ee095 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -86,9 +86,8 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
             .add(WorkloadActionType.CANCEL_QUERY).build();
 
     public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET
-            = new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.SCAN_ROWS)
-            .add(WorkloadMetricType.SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME)
-            .build();
+            = new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.BE_SCAN_ROWS)
+            .add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME).build();
 
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
@@ -122,15 +121,12 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
                         }
 
                         String username = cctx.getQualifiedUser();
-                        long queryTime = System.currentTimeMillis() - cctx.getStartTime();
-
                         WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo();
                         policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId());
                         policyQueryInfo.tUniqueId = cctx.queryId();
                         policyQueryInfo.context = cctx;
                         policyQueryInfo.metricMap = new HashMap<>();
                         policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username);
-                        policyQueryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, String.valueOf(queryTime));
 
                         queryInfoList.add(policyQueryInfo);
                     }
@@ -174,19 +170,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
         List<WorkloadActionMeta> originActions = createStmt.getActions();
         List<WorkloadAction> policyActionList = new ArrayList<>();
         for (WorkloadActionMeta workloadActionMeta : originActions) {
-            WorkloadActionType actionName = workloadActionMeta.action;
-            String actionArgs = workloadActionMeta.actionArgs;
-
-            // we need convert wgName to wgId, because wgName may change
-            if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(actionName)) {
-                Long wgId = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroupIdByName(actionArgs);
-                if (wgId == null) {
-                    throw new UserException(
-                            "can not find workload group " + actionArgs + " when set workload sched policy");
-                }
-                workloadActionMeta.actionArgs = wgId.toString();
-            }
-
+            // todo(wb) support move action
             WorkloadAction ret = WorkloadAction.createWorkloadAction(workloadActionMeta);
             policyActionList.add(ret);
         }
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index c59abd65d3b..24edaefc103 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -179,15 +179,10 @@ struct TWorkloadGroupInfo {
   9: optional i32 scan_thread_num
 }
 
-struct TWorkloadMoveQueryToGroupAction {
-    1: optional Types.TUniqueId query_id
-    2: optional i64 workload_group_id
-}
-
 enum TWorkloadMetricType {
     QUERY_TIME
-    SCAN_ROWS
-    SCAN_BYTES
+    BE_SCAN_ROWS
+    BE_SCAN_BYTES
 }
 
 enum TCompareOperator {
@@ -226,8 +221,7 @@ struct TWorkloadSchedPolicy {
 
 struct TopicInfo {
     1: optional TWorkloadGroupInfo workload_group_info
-    2: optional TWorkloadMoveQueryToGroupAction move_action
-    3: optional TWorkloadSchedPolicy workload_sched_policy
+    2: optional TWorkloadSchedPolicy workload_sched_policy
 }
 
 struct TPublishTopicRequest {
diff --git a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
index 0c86700866d..d32fff321e4 100644
--- a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
+++ b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
@@ -2,7 +2,6 @@
 -- !select_policy_tvf --
 be_policy      query_time > 10 cancel_query    10      false   0
 fe_policy      username = root set_session_variable "workload_group=normal"    10      false   0
-move_action_policy     query_time > 10 move_query_to_group "normal"    0       true    0
 set_action_policy      username = root set_session_variable "workload_group=normal"    0       true    0
 test_cancel_policy     query_time > 10 cancel_query    0       false   0
 
diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
index cabba8ee008..603bbdf520e 100644
--- a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
+++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
@@ -20,7 +20,6 @@ suite("test_workload_sched_policy") {
     sql "set experimental_enable_nereids_planner = false;"
 
     sql "drop workload schedule policy if exists test_cancel_policy;"
-    sql "drop workload schedule policy if exists move_action_policy;"
     sql "drop workload schedule policy if exists set_action_policy;"
     sql "drop workload schedule policy if exists fe_policy;"
     sql "drop workload schedule policy if exists be_policy;"
@@ -30,17 +29,12 @@ suite("test_workload_sched_policy") {
             " conditions(query_time > 10) " +
             " actions(cancel_query) properties('enabled'='false'); "
 
-    // 2 create cancel policy
-    sql "create workload schedule policy move_action_policy " +
-            "conditions(query_time > 10) " +
-            "actions(move_query_to_group 'normal');"
-
-    // 3 create set policy
+    // 2 create set policy
     sql "create workload schedule policy set_action_policy " +
             "conditions(username='root') " +
             "actions(set_session_variable 'workload_group=normal');"
 
-    // 4 create policy run in fe
+    // 3 create policy run in fe
     sql "create workload schedule policy fe_policy " +
             "conditions(username='root') " +
             "actions(set_session_variable 'workload_group=normal') " +
@@ -49,7 +43,7 @@ suite("test_workload_sched_policy") {
             "'priority'='10' " +
             ");"
 
-    // 5 create policy run in be
+    // 4 create policy run in be
     sql "create workload schedule policy be_policy " +
             "conditions(query_time > 10) " +
             "actions(cancel_query) " +
@@ -96,14 +90,6 @@ suite("test_workload_sched_policy") {
         exception "priority can only between"
     }
 
-    test {
-        sql "create workload schedule policy conflict_policy " +
-                "conditions (query_time > 0)" +
-                "actions(cancel_query, move_query_to_group 'normal');"
-
-        exception "can not exist in one policy at same time"
-    }
-
     test {
         sql "create workload schedule policy conflict_policy " +
                 "conditions (query_time > 0) " +
@@ -122,7 +108,6 @@ suite("test_workload_sched_policy") {
 
     // drop
     sql "drop workload schedule policy test_cancel_policy;"
-    sql "drop workload schedule policy move_action_policy;"
     sql "drop workload schedule policy set_action_policy;"
     sql "drop workload schedule policy fe_policy;"
     sql "drop workload schedule policy be_policy;"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to