This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 326a264fcd41c0db7bf0ccfbd9be6d172c469448
Author: wangbo <[email protected]>
AuthorDate: Fri Mar 22 15:20:58 2024 +0800

    [Improvement](executor)Add spill property for workload group #32554
---
 .../schema_workload_groups_scanner.cpp             |   7 +-
 be/src/runtime/workload_group/workload_group.cpp   |  28 +++++-
 be/src/runtime/workload_group/workload_group.h     |   4 +
 .../java/org/apache/doris/catalog/SchemaTable.java |   4 +-
 .../resource/workloadgroup/WorkloadGroup.java      | 109 +++++++++++++++++++--
 .../resource/workloadgroup/WorkloadGroupMgr.java   |   1 +
 .../doris/tablefunction/MetadataGenerator.java     |  49 +++------
 gensrc/thrift/BackendService.thrift                |   2 +
 .../data/workload_manager_p0/test_curd_wlg.out     |  12 +++
 .../workload_manager_p0/test_curd_wlg.groovy       |  76 ++++++++++++++
 10 files changed, 243 insertions(+), 49 deletions(-)

diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp 
b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
index b3fb9adcbeb..24e23a3e336 100644
--- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
@@ -35,10 +35,13 @@ std::vector<SchemaScanner::ColumnDesc> 
SchemaWorkloadGroupsScanner::_s_tbls_colu
         {"MAX_CONCURRENCY", TYPE_BIGINT, sizeof(int64_t), true},
         {"MAX_QUEUE_SIZE", TYPE_BIGINT, sizeof(int64_t), true},
         {"QUEUE_TIMEOUT", TYPE_BIGINT, sizeof(int64_t), true},
-        {"CPU_HARD_LIMIT", TYPE_STRING, sizeof(StringRef), true},
+        {"CPU_HARD_LIMIT", TYPE_VARCHAR, sizeof(StringRef), true},
         {"SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
         {"MAX_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
-        {"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true}};
+        {"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
+        {"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), 
true},
+        {"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), 
true},
+};
 
 SchemaWorkloadGroupsScanner::SchemaWorkloadGroupsScanner()
         : SchemaScanner(_s_tbls_columns, 
TSchemaTableType::SCH_WORKLOAD_GROUPS) {}
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 88e5a1221eb..44fe422e0c3 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -44,6 +44,8 @@ const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
 const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
 const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
 const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
+const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
+const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
 
 WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
         : _id(tg_info.id),
@@ -56,17 +58,21 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& 
tg_info)
           _cpu_hard_limit(tg_info.cpu_hard_limit),
           _scan_thread_num(tg_info.scan_thread_num),
           _max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num),
-          _min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num) {}
+          _min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num),
+          _spill_low_watermark(tg_info.spill_low_watermark),
+          _spill_high_watermark(tg_info.spill_high_watermark) {}
 
 std::string WorkloadGroup::debug_string() const {
     std::shared_lock<std::shared_mutex> rl {_mutex};
     return fmt::format(
             "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, 
enable_memory_overcommit = "
             "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
-            "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = 
{}]",
+            "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = 
{}, "
+            "spill_low_watermark={}, spill_high_watermark={}]",
             _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, 
TUnit::BYTES),
             _enable_memory_overcommit ? "true" : "false", _version, 
cpu_hard_limit(),
-            _scan_thread_num, _max_remote_scan_thread_num, 
_min_remote_scan_thread_num);
+            _scan_thread_num, _max_remote_scan_thread_num, 
_min_remote_scan_thread_num,
+            _spill_low_watermark, _spill_high_watermark);
 }
 
 void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
@@ -91,6 +97,8 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& 
tg_info) {
             _scan_thread_num = tg_info.scan_thread_num;
             _max_remote_scan_thread_num = tg_info.max_remote_scan_thread_num;
             _min_remote_scan_thread_num = tg_info.min_remote_scan_thread_num;
+            _spill_low_watermark = tg_info.spill_low_watermark;
+            _spill_high_watermark = tg_info.spill_high_watermark;
         } else {
             return;
         }
@@ -288,6 +296,20 @@ Status WorkloadGroupInfo::parse_topic_info(const 
TWorkloadGroupInfo& tworkload_g
                 tworkload_group_info.min_remote_scan_thread_num;
     }
 
+    // 12 spill low watermark
+    int spill_low_watermark = SPILL_LOW_WATERMARK_DEFAULT_VALUE;
+    if (tworkload_group_info.__isset.spill_threshold_low_watermark) {
+        spill_low_watermark = 
tworkload_group_info.spill_threshold_low_watermark;
+    }
+    workload_group_info->spill_low_watermark = spill_low_watermark;
+
+    // 13 spill high watermark
+    int spill_high_watermark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE;
+    if (tworkload_group_info.__isset.spill_threshold_high_watermark) {
+        spill_high_watermark = 
tworkload_group_info.spill_threshold_high_watermark;
+    }
+    workload_group_info->spill_high_watermark = spill_high_watermark;
+
     return Status::OK();
 }
 
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index af77be493b6..f70ee4c5582 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -149,6 +149,8 @@ private:
     std::atomic<int> _scan_thread_num;
     std::atomic<int> _max_remote_scan_thread_num;
     std::atomic<int> _min_remote_scan_thread_num;
+    std::atomic<int> _spill_low_watermark;
+    std::atomic<int> _spill_high_watermark;
 
     // means workload group is mark dropped
     // new query can not submit
@@ -178,6 +180,8 @@ struct WorkloadGroupInfo {
     int scan_thread_num;
     int max_remote_scan_thread_num;
     int min_remote_scan_thread_num;
+    int spill_low_watermark;
+    int spill_high_watermark;
     // log cgroup cpu info
     uint64_t cgroup_cpu_shares = 0;
     int cgroup_cpu_hard_limit = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 5654805619f..610a84b4740 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -480,10 +480,12 @@ public class SchemaTable extends Table {
                             .column("MAX_CONCURRENCY", 
ScalarType.createType(PrimitiveType.BIGINT))
                             .column("MAX_QUEUE_SIZE", 
ScalarType.createType(PrimitiveType.BIGINT))
                             .column("QUEUE_TIMEOUT", 
ScalarType.createType(PrimitiveType.BIGINT))
-                            .column("CPU_HARD_LIMIT", 
ScalarType.createStringType())
+                            .column("CPU_HARD_LIMIT", 
ScalarType.createVarchar(256))
                             .column("SCAN_THREAD_NUM", 
ScalarType.createType(PrimitiveType.BIGINT))
                             .column("MAX_REMOTE_SCAN_THREAD_NUM", 
ScalarType.createType(PrimitiveType.BIGINT))
                             .column("MIN_REMOTE_SCAN_THREAD_NUM", 
ScalarType.createType(PrimitiveType.BIGINT))
+                            .column("SPILL_THRESHOLD_LOW_WATERMARK", 
ScalarType.createVarchar(256))
+                            .column("SPILL_THRESHOLD_HIGH_WATERMARK", 
ScalarType.createVarchar(256))
                             .build()))
             .build();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index b14b3afec03..4a220252afe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -67,13 +67,21 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
 
     public static final String MIN_REMOTE_SCAN_THREAD_NUM = 
"min_remote_scan_thread_num";
 
+    public static final String SPILL_THRESHOLD_LOW_WATERMARK = 
"spill_threshold_low_watermark";
+
+    public static final String SPILL_THRESHOLD_HIGH_WATERMARK = 
"spill_threshold_high_watermark";
+
     // NOTE(wb): all property is not required, some properties default value 
is set in be
     // default value is as followed
     // cpu_share=1024, memory_limit=0%(0 means not limit), 
enable_memory_overcommit=true
     private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new 
ImmutableSet.Builder<String>()
             
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
             
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
-            
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM).build();
+            .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
+            
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK).build();
+
+    public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
+    public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
 
     @SerializedName(value = "id")
     private long id;
@@ -120,6 +128,20 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             this.cpuHardLimit = Integer.parseInt(cpuHardLimitStr);
             this.properties.put(CPU_HARD_LIMIT, cpuHardLimitStr);
         }
+        if (properties.containsKey(SPILL_THRESHOLD_LOW_WATERMARK)) {
+            String lowWatermarkStr = 
properties.get(SPILL_THRESHOLD_LOW_WATERMARK);
+            if (lowWatermarkStr.endsWith("%")) {
+                lowWatermarkStr = lowWatermarkStr.substring(0, 
lowWatermarkStr.length() - 1);
+            }
+            this.properties.put(SPILL_THRESHOLD_LOW_WATERMARK, 
lowWatermarkStr);
+        }
+        if (properties.containsKey(SPILL_THRESHOLD_HIGH_WATERMARK)) {
+            String highWatermarkStr = 
properties.get(SPILL_THRESHOLD_HIGH_WATERMARK);
+            if (highWatermarkStr.endsWith("%")) {
+                highWatermarkStr = highWatermarkStr.substring(0, 
highWatermarkStr.length() - 1);
+            }
+            this.properties.put(SPILL_THRESHOLD_HIGH_WATERMARK, 
highWatermarkStr);
+        }
         resetQueueProperty(properties);
     }
 
@@ -252,7 +274,7 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
                 }
             } catch (NumberFormatException e) {
                 throw new DdlException(
-                        MAX_REMOTE_SCAN_THREAD_NUM + " must be a positive 
integer or -1. but input value is " + value);
+                        MIN_REMOTE_SCAN_THREAD_NUM + " must be a positive 
integer or -1. but input value is " + value);
             }
         }
 
@@ -284,6 +306,51 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
                 throw new DdlException(QUEUE_TIMEOUT + " requires a positive 
integer");
             }
         }
+
+        int lowWaterMark = SPILL_LOW_WATERMARK_DEFAULT_VALUE;
+        if (properties.containsKey(SPILL_THRESHOLD_LOW_WATERMARK)) {
+            String lowVal = properties.get(SPILL_THRESHOLD_LOW_WATERMARK);
+            if (lowVal.endsWith("%")) {
+                lowVal = lowVal.substring(0, lowVal.length() - 1);
+            }
+            try {
+                int intValue = Integer.parseInt(lowVal);
+                if ((intValue < 1 || intValue > 100) && intValue != -1) {
+                    throw new NumberFormatException();
+                }
+                lowWaterMark = intValue;
+            } catch (NumberFormatException e) {
+                throw new DdlException(
+                        SPILL_THRESHOLD_LOW_WATERMARK
+                                + " must be a positive integer(1 ~ 100) or -1. 
but input value is "
+                                + lowVal);
+            }
+        }
+
+        int highWaterMark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE;
+        if (properties.containsKey(SPILL_THRESHOLD_HIGH_WATERMARK)) {
+            String highVal = properties.get(SPILL_THRESHOLD_HIGH_WATERMARK);
+            if (highVal.endsWith("%")) {
+                highVal = highVal.substring(0, highVal.length() - 1);
+            }
+            try {
+                int intValue = Integer.parseInt(highVal);
+                if ((intValue < 1 || intValue > 100)) {
+                    throw new NumberFormatException();
+                }
+                highWaterMark = intValue;
+            } catch (NumberFormatException e) {
+                throw new DdlException(
+                        SPILL_THRESHOLD_HIGH_WATERMARK + " must be a positive 
integer(1 ~ 100). but input value is "
+                                + highVal);
+            }
+        }
+
+        if (lowWaterMark > highWaterMark) {
+            throw new DdlException(SPILL_THRESHOLD_HIGH_WATERMARK + "(" + 
highWaterMark + ") should bigger than "
+                    + SPILL_THRESHOLD_LOW_WATERMARK + "(" + lowWaterMark + 
")");
+        }
+
     }
 
     public long getId() {
@@ -323,9 +390,9 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
         row.add(String.valueOf(id));
         row.add(name);
         // skip id,name,running query,waiting query
-        for (int i = 2; i < 
WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size() - 2; i++) {
+        for (int i = 2; i < 
WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) {
             String key = 
WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i);
-            if (CPU_HARD_LIMIT.equalsIgnoreCase(key)) {
+            if (CPU_HARD_LIMIT.equals(key)) {
                 String val = properties.get(key);
                 if (StringUtils.isEmpty(val)) { // cpu_hard_limit is not 
required
                     row.add("0%");
@@ -342,14 +409,32 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
                 row.add("-1");
             } else if (MAX_REMOTE_SCAN_THREAD_NUM.equals(key) && 
!properties.containsKey(key)) {
                 row.add("-1");
-            }  else if (MIN_REMOTE_SCAN_THREAD_NUM.equals(key) && 
!properties.containsKey(key)) {
+            } else if (MIN_REMOTE_SCAN_THREAD_NUM.equals(key) && 
!properties.containsKey(key)) {
                 row.add("-1");
+            } else if (SPILL_THRESHOLD_LOW_WATERMARK.equals(key)) {
+                String val = properties.get(key);
+                if (StringUtils.isEmpty(val)) {
+                    row.add(SPILL_LOW_WATERMARK_DEFAULT_VALUE + "%");
+                } else if ("-1".equals(val)) {
+                    row.add("-1");
+                } else {
+                    row.add(val + "%");
+                }
+            } else if (SPILL_THRESHOLD_HIGH_WATERMARK.equals(key)) {
+                String val = properties.get(key);
+                if (StringUtils.isEmpty(val)) {
+                    row.add(SPILL_HIGH_WATERMARK_DEFAULT_VALUE + "%");
+                } else {
+                    row.add(val + "%");
+                }
+            } else if (QueryQueue.RUNNING_QUERY_NUM.equals(key)) {
+                row.add(qq == null ? "0" : 
String.valueOf(qq.getCurrentRunningQueryNum()));
+            } else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) {
+                row.add(qq == null ? "0" : 
String.valueOf(qq.getCurrentWaitingQueryNum()));
             } else {
                 row.add(properties.get(key));
             }
         }
-        row.add(qq == null ? "0" : 
String.valueOf(qq.getCurrentRunningQueryNum()));
-        row.add(qq == null ? "0" : 
String.valueOf(qq.getCurrentWaitingQueryNum()));
         result.addRow(row);
     }
 
@@ -414,6 +499,16 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             
tWorkloadGroupInfo.setMinRemoteScanThreadNum(Integer.parseInt(minRemoteScanThreadNumStr));
         }
 
+        String spillLowWatermarkStr = 
properties.get(SPILL_THRESHOLD_LOW_WATERMARK);
+        if (spillLowWatermarkStr != null) {
+            
tWorkloadGroupInfo.setSpillThresholdLowWatermark(Integer.parseInt(spillLowWatermarkStr));
+        }
+
+        String spillHighWatermarkStr = 
properties.get(SPILL_THRESHOLD_HIGH_WATERMARK);
+        if (spillHighWatermarkStr != null) {
+            
tWorkloadGroupInfo.setSpillThresholdHighWatermark(Integer.parseInt(spillHighWatermarkStr));
+        }
+
         TopicInfo topicInfo = new TopicInfo();
         topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
         return topicInfo;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 1bd1a357127..967efd26e65 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -72,6 +72,7 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
             .add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
             
.add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM)
             .add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
+            
.add(WorkloadGroup.SPILL_THRESHOLD_LOW_WATERMARK).add(WorkloadGroup.SPILL_THRESHOLD_HIGH_WATERMARK)
             
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
             .build();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index adf3a9b3ed2..c84939ffbe6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -21,8 +21,7 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.SchemaTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
@@ -43,6 +42,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
 import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.FrontendService;
@@ -67,7 +67,6 @@ import org.apache.doris.thrift.TUserIdentity;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
@@ -89,49 +88,23 @@ import java.util.concurrent.TimeUnit;
 public class MetadataGenerator {
     private static final Logger LOG = 
LogManager.getLogger(MetadataGenerator.class);
 
-    private static final ImmutableList<Column> ACTIVE_QUERIES_SCHEMA = 
ImmutableList.of(
-            new Column("QUERY_ID", ScalarType.createStringType()),
-            new Column("QUERY_START_TIME", ScalarType.createStringType()),
-            new Column("QUERY_TIME_MS", PrimitiveType.BIGINT),
-            new Column("WORKLOAD_GROUP_ID", PrimitiveType.BIGINT),
-            new Column("DATABASE", ScalarType.createStringType()),
-            new Column("FRONTEND_INSTANCE", ScalarType.createStringType()),
-            new Column("QUEUE_START_TIME", ScalarType.createStringType()),
-            new Column("QUEUE_END_TIME", ScalarType.createStringType()),
-            new Column("QUERY_STATUS", ScalarType.createStringType()),
-            new Column("SQL", ScalarType.createStringType()));
-
     private static final ImmutableMap<String, Integer> 
ACTIVE_QUERIES_COLUMN_TO_INDEX;
 
-
-    private static final ImmutableList<Column> WORKLOAD_GROUPS_SCHEMA = 
ImmutableList.of(
-            new Column("ID", ScalarType.BIGINT),
-            new Column("NAME", ScalarType.createStringType()),
-            new Column("CPU_SHARE", PrimitiveType.BIGINT),
-            new Column("MEMORY_LIMIT", ScalarType.createStringType()),
-            new Column("ENABLE_MEMORY_OVERCOMMIT", 
ScalarType.createStringType()),
-            new Column("MAX_CONCURRENCY", PrimitiveType.BIGINT),
-            new Column("MAX_QUEUE_SIZE", PrimitiveType.BIGINT),
-            new Column("QUEUE_TIMEOUT", PrimitiveType.BIGINT),
-            new Column("CPU_HARD_LIMIT", PrimitiveType.BIGINT),
-            new Column("SCAN_THREAD_NUM", PrimitiveType.BIGINT),
-            new Column("MAX_REMOTE_SCAN_THREAD_NUM", PrimitiveType.BIGINT),
-            new Column("MIN_REMOTE_SCAN_THREAD_NUM", PrimitiveType.BIGINT));
-
     private static final ImmutableMap<String, Integer> 
WORKLOAD_GROUPS_COLUMN_TO_INDEX;
 
     static {
         ImmutableMap.Builder<String, Integer> activeQueriesbuilder = new 
ImmutableMap.Builder();
-        for (int i = 0; i < ACTIVE_QUERIES_SCHEMA.size(); i++) {
-            
activeQueriesbuilder.put(ACTIVE_QUERIES_SCHEMA.get(i).getName().toLowerCase(), 
i);
+        List<Column> activeQueriesColList = 
SchemaTable.TABLE_MAP.get("active_queries").getFullSchema();
+        for (int i = 0; i < activeQueriesColList.size(); i++) {
+            
activeQueriesbuilder.put(activeQueriesColList.get(i).getName().toLowerCase(), 
i);
         }
         ACTIVE_QUERIES_COLUMN_TO_INDEX = activeQueriesbuilder.build();
 
-        ImmutableMap.Builder<String, Integer> workloadGroupsBuilder = new 
ImmutableMap.Builder();
-        for (int i = 0; i < WORKLOAD_GROUPS_SCHEMA.size(); i++) {
-            
workloadGroupsBuilder.put(WORKLOAD_GROUPS_SCHEMA.get(i).getName().toLowerCase(),
 i);
+        ImmutableMap.Builder<String, Integer> workloadGroupBuilder = new 
ImmutableMap.Builder();
+        for (int i = 0; i < 
WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) {
+            
workloadGroupBuilder.put(WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i).toLowerCase(),
 i);
         }
-        WORKLOAD_GROUPS_COLUMN_TO_INDEX = workloadGroupsBuilder.build();
+        WORKLOAD_GROUPS_COLUMN_TO_INDEX = workloadGroupBuilder.build();
     }
 
     public static TFetchSchemaTableDataResult 
getMetadataTable(TFetchSchemaTableDataRequest request) throws TException {
@@ -458,6 +431,10 @@ public class MetadataGenerator {
             trow.addToColumnValue(new 
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10))));
             // min remote scan thread num
             trow.addToColumnValue(new 
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11))));
+            trow.addToColumnValue(new 
TCell().setStringVal(rGroupsInfo.get(12)));
+            trow.addToColumnValue(new 
TCell().setStringVal(rGroupsInfo.get(13)));
+            trow.addToColumnValue(new 
TCell().setStringVal(rGroupsInfo.get(14)));
+            trow.addToColumnValue(new 
TCell().setStringVal(rGroupsInfo.get(15)));
             dataBatch.add(trow);
         }
 
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index b803618af49..f42aa41ab75 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -179,6 +179,8 @@ struct TWorkloadGroupInfo {
   9: optional i32 scan_thread_num
   10: optional i32 max_remote_scan_thread_num
   11: optional i32 min_remote_scan_thread_num
+  12: optional i32 spill_threshold_low_watermark
+  13: optional i32 spill_threshold_high_watermark
 }
 
 enum TWorkloadMetricType {
diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out 
b/regression-test/data/workload_manager_p0/test_curd_wlg.out
index 77b9fa75be8..13b33fd9b83 100644
--- a/regression-test/data/workload_manager_p0/test_curd_wlg.out
+++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out
@@ -44,3 +44,15 @@ test_group   10      11%     false   100     0       0       
20%     -1
 normal 20      50%     true    2147483647      0       0       1%      16
 test_group     10      11%     false   100     0       0       20%     -1
 
+-- !show_spill_1 --
+spill_group_test       1024    0%      true    2147483647      0       0       
0%      -1      10%     10%
+
+-- !show_spill_1 --
+spill_group_test       1024    0%      true    2147483647      0       0       
0%      -1      -1      10%
+
+-- !show_spill_2 --
+spill_group_test       1024    0%      true    2147483647      0       0       
0%      -1      5%      10%
+
+-- !show_spill_3 --
+spill_group_test       1024    0%      true    2147483647      0       0       
0%      -1      5%      40%
+
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy 
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 92b4836ea7f..a73801f8743 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -325,5 +325,81 @@ suite("test_crud_wlg") {
     Thread.sleep(10000)
     sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from 
${table_name};"
     sql "set workload_group=normal;"
+
+    // test workload spill property
+    // 1 create group
+    test {
+        sql "create workload group if not exists spill_group_test_failed 
properties (  'spill_threshold_low_watermark'='90%');"
+        exception "should bigger than spill_threshold_low_watermark"
+    }
+    sql "create workload group if not exists spill_group_test properties (  
'spill_threshold_low_watermark'='10%','spill_threshold_high_watermark'='10%');"
+    qt_show_spill_1 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,spill_threshold_low_watermark,spill_threshold_high_watermark
 from information_schema.workload_groups where name in ('spill_group_test');"
+
+    test {
+        sql "create workload group if not exists spill_group_test properties ( 
 'spill_threshold_low_watermark'='20%','spill_threshold_high_watermark'='10%');"
+        exception "should bigger than spill_threshold_low_watermark"
+    }
+
+    // 2 alter low
+    sql "alter workload group spill_group_test properties ( 
'spill_threshold_low_watermark'='-1' );"
+    qt_show_spill_1 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,spill_threshold_low_watermark,spill_threshold_high_watermark
 from information_schema.workload_groups where name in ('spill_group_test');"
+
+    sql "alter workload group spill_group_test properties ( 
'spill_threshold_low_watermark'='5%' );"
+    qt_show_spill_2 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,spill_threshold_low_watermark,spill_threshold_high_watermark
 from information_schema.workload_groups where name in ('spill_group_test');"
+
+    test {
+        sql "alter workload group spill_group_test properties ( 
'spill_threshold_low_watermark'='20%' );"
+        exception "should bigger than spill_threshold_low_watermark"
+    }
+
+    test {
+        sql "alter workload group spill_group_test properties ( 
'spill_threshold_low_watermark'='0%' );"
+        exception "must be a positive integer"
+    }
+
+    test {
+        sql "alter workload group spill_group_test properties ( 
'spill_threshold_low_watermark'='101%' );"
+        exception "must be a positive integer"
+    }
+
+    test {
+        sql "create workload group if not exists spill_group_test2 properties 
(  'spill_threshold_low_watermark'='0%')"
+        exception "must be a positive integer"
+    }
+
+    test {
+        sql "create workload group if not exists spill_group_test2 properties 
(  'spill_threshold_low_watermark'='101%')"
+        exception "must be a positive integer"
+    }
+
+    // 3 alter high
+    sql "alter workload group spill_group_test properties ( 
'spill_threshold_high_watermark'='40%' );"
+    qt_show_spill_3 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,spill_threshold_low_watermark,spill_threshold_high_watermark
 from information_schema.workload_groups where name in ('spill_group_test');"
+    test {
+        sql "alter workload group spill_group_test properties ( 
'spill_threshold_high_watermark'='1%' );"
+        exception "should bigger than spill_threshold_low_watermark"
+    }
+
+    test {
+        sql "alter workload group spill_group_test properties ( 
'spill_threshold_high_watermark'='0%' );"
+        exception "must be a positive integer"
+    }
+
+    test {
+        sql "alter workload group spill_group_test properties ( 
'spill_threshold_high_watermark'='101%' );"
+        exception "must be a positive integer"
+    }
+
+    test {
+        sql "create workload group if not exists spill_group_test2 properties 
(  'spill_threshold_high_watermark'='0%')"
+        exception "must be a positive integer"
+    }
+
+    test {
+        sql "create workload group if not exists spill_group_test2 properties 
(  'spill_threshold_high_watermark'='101%')"
+        exception "must be a positive integer"
+    }
+
     sql "drop workload group test_group;"
+    sql "drop workload group spill_group_test;"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to