This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new a47153b982d branch-4.1: [enhancement](workload policy) Add username-based backend workload policy support #60559 (#64493)
a47153b982d is described below

commit a47153b982d348acaeff3604361794ad3b31463c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 16 13:29:14 2026 +0800

    branch-4.1: [enhancement](workload policy) Add username-based backend workload policy support #60559 (#64493)
    
    Cherry-picked from #60559
    
    Co-authored-by: Wen Zhenghu <[email protected]>
---
 .../workload_management/query_task_controller.cpp  |  13 ++
 .../workload_management/query_task_controller.h    |   2 +
 .../runtime/workload_management/task_controller.h  |   2 +
 .../workload_management/workload_condition.cpp     |  11 ++
 .../workload_management/workload_condition.h       |  19 ++-
 .../workload_management/workload_sched_policy.cpp  |  10 +-
 .../query_task_controller_test.cpp                 |  64 ++++++++
 .../workload_condition_test.cpp                    |  46 ++++++
 be/test/runtime/workload_sched_policy_test.cpp     | 157 +++++++++++++++++++
 .../nereids/load/NereidsStreamLoadPlanner.java     |   8 +
 .../main/java/org/apache/doris/qe/Coordinator.java |   8 +
 .../doris/qe/runtime/ThriftPlansBuilder.java       |   8 +
 .../WorkloadConditionUsername.java                 |  12 +-
 .../WorkloadSchedPolicyMgr.java                    |  40 +++--
 .../WorkloadSchedPolicyMgrTest.java                | 168 +++++++++++++++++++--
 gensrc/thrift/BackendService.thrift                |   3 +-
 .../test_workload_sched_policy.groovy              |  42 +++++-
 17 files changed, 579 insertions(+), 34 deletions(-)

diff --git a/be/src/runtime/workload_management/query_task_controller.cpp b/be/src/runtime/workload_management/query_task_controller.cpp
index 16e950fa56c..de6b3e8a24f 100644
--- a/be/src/runtime/workload_management/query_task_controller.cpp
+++ b/be/src/runtime/workload_management/query_task_controller.cpp
@@ -227,6 +227,19 @@ std::vector<PipelineTask*> QueryTaskController::get_revocable_tasks() {
     return tasks;
 }
 
+bool QueryTaskController::get_user(std::string* user) {
+    auto query_ctx = query_ctx_.lock();
+    if (query_ctx == nullptr) {
+        return false;
+    }
+    // Only expose user metadata when it is explicitly attached to the query context.
+    if (query_ctx->set_rsc_info) {
+        *user = query_ctx->user;
+        return true;
+    }
+    return false;
+}
+
 void QueryTaskController::add_total_task_num(int delta) {
     _total_task_num.fetch_add(delta, std::memory_order_relaxed);
 }
diff --git a/be/src/runtime/workload_management/query_task_controller.h 
b/be/src/runtime/workload_management/query_task_controller.h
index 0d46196c150..854a2c05cb6 100644
--- a/be/src/runtime/workload_management/query_task_controller.h
+++ b/be/src/runtime/workload_management/query_task_controller.h
@@ -48,6 +48,8 @@ public:
     size_t get_revocable_size() override;
     Status revoke_memory() override;
     std::vector<PipelineTask*> get_revocable_tasks() override;
+    // Distinguish missing user metadata from an empty username.
+    bool get_user(std::string* user) override;
     // Expose task progress counters without leaking full QueryContext.
     void add_total_task_num(int delta);
     void inc_finished_task_num();
diff --git a/be/src/runtime/workload_management/task_controller.h 
b/be/src/runtime/workload_management/task_controller.h
index f3ff37eb338..37f2c6ab5a6 100644
--- a/be/src/runtime/workload_management/task_controller.h
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -48,6 +48,8 @@ public:
     TNetworkAddress fe_addr() { return fe_addr_; }
     void set_fe_addr(TNetworkAddress fe_addr) { fe_addr_ = fe_addr; }
     std::string debug_string();
+    // Distinguish missing user metadata from an empty username.
+    virtual bool get_user(std::string* user) { return false; }
 
     /* finish action
     */
diff --git a/be/src/runtime/workload_management/workload_condition.cpp 
b/be/src/runtime/workload_management/workload_condition.cpp
index 62c6072a60c..4c85a95ad1f 100644
--- a/be/src/runtime/workload_management/workload_condition.cpp
+++ b/be/src/runtime/workload_management/workload_condition.cpp
@@ -69,4 +69,15 @@ bool WorkloadConditionQueryMemory::eval(std::string str_val) 
{
                                                         _query_memory_bytes);
 }
 
+// username
+WorkloadConditionUsername::WorkloadConditionUsername(WorkloadCompareOperator 
op,
+                                                     std::string str_val) {
+    _op = op;
+    _username = str_val;
+}
+
+bool WorkloadConditionUsername::eval(std::string str_val) {
+    return WorkloadCompareUtils::compare_string(_op, str_val, _username);
+}
+
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_condition.h 
b/be/src/runtime/workload_management/workload_condition.h
index cf53a5f07dd..1a8c9e8dc8b 100644
--- a/be/src/runtime/workload_management/workload_condition.h
+++ b/be/src/runtime/workload_management/workload_condition.h
@@ -23,7 +23,7 @@
 
 namespace doris {
 
-enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES };
+enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES, USERNAME };
 
 class WorkloadCondition {
 public:
@@ -107,6 +107,21 @@ private:
     WorkloadCompareOperator _op;
 };
 
+class WorkloadConditionUsername : public WorkloadCondition {
+public:
+    WorkloadConditionUsername(WorkloadCompareOperator op, std::string str_val);
+    bool eval(std::string str_val) override;
+    WorkloadMetricType get_workload_metric_type() override { return 
WorkloadMetricType::USERNAME; }
+
+    std::string get_metric_string() override { return "username"; }
+
+    std::string get_metric_value_string() override { return _username; }
+
+private:
+    std::string _username;
+    WorkloadCompareOperator _op;
+};
+
 class WorkloadConditionFactory {
 public:
     static std::unique_ptr<WorkloadCondition> create_workload_condition(
@@ -123,6 +138,8 @@ public:
             return std::make_unique<WorkloadConditionScanBytes>(op, str_val);
         } else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES == 
metric_name) {
             return std::make_unique<WorkloadConditionQueryMemory>(op, str_val);
+        } else if (TWorkloadMetricType::type::USERNAME == metric_name) {
+            return std::make_unique<WorkloadConditionUsername>(op, str_val);
         }
         LOG(ERROR) << "not find a metric name " << metric_name;
         return nullptr;
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp 
b/be/src/runtime/workload_management/workload_sched_policy.cpp
index 91d75e2aae1..e439c873ad2 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -18,6 +18,7 @@
 #include "runtime/workload_management/workload_sched_policy.h"
 
 #include "runtime/workload_management/resource_context.h"
+#include "runtime/workload_management/task_controller.h"
 #include "util/time.h"
 
 namespace doris {
@@ -88,6 +89,13 @@ bool 
WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext* action_runtim
                     
action_runtime_ctx->resource_ctx->memory_context()->current_memory_bytes());
             break;
         }
+        case WorkloadMetricType::USERNAME: {
+            // Reject the condition when BE does not have explicit user metadata.
+            if (!action_runtime_ctx->resource_ctx->task_controller()->get_user(&val)) {
+                return false;
+            }
+            break;
+        }
         default:
             return false;
         }
@@ -111,4 +119,4 @@ void 
WorkloadSchedPolicy::exec_action(WorkloadAction::RuntimeContext* action_run
     }
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/runtime/workload_management/query_task_controller_test.cpp 
b/be/test/runtime/workload_management/query_task_controller_test.cpp
new file mode 100644
index 00000000000..82f4cba8ba0
--- /dev/null
+++ b/be/test/runtime/workload_management/query_task_controller_test.cpp
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/query_task_controller.h"
+
+#include <gtest/gtest.h>
+
+#include "testutil/mock/mock_query_context.h"
+
+namespace doris {
+
+class QueryTaskControllerTest : public testing::Test {
+public:
+    QueryTaskControllerTest() = default;
+    ~QueryTaskControllerTest() override = default;
+};
+
+TEST_F(QueryTaskControllerTest, TestGetUser) {
+    // 1. Create MockQueryContext
+    auto query_ctx = MockQueryContext::create();
+
+    // 2. Create QueryTaskController
+    auto task_controller = QueryTaskController::create(query_ctx);
+    std::string user;
+
+    // 3. Test default (set_rsc_info = false)
+    EXPECT_FALSE(task_controller->get_user(&user));
+
+    // 4. Test with user set but set_rsc_info = false
+    query_ctx->user = "test_user";
+    EXPECT_FALSE(task_controller->get_user(&user));
+
+    // 5. Test with set_rsc_info = true
+    query_ctx->set_rsc_info = true;
+    EXPECT_TRUE(task_controller->get_user(&user));
+    EXPECT_EQ("test_user", user);
+
+    // 6. Test when QueryContext is destroyed
+    std::shared_ptr<QueryContext> ctx_ptr = MockQueryContext::create();
+    auto controller_ptr = QueryTaskController::create(ctx_ptr);
+    ctx_ptr->set_rsc_info = true;
+    ctx_ptr->user = "user1";
+    EXPECT_TRUE(controller_ptr->get_user(&user));
+    EXPECT_EQ("user1", user);
+
+    ctx_ptr.reset(); // Destroy the context
+    EXPECT_FALSE(controller_ptr->get_user(&user));
+}
+
+} // namespace doris
diff --git a/be/test/runtime/workload_management/workload_condition_test.cpp 
b/be/test/runtime/workload_management/workload_condition_test.cpp
new file mode 100644
index 00000000000..305496d2755
--- /dev/null
+++ b/be/test/runtime/workload_management/workload_condition_test.cpp
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/workload_condition.h"
+
+#include <gtest/gtest.h>
+
+namespace doris {
+
+class WorkloadConditionTest : public testing::Test {
+public:
+    WorkloadConditionTest() = default;
+    ~WorkloadConditionTest() override = default;
+};
+
+TEST_F(WorkloadConditionTest, TestUsernameCondition) {
+    // 1. Equal
+    {
+        WorkloadConditionUsername cond(WorkloadCompareOperator::EQUAL, 
"test_user");
+        EXPECT_TRUE(cond.eval("test_user"));
+        EXPECT_FALSE(cond.eval("other_user"));
+        EXPECT_EQ(WorkloadMetricType::USERNAME, 
cond.get_workload_metric_type());
+    }
+
+    // 2. Unsupported Operator (GREATER)
+    {
+        WorkloadConditionUsername cond(WorkloadCompareOperator::GREATER, 
"a_user");
+        EXPECT_FALSE(cond.eval("b_user")); // Not supported, returns false
+    }
+}
+
+} // namespace doris
diff --git a/be/test/runtime/workload_sched_policy_test.cpp 
b/be/test/runtime/workload_sched_policy_test.cpp
index 3f30c08d8c3..719ce0f04ad 100644
--- a/be/test/runtime/workload_sched_policy_test.cpp
+++ b/be/test/runtime/workload_sched_policy_test.cpp
@@ -60,6 +60,45 @@ protected:
     void SetUp() override {}
 };
 
+class MockTaskController : public TaskController {
+public:
+    MockTaskController() = default;
+    ~MockTaskController() override = default;
+
+    // Simulate BE task controllers that may or may not carry user metadata.
+    bool get_user(std::string* user) override {
+        if (!_has_user) {
+            return false;
+        }
+        *user = _user;
+        return true;
+    }
+    void set_user(std::string user) {
+        _user = std::move(user);
+        _has_user = true;
+    }
+    void clear_user() {
+        _user.clear();
+        _has_user = false;
+    }
+
+private:
+    bool _has_user = false;
+    std::string _user;
+};
+
+class MockWorkloadAction : public WorkloadAction {
+public:
+    MockWorkloadAction(WorkloadActionType type) : _type(type) {}
+    void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) override { 
_exec_count++; }
+    WorkloadActionType get_action_type() override { return _type; }
+    int get_exec_count() { return _exec_count; }
+
+private:
+    WorkloadActionType _type;
+    int _exec_count = 0;
+};
+
 TEST_F(WorkloadSchedPolicyTest, one_policy_one_condition) {
     // 1 empty resource
     {
@@ -324,4 +363,122 @@ TEST_F(WorkloadSchedPolicyTest, 
test_task_controller_running_time) {
     EXPECT_EQ(task_controller->running_time(), finished_running_time);
 }
 
+TEST_F(WorkloadSchedPolicyTest, one_policy_username_condition) {
+    std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+    std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+    
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::USERNAME,
+                                                      
TCompareOperator::type::EQUAL, "admin"));
+    std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+    
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+    std::set<int64_t> wg_id_set;
+    policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                 std::move(action_ptr_list));
+
+    WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+    std::unique_ptr<MockTaskController> task_controller = 
std::make_unique<MockTaskController>();
+    task_controller->set_user("root");
+    
action_runtime_ctx.resource_ctx->set_task_controller(std::move(task_controller));
+    EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+
+    
static_cast<MockTaskController*>(action_runtime_ctx.resource_ctx->task_controller())
+            ->set_user("admin");
+    EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+
+    // Missing user metadata must not be treated as an empty username match.
+    
static_cast<MockTaskController*>(action_runtime_ctx.resource_ctx->task_controller())
+            ->clear_user();
+    EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+
+    // Test INVALID operator
+    {
+        std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+        std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+        cond_ptr_list.push_back(create_workload_condition(
+                TWorkloadMetricType::type::USERNAME, 
TCompareOperator::type::GREATER, "admin"));
+        std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+        std::set<int64_t> wg_id_set;
+        policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                     std::move(action_ptr_list));
+
+        WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+        std::unique_ptr<MockTaskController> task_controller =
+                std::make_unique<MockTaskController>();
+        task_controller->set_user("admin");
+        
action_runtime_ctx.resource_ctx->set_task_controller(std::move(task_controller));
+        EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+    }
+}
+
+TEST_F(WorkloadSchedPolicyTest, policy_mixed_conditions) {
+    std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+    std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+    
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::USERNAME,
+                                                      
TCompareOperator::type::EQUAL, "admin"));
+    
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME,
+                                                      
TCompareOperator::type::GREATER, "10"));
+    std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+    
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+    std::set<int64_t> wg_id_set;
+    policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                 std::move(action_ptr_list));
+
+    WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+    std::unique_ptr<MockTaskController> task_controller = 
std::make_unique<MockTaskController>();
+    task_controller->set_user("admin");
+    TUniqueId task_id;
+    task_id.hi = 1;
+    task_id.lo = 1;
+    task_controller->set_task_id(task_id);
+    
action_runtime_ctx.resource_ctx->set_task_controller(std::move(task_controller));
+
+    // 1. Username matches, but query time too short
+    EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+
+    // 2. Username matches, query time long enough
+    std::this_thread::sleep_for(std::chrono::milliseconds(20));
+    action_runtime_ctx.resource_ctx->task_controller()->finish();
+    EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+
+    // 3. Username mismatch, query time long enough
+    
static_cast<MockTaskController*>(action_runtime_ctx.resource_ctx->task_controller())
+            ->set_user("root");
+    EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+}
+
+TEST_F(WorkloadSchedPolicyTest, policy_action_execution) {
+    std::shared_ptr<WorkloadSchedPolicy> policy = 
std::make_shared<WorkloadSchedPolicy>();
+    std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+    
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::USERNAME,
+                                                      
TCompareOperator::type::EQUAL, "admin"));
+
+    std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+    auto mock_action = 
std::make_unique<MockWorkloadAction>(WorkloadActionType::CANCEL_QUERY);
+    MockWorkloadAction* mock_action_ptr = mock_action.get();
+    action_ptr_list.push_back(std::move(mock_action));
+
+    std::set<int64_t> wg_id_set;
+    policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+                 std::move(action_ptr_list));
+
+    WorkloadAction::RuntimeContext action_runtime_ctx = 
create_runtime_context();
+    std::unique_ptr<MockTaskController> task_controller = 
std::make_unique<MockTaskController>();
+    task_controller->set_user("admin");
+    
action_runtime_ctx.resource_ctx->set_task_controller(std::move(task_controller));
+
+    EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+    policy->exec_action(&action_runtime_ctx);
+    EXPECT_EQ(mock_action_ptr->get_exec_count(), 1);
+}
+
+TEST_F(WorkloadSchedPolicyTest, invalid_condition_creation) {
+    // Test creating condition with invalid metric type (using a value outside enum range if possible, or mocked TWorkloadCondition)
+    TWorkloadCondition cond;
+    cond.metric_name = static_cast<TWorkloadMetricType::type>(999); // Invalid type
+    cond.op = TCompareOperator::type::EQUAL;
+    cond.value = "test";
+
+    auto result = WorkloadConditionFactory::create_workload_condition(&cond);
+    EXPECT_EQ(result, nullptr);
+}
+
 } // namespace doris
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
index a1c912eede8..ad8fa4475e5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
@@ -52,6 +52,7 @@ import org.apache.doris.thrift.TPipelineInstanceParams;
 import org.apache.doris.thrift.TQueryGlobals;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TQueryType;
+import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TScanRangeParams;
 import org.apache.doris.thrift.TUniqueId;
@@ -277,6 +278,13 @@ public class NereidsStreamLoadPlanner {
         params.setCoord(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
         params.setCurrentConnectFe(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
 
+        if (ConnectContext.get() != null && 
ConnectContext.get().getCurrentUserIdentity() != null) {
+            TResourceInfo resourceInfo = new TResourceInfo();
+            
resourceInfo.setUser(ConnectContext.get().getCurrentUserIdentity().getQualifiedUser());
+            resourceInfo.setGroup("");
+            params.setResourceInfo(resourceInfo);
+        }
+
         TPipelineInstanceParams execParams = new TPipelineInstanceParams();
         // user load id (streamLoadTask.id) as query id
         params.setQueryId(loadId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 16bbc6a3397..f55581076a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -115,6 +115,7 @@ import org.apache.doris.thrift.TQueryGlobals;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TReportExecStatusParams;
+import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.thrift.TResourceLimit;
 import org.apache.doris.thrift.TRuntimeFilterParams;
 import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2;
@@ -3291,6 +3292,13 @@ public class Coordinator implements CoordInterface {
                         params.setWorkloadGroups(tWorkloadGroups);
                     }
 
+                    if (context != null && context.getCurrentUserIdentity() != 
null) {
+                        TResourceInfo resourceInfo = new TResourceInfo();
+                        
resourceInfo.setUser(context.getCurrentUserIdentity().getQualifiedUser());
+                        resourceInfo.setGroup("");
+                        params.setResourceInfo(resourceInfo);
+                    }
+
                     params.setFileScanParams(fileScanRangeParamsMap);
                     params.setNumBuckets(fragment.getBucketNum());
                     params.setTotalInstances(instanceExecParams.size());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 107d615fb89..2ef71ab66dc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -65,6 +65,7 @@ import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TRecCTENode;
 import org.apache.doris.thrift.TRecCTEResetInfo;
 import org.apache.doris.thrift.TRecCTETarget;
+import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.thrift.TRuntimeFilterInfo;
 import org.apache.doris.thrift.TRuntimeFilterParams;
 import org.apache.doris.thrift.TScanRangeParams;
@@ -404,6 +405,13 @@ public class ThriftPlansBuilder {
             params.setLocalParams(Lists.newArrayList());
             params.setWorkloadGroups(coordinatorContext.getWorkloadGroups());
 
+            if (connectContext != null && 
connectContext.getCurrentUserIdentity() != null) {
+                TResourceInfo resourceInfo = new TResourceInfo();
+                
resourceInfo.setUser(connectContext.getCurrentUserIdentity().getQualifiedUser());
+                resourceInfo.setGroup("");
+                params.setResourceInfo(resourceInfo);
+            }
+
             params.setFileScanParams(fileScanRangeParamsMap);
 
             if (fragmentPlan.getFragmentJob() instanceof 
UnassignedScanBucketOlapTableJob) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java
index 90431a0141b..013675e4510 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.resource.workloadschedpolicy;
 
+import org.apache.doris.common.UserException;
+
 import com.google.gson.annotations.SerializedName;
 
 public class WorkloadConditionUsername implements WorkloadCondition {
@@ -41,7 +43,15 @@ public class WorkloadConditionUsername implements 
WorkloadCondition {
         return WorkloadMetricType.USERNAME;
     }
 
-    public static WorkloadConditionUsername 
createWorkloadCondition(WorkloadConditionOperator op, String value) {
+    public static WorkloadConditionUsername 
createWorkloadCondition(WorkloadConditionOperator op, String value)
+            throws UserException {
+        if (op != WorkloadConditionOperator.EQUAL) {
+            throw new UserException("username only support EQUAL operator");
+        }
+        // Reject blank usernames so BE-side missing user metadata cannot be matched by mistake.
+        if (value == null || value.trim().isEmpty()) {
+            throw new UserException("username can not be empty");
+        }
         // todo(wb) check whether input username is valid
         return new WorkloadConditionUsername(op, value);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 2d3cd486f58..3442c8a7789 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -106,7 +106,7 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon 
implements Writable, Gs
     public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET
             = new 
ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.BE_SCAN_ROWS)
             
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME)
-            .add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
+            
.add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).add(WorkloadMetricType.USERNAME).build();
 
     // used for convert fe type to thrift type
     public static final ImmutableMap<WorkloadMetricType, TWorkloadMetricType> 
METRIC_MAP
@@ -114,7 +114,8 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon 
implements Writable, Gs
             .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
             .put(WorkloadMetricType.BE_SCAN_ROWS, 
TWorkloadMetricType.BE_SCAN_ROWS)
             .put(WorkloadMetricType.BE_SCAN_BYTES, 
TWorkloadMetricType.BE_SCAN_BYTES)
-            .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, 
TWorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
+            .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, 
TWorkloadMetricType.QUERY_BE_MEMORY_BYTES)
+            .put(WorkloadMetricType.USERNAME, 
TWorkloadMetricType.USERNAME).build();
     public static final ImmutableMap<WorkloadActionType, TWorkloadActionType> 
ACTION_MAP
             = new ImmutableMap.Builder<WorkloadActionType, 
TWorkloadActionType>()
             .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, 
TWorkloadActionType.MOVE_QUERY_TO_GROUP)
@@ -196,7 +197,7 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon 
implements Writable, Gs
             WorkloadCondition cond = 
WorkloadCondition.createWorkloadCondition(cm);
             policyConditionList.add(cond);
         }
-        boolean feCondition = checkPolicyCondition(policyConditionList);
+        Boolean feCondition = checkPolicyCondition(policyConditionList);
 
         // 2 create action
         List<WorkloadAction> policyActionList = new ArrayList<>();
@@ -207,7 +208,7 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon 
implements Writable, Gs
         }
 
         boolean feAction = checkPolicyAction(policyActionList);
-        if (feAction != feCondition) {
+        if (feCondition != null && feAction != feCondition) {
             throw new UserException("action and metric must run in FE together or run in BE together");
         }
 
@@ -245,27 +246,36 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon 
implements Writable, Gs
         }
     }
 
-    private boolean checkPolicyCondition(List<WorkloadCondition> 
conditionList) throws UserException {
+    private Boolean checkPolicyCondition(List<WorkloadCondition> 
conditionList) throws UserException {
         if (conditionList.size() > 
Config.workload_max_condition_num_in_policy) {
             throw new UserException(
                     "condition num in a policy can not exceed " + 
Config.workload_max_condition_num_in_policy);
         }
-        boolean containsFeMetric = false;
-        boolean containsBeMetric = false;
+        boolean hasFeOnlyMetric = false;
+        boolean hasBeOnlyMetric = false;
         for (WorkloadCondition cond : conditionList) {
-            if (FE_METRIC_SET.contains(cond.getMetricType())) {
-                containsFeMetric = true;
-            }
-            if (BE_METRIC_SET.contains(cond.getMetricType())) {
-                containsBeMetric = true;
+            boolean isFe = FE_METRIC_SET.contains(cond.getMetricType());
+            boolean isBe = BE_METRIC_SET.contains(cond.getMetricType());
+
+            if (isFe && !isBe) {
+                hasFeOnlyMetric = true;
+            } else if (isBe && !isFe) {
+                hasBeOnlyMetric = true;
             }
-            if (containsFeMetric && containsBeMetric) {
+
+            if (hasFeOnlyMetric && hasBeOnlyMetric) {
                 throw new UserException(
-                        "one policy can not contains fe and be metric, FE metric list is " + FE_METRIC_SET
+                        "one policy can not contains fe only and be only metric, FE metric list is " + FE_METRIC_SET
                                 + ", BE metric list is " + BE_METRIC_SET);
             }
         }
-        return containsFeMetric;
+        if (hasFeOnlyMetric) {
+            return true;
+        } else if (hasBeOnlyMetric) {
+            return false;
+        } else {
+            return null;
+        }
     }
 
     private boolean checkPolicyAction(List<WorkloadAction> actionList) throws 
UserException {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java
index 615fc0bc2eb..7461cba03d5 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java
@@ -17,13 +17,17 @@
 
 package org.apache.doris.resource.workloadschedpolicy;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
+import org.apache.doris.persist.EditLog;
 
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,23 +35,13 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Unit tests for the workload_group property format enforced by
- * {@link WorkloadSchedPolicyMgr#checkProperties(Map, List)}.
- *
- * The contract:
- *
- *   - Cloud mode    : workload_group must be '<compute_group>.<workload_group>'.
- *   - Non-cloud mode: workload_group may be '<workload_group>' (defaulting the
- *                     resource group to Tag.VALUE_DEFAULT_TAG) or the
- *                     '<resource_group>.<workload_group>' form — the dotted
- *                     prefix is a resource group (Tag) here, sharing the cloud-mode
- *                     grammar purely for consistency.
- *
- * Invalid forms must be rejected BEFORE any compute-group lookup, so the
- * rejection is exercisable here without bootstrapping the full Env.
+ * Unit tests for workload schedule policy validation paths.
  */
 public class WorkloadSchedPolicyMgrTest {
 
+    private Env env;
+    private EditLog editLog;
+    private MockedStatic<Env> mockedEnv;
     private String originDeployMode;
     private String originCloudUniqueId;
     private WorkloadSchedPolicyMgr mgr;
@@ -57,12 +51,21 @@ public class WorkloadSchedPolicyMgrTest {
         originDeployMode = Config.deploy_mode;
         originCloudUniqueId = Config.cloud_unique_id;
         mgr = new WorkloadSchedPolicyMgr();
+        env = Mockito.mock(Env.class);
+        editLog = Mockito.mock(EditLog.class);
+        mockedEnv = Mockito.mockStatic(Env.class);
+
+        mockedEnv.when(Env::getCurrentEnv).thenReturn(env);
+        Mockito.when(env.getEditLog()).thenReturn(editLog);
     }
 
     @After
     public void tearDown() {
         Config.deploy_mode = originDeployMode;
         Config.cloud_unique_id = originCloudUniqueId;
+        if (mockedEnv != null) {
+            mockedEnv.close();
+        }
     }
 
     private Map<String, String> propsWith(String workloadGroupValue) {
@@ -71,6 +74,143 @@ public class WorkloadSchedPolicyMgrTest {
         return p;
     }
 
+    @Test
+    public void testCheckPolicyCondition() {
+        // Case 1: USERNAME (Shared) + BE Metric + BE Action -> OK
+        try {
+            List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+            conditionMetas.add(new WorkloadConditionMeta("username", "=", "user1"));
+            conditionMetas.add(new WorkloadConditionMeta("be_scan_rows", ">", "1000"));
+
+            List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+            actionMetas.add(new WorkloadActionMeta("cancel_query", ""));
+
+            mgr.createWorkloadSchedPolicy("policy_mixed_be", false, conditionMetas, actionMetas, null);
+        } catch (UserException e) {
+            Assert.fail("Should not throw exception for mixed USERNAME and BE metrics: " + e.getMessage());
+        }
+
+        // Case 2: USERNAME (Shared) + FE Action -> OK
+        try {
+            List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+            conditionMetas.add(new WorkloadConditionMeta("username", "=", "user1"));
+
+            List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+            actionMetas.add(new WorkloadActionMeta("set_session_variable", "workload_group=normal"));
+
+            mgr.createWorkloadSchedPolicy("policy_fe_only", false, conditionMetas, actionMetas, null);
+        } catch (UserException e) {
+            Assert.fail("Should not throw exception for USERNAME + FE Action: " + e.getMessage());
+        }
+
+        // Case 3: USERNAME (Shared) + BE Action -> OK
+        try {
+            List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+            conditionMetas.add(new WorkloadConditionMeta("username", "=", "user1"));
+
+            List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+            actionMetas.add(new WorkloadActionMeta("cancel_query", ""));
+
+            mgr.createWorkloadSchedPolicy("policy_username_be_action", false, conditionMetas, actionMetas, null);
+        } catch (UserException e) {
+            Assert.fail("Should not throw exception for USERNAME + BE Action: " + e.getMessage());
+        }
+
+        // Case 4: BE Metric + FE Action -> Error
+        try {
+            List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+            conditionMetas.add(new WorkloadConditionMeta("query_time", ">", "1000"));
+
+            List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+            actionMetas.add(new WorkloadActionMeta("set_session_variable", "workload_group=normal"));
+
+            mgr.createWorkloadSchedPolicy("policy_error_1", false, conditionMetas, actionMetas, null);
+            Assert.fail("Should throw exception for BE Metric + FE Action");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("action and metric must run in FE together or run in BE together"));
+        }
+
+        // Case 5: USERNAME + BE Metric + FE Action -> Error
+        try {
+            List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+            conditionMetas.add(new WorkloadConditionMeta("username", "=", "user1"));
+            conditionMetas.add(new WorkloadConditionMeta("query_time", ">", "1000"));
+
+            List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+            actionMetas.add(new WorkloadActionMeta("set_session_variable", "workload_group=normal"));
+
+            mgr.createWorkloadSchedPolicy("policy_error_2", false, conditionMetas, actionMetas, null);
+            Assert.fail("Should throw exception for USERNAME + BE Metric + FE Action");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("action and metric must run in FE together or run in BE together"));
+        }
+    }
+
+    @Test
+    public void testCheckProperties() throws UserException {
+        List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+        conditionMetas.add(new WorkloadConditionMeta("username", "=", "user1"));
+        List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+        actionMetas.add(new WorkloadActionMeta("cancel_query", ""));
+
+        // Test valid priority.
+        try {
+            Map<String, String> props = new HashMap<>();
+            props.put("priority", "10");
+            props.put("enabled", "true");
+            mgr.createWorkloadSchedPolicy("policy_prop_valid", false, conditionMetas, actionMetas, props);
+        } catch (UserException e) {
+            Assert.fail("Should not throw exception for valid properties: " + e.getMessage());
+        }
+
+        // Test invalid priority.
+        try {
+            Map<String, String> props = new HashMap<>();
+            props.put("priority", "101");
+            mgr.createWorkloadSchedPolicy("policy_prop_invalid_prio", false, conditionMetas, actionMetas, props);
+            Assert.fail("Should throw exception for invalid priority");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("policy's priority can only between 0 ~ 100"));
+        }
+
+        // Test invalid enabled.
+        try {
+            Map<String, String> props = new HashMap<>();
+            props.put("enabled", "yes");
+            mgr.createWorkloadSchedPolicy("policy_prop_invalid_enabled", false, conditionMetas, actionMetas, props);
+            Assert.fail("Should throw exception for invalid enabled");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("invalid enabled property value"));
+        }
+    }
+
+    @Test
+    public void testUsernameConditionRejectsBlankValue() throws UserException {
+        List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+        List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+        actionMetas.add(new WorkloadActionMeta("cancel_query", ""));
+
+        // Reject an explicit empty username to avoid matching queries without user metadata.
+        try {
+            conditionMetas.add(new WorkloadConditionMeta("username", "=", ""));
+            mgr.createWorkloadSchedPolicy("policy_empty_username", false, conditionMetas, actionMetas, null);
+            Assert.fail("Should throw exception for empty username");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("username can not be empty"));
+        }
+
+        conditionMetas.clear();
+
+        // Reject a blank username for the same reason.
+        try {
+            conditionMetas.add(new WorkloadConditionMeta("username", "=", "   "));
+            mgr.createWorkloadSchedPolicy("policy_blank_username", false, conditionMetas, actionMetas, null);
+            Assert.fail("Should throw exception for blank username");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("username can not be empty"));
+        }
+    }
+
     @Test
     public void testCloudModeRejectsUnqualifiedWorkloadGroup() {
         Config.cloud_unique_id = "ut_cloud";
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 44c1e7cd94c..cab711b7831 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -295,7 +295,8 @@ enum TWorkloadMetricType {
     QUERY_TIME = 0,
     BE_SCAN_ROWS = 1,
     BE_SCAN_BYTES = 2,
-    QUERY_BE_MEMORY_BYTES = 3
+    QUERY_BE_MEMORY_BYTES = 3,
+    USERNAME = 4
 }
 
 enum TCompareOperator {
diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
index 3ba468b79bc..b205ce9dda0 100644
--- a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
+++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
@@ -247,4 +247,44 @@ suite("test_workload_sched_policy") {
 
     sql "drop user test_alter_policy_user"
     sql "drop workload policy test_alter_policy"
-}
\ No newline at end of file
+
+    // ============================================================================
+    // Test policy creation with username (shared metric) and query_time (BE metric)
+    // ============================================================================
+
+    // 1. Create a user
+    sql "DROP USER IF EXISTS 'test_policy_user_be'"
+    sql "CREATE USER 'test_policy_user_be'@'%' IDENTIFIED BY '12345'"
+    sql "GRANT SELECT_PRIV ON *.* TO 'test_policy_user_be'@'%'"
+
+    // 2. Create a workload group
+    sql "DROP WORKLOAD GROUP IF EXISTS policy_group_be $forComputeGroupStr"
+    sql "CREATE WORKLOAD GROUP policy_group_be $forComputeGroupStr PROPERTIES ('max_cpu_percent'='100')"
+    sql "GRANT USAGE_PRIV ON WORKLOAD GROUP 'policy_group_be' TO 'test_policy_user_be'@'%'"
+
+    // 3. Create a policy with both username (shared metric) and query_time (BE metric)
+    sql "DROP WORKLOAD POLICY IF EXISTS test_mixed_policy"
+
+    sql """
+        CREATE WORKLOAD POLICY test_mixed_policy
+        CONDITIONS(username='test_policy_user_be', query_time > 1000)
+        ACTIONS(cancel_query)
+        PROPERTIES('workload_group'='${currentCgName}policy_group_be')
+    """
+
+    // 4. Verify policy creation
+    def policy = sql "SELECT * FROM information_schema.workload_policy WHERE name='test_mixed_policy'"
+    assertTrue(policy.size() > 0, "Policy should be created successfully")
+
+    // 5. Smoke check the test user can run in the workload group. This is not intended to trigger
+    //    cancel_query; the policy creation above is the behavior covered by this case.
+    connect('test_policy_user_be', '12345', context.config.jdbcUrl) {
+        sql "set workload_group = 'policy_group_be'"
+        sql "SELECT 1"
+    }
+
+    // Cleanup
+    sql "DROP WORKLOAD POLICY IF EXISTS test_mixed_policy"
+    sql "DROP WORKLOAD GROUP IF EXISTS policy_group_be $forComputeGroupStr"
+    sql "DROP USER IF EXISTS 'test_policy_user_be'"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to