This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new a47153b982d branch-4.1: [enhancement](workload policy) Add username-based backend workload policy support #60559 (#64493)
a47153b982d is described below
commit a47153b982d348acaeff3604361794ad3b31463c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 16 13:29:14 2026 +0800
branch-4.1: [enhancement](workload policy) Add username-based backend workload policy support #60559 (#64493)
Cherry-picked from #60559
Co-authored-by: Wen Zhenghu <[email protected]>
---
.../workload_management/query_task_controller.cpp | 13 ++
.../workload_management/query_task_controller.h | 2 +
.../runtime/workload_management/task_controller.h | 2 +
.../workload_management/workload_condition.cpp | 11 ++
.../workload_management/workload_condition.h | 19 ++-
.../workload_management/workload_sched_policy.cpp | 10 +-
.../query_task_controller_test.cpp | 64 ++++++++
.../workload_condition_test.cpp | 46 ++++++
be/test/runtime/workload_sched_policy_test.cpp | 157 +++++++++++++++++++
.../nereids/load/NereidsStreamLoadPlanner.java | 8 +
.../main/java/org/apache/doris/qe/Coordinator.java | 8 +
.../doris/qe/runtime/ThriftPlansBuilder.java | 8 +
.../WorkloadConditionUsername.java | 12 +-
.../WorkloadSchedPolicyMgr.java | 40 +++--
.../WorkloadSchedPolicyMgrTest.java | 168 +++++++++++++++++++--
gensrc/thrift/BackendService.thrift | 3 +-
.../test_workload_sched_policy.groovy | 42 +++++-
17 files changed, 579 insertions(+), 34 deletions(-)
diff --git a/be/src/runtime/workload_management/query_task_controller.cpp
b/be/src/runtime/workload_management/query_task_controller.cpp
index 16e950fa56c..de6b3e8a24f 100644
--- a/be/src/runtime/workload_management/query_task_controller.cpp
+++ b/be/src/runtime/workload_management/query_task_controller.cpp
@@ -227,6 +227,19 @@ std::vector<PipelineTask*> QueryTaskController::get_revocable_tasks() {
return tasks;
}
+bool QueryTaskController::get_user(std::string* user) {
+ auto query_ctx = query_ctx_.lock();
+ if (query_ctx == nullptr) {
+ return false;
+ }
+ // Only expose user metadata when it is explicitly attached to the query context.
+ if (query_ctx->set_rsc_info) {
+ *user = query_ctx->user;
+ return true;
+ }
+ return false;
+}
+
void QueryTaskController::add_total_task_num(int delta) {
_total_task_num.fetch_add(delta, std::memory_order_relaxed);
}
diff --git a/be/src/runtime/workload_management/query_task_controller.h
b/be/src/runtime/workload_management/query_task_controller.h
index 0d46196c150..854a2c05cb6 100644
--- a/be/src/runtime/workload_management/query_task_controller.h
+++ b/be/src/runtime/workload_management/query_task_controller.h
@@ -48,6 +48,8 @@ public:
size_t get_revocable_size() override;
Status revoke_memory() override;
std::vector<PipelineTask*> get_revocable_tasks() override;
+ // Distinguish missing user metadata from an empty username.
+ bool get_user(std::string* user) override;
// Expose task progress counters without leaking full QueryContext.
void add_total_task_num(int delta);
void inc_finished_task_num();
diff --git a/be/src/runtime/workload_management/task_controller.h
b/be/src/runtime/workload_management/task_controller.h
index f3ff37eb338..37f2c6ab5a6 100644
--- a/be/src/runtime/workload_management/task_controller.h
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -48,6 +48,8 @@ public:
TNetworkAddress fe_addr() { return fe_addr_; }
void set_fe_addr(TNetworkAddress fe_addr) { fe_addr_ = fe_addr; }
std::string debug_string();
+ // Distinguish missing user metadata from an empty username.
+ virtual bool get_user(std::string* user) { return false; }
/* finish action
*/
diff --git a/be/src/runtime/workload_management/workload_condition.cpp
b/be/src/runtime/workload_management/workload_condition.cpp
index 62c6072a60c..4c85a95ad1f 100644
--- a/be/src/runtime/workload_management/workload_condition.cpp
+++ b/be/src/runtime/workload_management/workload_condition.cpp
@@ -69,4 +69,15 @@ bool WorkloadConditionQueryMemory::eval(std::string str_val) {
_query_memory_bytes);
}
+// username
+WorkloadConditionUsername::WorkloadConditionUsername(WorkloadCompareOperator op,
+ std::string str_val) {
+ _op = op;
+ _username = str_val;
+}
+
+bool WorkloadConditionUsername::eval(std::string str_val) {
+ return WorkloadCompareUtils::compare_string(_op, str_val, _username);
+}
+
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_condition.h
b/be/src/runtime/workload_management/workload_condition.h
index cf53a5f07dd..1a8c9e8dc8b 100644
--- a/be/src/runtime/workload_management/workload_condition.h
+++ b/be/src/runtime/workload_management/workload_condition.h
@@ -23,7 +23,7 @@
namespace doris {
-enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES };
+enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES, USERNAME };
class WorkloadCondition {
public:
@@ -107,6 +107,21 @@ private:
WorkloadCompareOperator _op;
};
+class WorkloadConditionUsername : public WorkloadCondition {
+public:
+ WorkloadConditionUsername(WorkloadCompareOperator op, std::string str_val);
+ bool eval(std::string str_val) override;
+ WorkloadMetricType get_workload_metric_type() override { return WorkloadMetricType::USERNAME; }
+
+ std::string get_metric_string() override { return "username"; }
+
+ std::string get_metric_value_string() override { return _username; }
+
+private:
+ std::string _username;
+ WorkloadCompareOperator _op;
+};
+
class WorkloadConditionFactory {
public:
static std::unique_ptr<WorkloadCondition> create_workload_condition(
@@ -123,6 +138,8 @@ public:
return std::make_unique<WorkloadConditionScanBytes>(op, str_val);
} else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES ==
metric_name) {
return std::make_unique<WorkloadConditionQueryMemory>(op, str_val);
+ } else if (TWorkloadMetricType::type::USERNAME == metric_name) {
+ return std::make_unique<WorkloadConditionUsername>(op, str_val);
}
LOG(ERROR) << "not find a metric name " << metric_name;
return nullptr;
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp
b/be/src/runtime/workload_management/workload_sched_policy.cpp
index 91d75e2aae1..e439c873ad2 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -18,6 +18,7 @@
#include "runtime/workload_management/workload_sched_policy.h"
#include "runtime/workload_management/resource_context.h"
+#include "runtime/workload_management/task_controller.h"
#include "util/time.h"
namespace doris {
@@ -88,6 +89,13 @@ bool
WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext* action_runtim
action_runtime_ctx->resource_ctx->memory_context()->current_memory_bytes());
break;
}
+ case WorkloadMetricType::USERNAME: {
+ // Reject the condition when BE does not have explicit user metadata.
+ if (!action_runtime_ctx->resource_ctx->task_controller()->get_user(&val)) {
+ return false;
+ }
+ break;
+ }
default:
return false;
}
@@ -111,4 +119,4 @@ void
WorkloadSchedPolicy::exec_action(WorkloadAction::RuntimeContext* action_run
}
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/runtime/workload_management/query_task_controller_test.cpp
b/be/test/runtime/workload_management/query_task_controller_test.cpp
new file mode 100644
index 00000000000..82f4cba8ba0
--- /dev/null
+++ b/be/test/runtime/workload_management/query_task_controller_test.cpp
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/query_task_controller.h"
+
+#include <gtest/gtest.h>
+
+#include "testutil/mock/mock_query_context.h"
+
+namespace doris {
+
+class QueryTaskControllerTest : public testing::Test {
+public:
+ QueryTaskControllerTest() = default;
+ ~QueryTaskControllerTest() override = default;
+};
+
+TEST_F(QueryTaskControllerTest, TestGetUser) {
+ // 1. Create MockQueryContext
+ auto query_ctx = MockQueryContext::create();
+
+ // 2. Create QueryTaskController
+ auto task_controller = QueryTaskController::create(query_ctx);
+ std::string user;
+
+ // 3. Test default (set_rsc_info = false)
+ EXPECT_FALSE(task_controller->get_user(&user));
+
+ // 4. Test with user set but set_rsc_info = false
+ query_ctx->user = "test_user";
+ EXPECT_FALSE(task_controller->get_user(&user));
+
+ // 5. Test with set_rsc_info = true
+ query_ctx->set_rsc_info = true;
+ EXPECT_TRUE(task_controller->get_user(&user));
+ EXPECT_EQ("test_user", user);
+
+ // 6. Test when QueryContext is destroyed
+ std::shared_ptr<QueryContext> ctx_ptr = MockQueryContext::create();
+ auto controller_ptr = QueryTaskController::create(ctx_ptr);
+ ctx_ptr->set_rsc_info = true;
+ ctx_ptr->user = "user1";
+ EXPECT_TRUE(controller_ptr->get_user(&user));
+ EXPECT_EQ("user1", user);
+
+ ctx_ptr.reset(); // Destroy the context
+ EXPECT_FALSE(controller_ptr->get_user(&user));
+}
+
+} // namespace doris
diff --git a/be/test/runtime/workload_management/workload_condition_test.cpp
b/be/test/runtime/workload_management/workload_condition_test.cpp
new file mode 100644
index 00000000000..305496d2755
--- /dev/null
+++ b/be/test/runtime/workload_management/workload_condition_test.cpp
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/workload_condition.h"
+
+#include <gtest/gtest.h>
+
+namespace doris {
+
+class WorkloadConditionTest : public testing::Test {
+public:
+ WorkloadConditionTest() = default;
+ ~WorkloadConditionTest() override = default;
+};
+
+TEST_F(WorkloadConditionTest, TestUsernameCondition) {
+ // 1. Equal
+ {
+ WorkloadConditionUsername cond(WorkloadCompareOperator::EQUAL,
"test_user");
+ EXPECT_TRUE(cond.eval("test_user"));
+ EXPECT_FALSE(cond.eval("other_user"));
+ EXPECT_EQ(WorkloadMetricType::USERNAME,
cond.get_workload_metric_type());
+ }
+
+ // 2. Unsupported Operator (GREATER)
+ {
+ WorkloadConditionUsername cond(WorkloadCompareOperator::GREATER,
"a_user");
+ EXPECT_FALSE(cond.eval("b_user")); // Not supported, returns false
+ }
+}
+
+} // namespace doris
diff --git a/be/test/runtime/workload_sched_policy_test.cpp
b/be/test/runtime/workload_sched_policy_test.cpp
index 3f30c08d8c3..719ce0f04ad 100644
--- a/be/test/runtime/workload_sched_policy_test.cpp
+++ b/be/test/runtime/workload_sched_policy_test.cpp
@@ -60,6 +60,45 @@ protected:
void SetUp() override {}
};
+class MockTaskController : public TaskController {
+public:
+ MockTaskController() = default;
+ ~MockTaskController() override = default;
+
+ // Simulate BE task controllers that may or may not carry user metadata.
+ bool get_user(std::string* user) override {
+ if (!_has_user) {
+ return false;
+ }
+ *user = _user;
+ return true;
+ }
+ void set_user(std::string user) {
+ _user = std::move(user);
+ _has_user = true;
+ }
+ void clear_user() {
+ _user.clear();
+ _has_user = false;
+ }
+
+private:
+ bool _has_user = false;
+ std::string _user;
+};
+
+class MockWorkloadAction : public WorkloadAction {
+public:
+ MockWorkloadAction(WorkloadActionType type) : _type(type) {}
+ void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) override { _exec_count++; }
+ WorkloadActionType get_action_type() override { return _type; }
+ int get_exec_count() { return _exec_count; }
+
+private:
+ WorkloadActionType _type;
+ int _exec_count = 0;
+};
+
TEST_F(WorkloadSchedPolicyTest, one_policy_one_condition) {
// 1 empty resource
{
@@ -324,4 +363,122 @@ TEST_F(WorkloadSchedPolicyTest,
test_task_controller_running_time) {
EXPECT_EQ(task_controller->running_time(), finished_running_time);
}
+TEST_F(WorkloadSchedPolicyTest, one_policy_username_condition) {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::USERNAME,
+
TCompareOperator::type::EQUAL, "admin"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ std::unique_ptr<MockTaskController> task_controller =
std::make_unique<MockTaskController>();
+ task_controller->set_user("root");
+
action_runtime_ctx.resource_ctx->set_task_controller(std::move(task_controller));
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+
+
static_cast<MockTaskController*>(action_runtime_ctx.resource_ctx->task_controller())
+ ->set_user("admin");
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+
+ // Missing user metadata must not be treated as an empty username match.
+
static_cast<MockTaskController*>(action_runtime_ctx.resource_ctx->task_controller())
+ ->clear_user();
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+
+ // Test INVALID operator
+ {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+ cond_ptr_list.push_back(create_workload_condition(
+ TWorkloadMetricType::type::USERNAME,
TCompareOperator::type::GREATER, "admin"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ std::unique_ptr<MockTaskController> task_controller =
+ std::make_unique<MockTaskController>();
+ task_controller->set_user("admin");
+
action_runtime_ctx.resource_ctx->set_task_controller(std::move(task_controller));
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+ }
+}
+
+TEST_F(WorkloadSchedPolicyTest, policy_mixed_conditions) {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::USERNAME,
+
TCompareOperator::type::EQUAL, "admin"));
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::QUERY_TIME,
+
TCompareOperator::type::GREATER, "10"));
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ std::unique_ptr<MockTaskController> task_controller =
std::make_unique<MockTaskController>();
+ task_controller->set_user("admin");
+ TUniqueId task_id;
+ task_id.hi = 1;
+ task_id.lo = 1;
+ task_controller->set_task_id(task_id);
+
action_runtime_ctx.resource_ctx->set_task_controller(std::move(task_controller));
+
+ // 1. Username matches, but query time too short
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+
+ // 2. Username matches, query time long enough
+ std::this_thread::sleep_for(std::chrono::milliseconds(20));
+ action_runtime_ctx.resource_ctx->task_controller()->finish();
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+
+ // 3. Username mismatch, query time long enough
+
static_cast<MockTaskController*>(action_runtime_ctx.resource_ctx->task_controller())
+ ->set_user("root");
+ EXPECT_FALSE(policy->is_match(&action_runtime_ctx));
+}
+
+TEST_F(WorkloadSchedPolicyTest, policy_action_execution) {
+ std::shared_ptr<WorkloadSchedPolicy> policy =
std::make_shared<WorkloadSchedPolicy>();
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+
cond_ptr_list.push_back(create_workload_condition(TWorkloadMetricType::type::USERNAME,
+
TCompareOperator::type::EQUAL, "admin"));
+
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+ auto mock_action =
std::make_unique<MockWorkloadAction>(WorkloadActionType::CANCEL_QUERY);
+ MockWorkloadAction* mock_action_ptr = mock_action.get();
+ action_ptr_list.push_back(std::move(mock_action));
+
+ std::set<int64_t> wg_id_set;
+ policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
+ std::move(action_ptr_list));
+
+ WorkloadAction::RuntimeContext action_runtime_ctx =
create_runtime_context();
+ std::unique_ptr<MockTaskController> task_controller =
std::make_unique<MockTaskController>();
+ task_controller->set_user("admin");
+
action_runtime_ctx.resource_ctx->set_task_controller(std::move(task_controller));
+
+ EXPECT_TRUE(policy->is_match(&action_runtime_ctx));
+ policy->exec_action(&action_runtime_ctx);
+ EXPECT_EQ(mock_action_ptr->get_exec_count(), 1);
+}
+
+TEST_F(WorkloadSchedPolicyTest, invalid_condition_creation) {
+ // Test creating condition with invalid metric type (using a value outside enum range if possible, or mocked TWorkloadCondition)
+ TWorkloadCondition cond;
+ cond.metric_name = static_cast<TWorkloadMetricType::type>(999); // Invalid type
+ cond.op = TCompareOperator::type::EQUAL;
+ cond.value = "test";
+
+ auto result = WorkloadConditionFactory::create_workload_condition(&cond);
+ EXPECT_EQ(result, nullptr);
+}
+
} // namespace doris
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
index a1c912eede8..ad8fa4475e5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
@@ -52,6 +52,7 @@ import org.apache.doris.thrift.TPipelineInstanceParams;
import org.apache.doris.thrift.TQueryGlobals;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
+import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TUniqueId;
@@ -277,6 +278,13 @@ public class NereidsStreamLoadPlanner {
params.setCoord(new
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
params.setCurrentConnectFe(new
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+ if (ConnectContext.get() != null &&
ConnectContext.get().getCurrentUserIdentity() != null) {
+ TResourceInfo resourceInfo = new TResourceInfo();
+
resourceInfo.setUser(ConnectContext.get().getCurrentUserIdentity().getQualifiedUser());
+ resourceInfo.setGroup("");
+ params.setResourceInfo(resourceInfo);
+ }
+
TPipelineInstanceParams execParams = new TPipelineInstanceParams();
// user load id (streamLoadTask.id) as query id
params.setQueryId(loadId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 16bbc6a3397..f55581076a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -115,6 +115,7 @@ import org.apache.doris.thrift.TQueryGlobals;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
+import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TResourceLimit;
import org.apache.doris.thrift.TRuntimeFilterParams;
import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2;
@@ -3291,6 +3292,13 @@ public class Coordinator implements CoordInterface {
params.setWorkloadGroups(tWorkloadGroups);
}
+ if (context != null && context.getCurrentUserIdentity() !=
null) {
+ TResourceInfo resourceInfo = new TResourceInfo();
+
resourceInfo.setUser(context.getCurrentUserIdentity().getQualifiedUser());
+ resourceInfo.setGroup("");
+ params.setResourceInfo(resourceInfo);
+ }
+
params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
params.setTotalInstances(instanceExecParams.size());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 107d615fb89..2ef71ab66dc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -65,6 +65,7 @@ import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TRecCTENode;
import org.apache.doris.thrift.TRecCTEResetInfo;
import org.apache.doris.thrift.TRecCTETarget;
+import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TRuntimeFilterInfo;
import org.apache.doris.thrift.TRuntimeFilterParams;
import org.apache.doris.thrift.TScanRangeParams;
@@ -404,6 +405,13 @@ public class ThriftPlansBuilder {
params.setLocalParams(Lists.newArrayList());
params.setWorkloadGroups(coordinatorContext.getWorkloadGroups());
+ if (connectContext != null &&
connectContext.getCurrentUserIdentity() != null) {
+ TResourceInfo resourceInfo = new TResourceInfo();
+
resourceInfo.setUser(connectContext.getCurrentUserIdentity().getQualifiedUser());
+ resourceInfo.setGroup("");
+ params.setResourceInfo(resourceInfo);
+ }
+
params.setFileScanParams(fileScanRangeParamsMap);
if (fragmentPlan.getFragmentJob() instanceof
UnassignedScanBucketOlapTableJob) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java
index 90431a0141b..013675e4510 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java
@@ -17,6 +17,8 @@
package org.apache.doris.resource.workloadschedpolicy;
+import org.apache.doris.common.UserException;
+
import com.google.gson.annotations.SerializedName;
public class WorkloadConditionUsername implements WorkloadCondition {
@@ -41,7 +43,15 @@ public class WorkloadConditionUsername implements
WorkloadCondition {
return WorkloadMetricType.USERNAME;
}
- public static WorkloadConditionUsername
createWorkloadCondition(WorkloadConditionOperator op, String value) {
+ public static WorkloadConditionUsername
createWorkloadCondition(WorkloadConditionOperator op, String value)
+ throws UserException {
+ if (op != WorkloadConditionOperator.EQUAL) {
+ throw new UserException("username only support EQUAL operator");
+ }
+ // Reject blank usernames so BE-side missing user metadata cannot be matched by mistake.
+ if (value == null || value.trim().isEmpty()) {
+ throw new UserException("username can not be empty");
+ }
// todo(wb) check whether input username is valid
return new WorkloadConditionUsername(op, value);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 2d3cd486f58..3442c8a7789 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -106,7 +106,7 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon
implements Writable, Gs
public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET
= new
ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.BE_SCAN_ROWS)
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME)
- .add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
+
.add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).add(WorkloadMetricType.USERNAME).build();
// used for convert fe type to thrift type
public static final ImmutableMap<WorkloadMetricType, TWorkloadMetricType>
METRIC_MAP
@@ -114,7 +114,8 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon
implements Writable, Gs
.put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
.put(WorkloadMetricType.BE_SCAN_ROWS,
TWorkloadMetricType.BE_SCAN_ROWS)
.put(WorkloadMetricType.BE_SCAN_BYTES,
TWorkloadMetricType.BE_SCAN_BYTES)
- .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES,
TWorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
+ .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES,
TWorkloadMetricType.QUERY_BE_MEMORY_BYTES)
+ .put(WorkloadMetricType.USERNAME,
TWorkloadMetricType.USERNAME).build();
public static final ImmutableMap<WorkloadActionType, TWorkloadActionType>
ACTION_MAP
= new ImmutableMap.Builder<WorkloadActionType,
TWorkloadActionType>()
.put(WorkloadActionType.MOVE_QUERY_TO_GROUP,
TWorkloadActionType.MOVE_QUERY_TO_GROUP)
@@ -196,7 +197,7 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon
implements Writable, Gs
WorkloadCondition cond =
WorkloadCondition.createWorkloadCondition(cm);
policyConditionList.add(cond);
}
- boolean feCondition = checkPolicyCondition(policyConditionList);
+ Boolean feCondition = checkPolicyCondition(policyConditionList);
// 2 create action
List<WorkloadAction> policyActionList = new ArrayList<>();
@@ -207,7 +208,7 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon
implements Writable, Gs
}
boolean feAction = checkPolicyAction(policyActionList);
- if (feAction != feCondition) {
+ if (feCondition != null && feAction != feCondition) {
throw new UserException("action and metric must run in FE together or run in BE together");
}
@@ -245,27 +246,36 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon
implements Writable, Gs
}
}
- private boolean checkPolicyCondition(List<WorkloadCondition>
conditionList) throws UserException {
+ private Boolean checkPolicyCondition(List<WorkloadCondition>
conditionList) throws UserException {
if (conditionList.size() >
Config.workload_max_condition_num_in_policy) {
throw new UserException(
"condition num in a policy can not exceed " +
Config.workload_max_condition_num_in_policy);
}
- boolean containsFeMetric = false;
- boolean containsBeMetric = false;
+ boolean hasFeOnlyMetric = false;
+ boolean hasBeOnlyMetric = false;
for (WorkloadCondition cond : conditionList) {
- if (FE_METRIC_SET.contains(cond.getMetricType())) {
- containsFeMetric = true;
- }
- if (BE_METRIC_SET.contains(cond.getMetricType())) {
- containsBeMetric = true;
+ boolean isFe = FE_METRIC_SET.contains(cond.getMetricType());
+ boolean isBe = BE_METRIC_SET.contains(cond.getMetricType());
+
+ if (isFe && !isBe) {
+ hasFeOnlyMetric = true;
+ } else if (isBe && !isFe) {
+ hasBeOnlyMetric = true;
}
- if (containsFeMetric && containsBeMetric) {
+
+ if (hasFeOnlyMetric && hasBeOnlyMetric) {
throw new UserException(
- "one policy can not contains fe and be metric, FE metric list is " + FE_METRIC_SET
+ "one policy can not contains fe only and be only metric, FE metric list is " + FE_METRIC_SET
+ ", BE metric list is " + BE_METRIC_SET);
}
}
- return containsFeMetric;
+ if (hasFeOnlyMetric) {
+ return true;
+ } else if (hasBeOnlyMetric) {
+ return false;
+ } else {
+ return null;
+ }
}
private boolean checkPolicyAction(List<WorkloadAction> actionList) throws
UserException {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java
index 615fc0bc2eb..7461cba03d5 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java
@@ -17,13 +17,17 @@
package org.apache.doris.resource.workloadschedpolicy;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
+import org.apache.doris.persist.EditLog;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.HashMap;
@@ -31,23 +35,13 @@ import java.util.List;
import java.util.Map;
/**
- * Unit tests for the workload_group property format enforced by
- * {@link WorkloadSchedPolicyMgr#checkProperties(Map, List)}.
- *
- * The contract:
- *
- * - Cloud mode : workload_group must be
'<compute_group>.<workload_group>'.
- * - Non-cloud mode: workload_group may be '<workload_group>' (defaulting the
- * resource group to Tag.VALUE_DEFAULT_TAG) or the
- * '<resource_group>.<workload_group>' form — the dotted
- * prefix is a resource group (Tag) here, sharing the
cloud-mode
- * grammar purely for consistency.
- *
- * Invalid forms must be rejected BEFORE any compute-group lookup, so the
- * rejection is exercisable here without bootstrapping the full Env.
+ * Unit tests for workload schedule policy validation paths.
*/
public class WorkloadSchedPolicyMgrTest {
+ private Env env;
+ private EditLog editLog;
+ private MockedStatic<Env> mockedEnv;
private String originDeployMode;
private String originCloudUniqueId;
private WorkloadSchedPolicyMgr mgr;
@@ -57,12 +51,21 @@ public class WorkloadSchedPolicyMgrTest {
originDeployMode = Config.deploy_mode;
originCloudUniqueId = Config.cloud_unique_id;
mgr = new WorkloadSchedPolicyMgr();
+ env = Mockito.mock(Env.class);
+ editLog = Mockito.mock(EditLog.class);
+ mockedEnv = Mockito.mockStatic(Env.class);
+
+ mockedEnv.when(Env::getCurrentEnv).thenReturn(env);
+ Mockito.when(env.getEditLog()).thenReturn(editLog);
}
@After
public void tearDown() {
Config.deploy_mode = originDeployMode;
Config.cloud_unique_id = originCloudUniqueId;
+ if (mockedEnv != null) {
+ mockedEnv.close();
+ }
}
private Map<String, String> propsWith(String workloadGroupValue) {
@@ -71,6 +74,143 @@ public class WorkloadSchedPolicyMgrTest {
return p;
}
+ @Test
+ public void testCheckPolicyCondition() {
+ // Case 1: USERNAME (Shared) + BE Metric + BE Action -> OK
+ try {
+ List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+ conditionMetas.add(new WorkloadConditionMeta("username", "=",
"user1"));
+ conditionMetas.add(new WorkloadConditionMeta("be_scan_rows", ">",
"1000"));
+
+ List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+ actionMetas.add(new WorkloadActionMeta("cancel_query", ""));
+
+ mgr.createWorkloadSchedPolicy("policy_mixed_be", false,
conditionMetas, actionMetas, null);
+ } catch (UserException e) {
+ Assert.fail("Should not throw exception for mixed USERNAME and BE
metrics: " + e.getMessage());
+ }
+
+ // Case 2: USERNAME (Shared) + FE Action -> OK
+ try {
+ List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+ conditionMetas.add(new WorkloadConditionMeta("username", "=",
"user1"));
+
+ List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+ actionMetas.add(new WorkloadActionMeta("set_session_variable",
"workload_group=normal"));
+
+ mgr.createWorkloadSchedPolicy("policy_fe_only", false,
conditionMetas, actionMetas, null);
+ } catch (UserException e) {
+ Assert.fail("Should not throw exception for USERNAME + FE Action:
" + e.getMessage());
+ }
+
+ // Case 3: USERNAME (Shared) + BE Action -> OK
+ try {
+ List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+ conditionMetas.add(new WorkloadConditionMeta("username", "=",
"user1"));
+
+ List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+ actionMetas.add(new WorkloadActionMeta("cancel_query", ""));
+
+ mgr.createWorkloadSchedPolicy("policy_username_be_action", false,
conditionMetas, actionMetas, null);
+ } catch (UserException e) {
+ Assert.fail("Should not throw exception for USERNAME + BE Action:
" + e.getMessage());
+ }
+
+ // Case 4: BE Metric + FE Action -> Error
+ try {
+ List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+ conditionMetas.add(new WorkloadConditionMeta("query_time", ">",
"1000"));
+
+ List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+ actionMetas.add(new WorkloadActionMeta("set_session_variable",
"workload_group=normal"));
+
+ mgr.createWorkloadSchedPolicy("policy_error_1", false,
conditionMetas, actionMetas, null);
+ Assert.fail("Should throw exception for BE Metric + FE Action");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("action and metric must
run in FE together or run in BE together"));
+ }
+
+ // Case 5: USERNAME + BE Metric + FE Action -> Error
+ try {
+ List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+ conditionMetas.add(new WorkloadConditionMeta("username", "=",
"user1"));
+ conditionMetas.add(new WorkloadConditionMeta("query_time", ">",
"1000"));
+
+ List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+ actionMetas.add(new WorkloadActionMeta("set_session_variable",
"workload_group=normal"));
+
+ mgr.createWorkloadSchedPolicy("policy_error_2", false,
conditionMetas, actionMetas, null);
+ Assert.fail("Should throw exception for USERNAME + BE Metric + FE
Action");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("action and metric must
run in FE together or run in BE together"));
+ }
+ }
+
+ @Test
+ public void testCheckProperties() throws UserException {
+ List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+ conditionMetas.add(new WorkloadConditionMeta("username", "=",
"user1"));
+ List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+ actionMetas.add(new WorkloadActionMeta("cancel_query", ""));
+
+ // Test valid priority.
+ try {
+ Map<String, String> props = new HashMap<>();
+ props.put("priority", "10");
+ props.put("enabled", "true");
+ mgr.createWorkloadSchedPolicy("policy_prop_valid", false,
conditionMetas, actionMetas, props);
+ } catch (UserException e) {
+ Assert.fail("Should not throw exception for valid properties: " +
e.getMessage());
+ }
+
+ // Test invalid priority.
+ try {
+ Map<String, String> props = new HashMap<>();
+ props.put("priority", "101");
+ mgr.createWorkloadSchedPolicy("policy_prop_invalid_prio", false,
conditionMetas, actionMetas, props);
+ Assert.fail("Should throw exception for invalid priority");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("policy's priority can
only between 0 ~ 100"));
+ }
+
+ // Test invalid enabled.
+ try {
+ Map<String, String> props = new HashMap<>();
+ props.put("enabled", "yes");
+ mgr.createWorkloadSchedPolicy("policy_prop_invalid_enabled",
false, conditionMetas, actionMetas, props);
+ Assert.fail("Should throw exception for invalid enabled");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("invalid enabled
property value"));
+ }
+ }
+
+ @Test
+ public void testUsernameConditionRejectsBlankValue() throws UserException {
+ List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
+ List<WorkloadActionMeta> actionMetas = new ArrayList<>();
+ actionMetas.add(new WorkloadActionMeta("cancel_query", ""));
+
+ // Reject an explicit empty username to avoid matching queries without user metadata.
+ try {
+ conditionMetas.add(new WorkloadConditionMeta("username", "=", ""));
+ mgr.createWorkloadSchedPolicy("policy_empty_username", false,
conditionMetas, actionMetas, null);
+ Assert.fail("Should throw exception for empty username");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("username can not be
empty"));
+ }
+
+ conditionMetas.clear();
+
+ // Reject a blank username for the same reason.
+ try {
+ conditionMetas.add(new WorkloadConditionMeta("username", "=", "
"));
+ mgr.createWorkloadSchedPolicy("policy_blank_username", false,
conditionMetas, actionMetas, null);
+ Assert.fail("Should throw exception for blank username");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("username can not be
empty"));
+ }
+ }
+
@Test
public void testCloudModeRejectsUnqualifiedWorkloadGroup() {
Config.cloud_unique_id = "ut_cloud";
diff --git a/gensrc/thrift/BackendService.thrift
b/gensrc/thrift/BackendService.thrift
index 44c1e7cd94c..cab711b7831 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -295,7 +295,8 @@ enum TWorkloadMetricType {
QUERY_TIME = 0,
BE_SCAN_ROWS = 1,
BE_SCAN_BYTES = 2,
- QUERY_BE_MEMORY_BYTES = 3
+ QUERY_BE_MEMORY_BYTES = 3,
+ USERNAME = 4
}
enum TCompareOperator {
diff --git
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
index 3ba468b79bc..b205ce9dda0 100644
---
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
+++
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
@@ -247,4 +247,44 @@ suite("test_workload_sched_policy") {
sql "drop user test_alter_policy_user"
sql "drop workload policy test_alter_policy"
-}
\ No newline at end of file
+
+ // ============================================================================
+ // Test policy creation with username (shared metric) and query_time (BE metric)
+ // ============================================================================
+
+ // 1. Create a user
+ sql "DROP USER IF EXISTS 'test_policy_user_be'"
+ sql "CREATE USER 'test_policy_user_be'@'%' IDENTIFIED BY '12345'"
+ sql "GRANT SELECT_PRIV ON *.* TO 'test_policy_user_be'@'%'"
+
+ // 2. Create a workload group
+ sql "DROP WORKLOAD GROUP IF EXISTS policy_group_be $forComputeGroupStr"
+ sql "CREATE WORKLOAD GROUP policy_group_be $forComputeGroupStr PROPERTIES
('max_cpu_percent'='100')"
+ sql "GRANT USAGE_PRIV ON WORKLOAD GROUP 'policy_group_be' TO
'test_policy_user_be'@'%'"
+
+ // 3. Create a policy with both username (shared metric) and query_time (BE metric)
+ sql "DROP WORKLOAD POLICY IF EXISTS test_mixed_policy"
+
+ sql """
+ CREATE WORKLOAD POLICY test_mixed_policy
+ CONDITIONS(username='test_policy_user_be', query_time > 1000)
+ ACTIONS(cancel_query)
+ PROPERTIES('workload_group'='${currentCgName}policy_group_be')
+ """
+
+ // 4. Verify policy creation
+ def policy = sql "SELECT * FROM information_schema.workload_policy WHERE
name='test_mixed_policy'"
+ assertTrue(policy.size() > 0, "Policy should be created successfully")
+
+ // 5. Smoke check the test user can run in the workload group. This is not intended to trigger
+ // cancel_query; the policy creation above is the behavior covered by this case.
+ connect('test_policy_user_be', '12345', context.config.jdbcUrl) {
+ sql "set workload_group = 'policy_group_be'"
+ sql "SELECT 1"
+ }
+
+ // Cleanup
+ sql "DROP WORKLOAD POLICY IF EXISTS test_mixed_policy"
+ sql "DROP WORKLOAD GROUP IF EXISTS policy_group_be $forComputeGroupStr"
+ sql "DROP USER IF EXISTS 'test_policy_user_be'"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]