This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b275cb0f44b70a3b2480fa9cfc44a0b8e01ba761
Author: zhangdong <[email protected]>
AuthorDate: Sun Feb 4 11:11:58 2024 +0800

    [feature](mtmv) mtmv support workload group (#29595)
    
    MTMV supports controlling the resource usage of refresh tasks by setting
    the `workload_group` property to the name of a workload group.
    About workload groups:
    https://doris.apache.org/zh-CN/docs/dev/admin-manual/workload-group
---
 .../main/java/org/apache/doris/catalog/MTMV.java   | 10 ++++
 .../apache/doris/common/util/PropertyAnalyzer.java |  1 +
 .../java/org/apache/doris/mtmv/MTMVPlanUtil.java   |  5 ++
 .../plans/commands/info/AlterMTMVPropertyInfo.java | 13 +++++
 .../trees/plans/commands/info/CreateMTMVInfo.java  | 13 +++++
 .../data/mtmv_p0/test_workload_group_mtmv.out      |  7 +++
 .../org/apache/doris/regression/suite/Suite.groovy | 21 +++++++
 .../suites/mtmv_p0/test_workload_group_mtmv.groovy | 65 ++++++++++++++++++++++
 8 files changed, 135 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
index 7f24934c2a6..e7b0a79dd53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
@@ -40,6 +40,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -47,6 +48,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -190,6 +192,14 @@ public class MTMV extends OlapTable {
         }
     }
 
+    /**
+     * Returns the workload group name stored in the MV properties; empty when the key is absent or its value is "".
+     */
+    public Optional<String> getWorkloadGroup() {
+        return Optional.ofNullable(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP))
+                .filter(groupName -> !StringUtils.isEmpty(groupName));
+    }
+
     public int getRefreshPartitionNum() {
         if 
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
             int value = 
Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 8a4051c9af3..fd3b8243766 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -164,6 +164,7 @@ public class PropertyAnalyzer {
     public static final String PROPERTIES_GRACE_PERIOD = "grace_period";
     public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = 
"excluded_trigger_tables";
     public static final String PROPERTIES_REFRESH_PARTITION_NUM = 
"refresh_partition_num";
+    public static final String PROPERTIES_WORKLOAD_GROUP = "workload_group";
     // For unique key data model, the feature Merge-on-Write will leverage a 
primary
     // key index and a delete-bitmap to mark duplicate keys as deleted in load 
stage,
     // which can avoid the merging cost in read stage, and accelerate the 
aggregation
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
index 334e54f5090..e3333f32585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
@@ -43,6 +43,7 @@ import org.apache.doris.qe.SessionVariable;
 import com.google.common.collect.Sets;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 public class MTMVPlanUtil {
@@ -59,6 +60,10 @@ public class MTMVPlanUtil {
         ctx.changeDefaultCatalog(catalog.getName());
         
ctx.setDatabase(catalog.getDbOrAnalysisException(mtmv.getEnvInfo().getDbId()).getFullName());
         ctx.getSessionVariable().enableFallbackToOriginalPlanner = false;
+        Optional<String> workloadGroup = mtmv.getWorkloadGroup();
+        if (workloadGroup.isPresent()) {
+            ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
+        }
         ctx.getSessionVariable().enableNereidsDML = true;
         return ctx;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java
index d90e2e8f1be..b152aadc98d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java
@@ -20,9 +20,12 @@ package org.apache.doris.nereids.trees.plans.commands.info;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.qe.ConnectContext;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.Map;
 import java.util.Objects;
 
@@ -71,6 +74,16 @@ public class AlterMTMVPropertyInfo extends AlterMTMVInfo {
                 }
             } else if 
(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES.equals(key)) {
                 // nothing
+            } else if (PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP.equals(key)) 
{
+                String workloadGroup = 
properties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP);
+                if (!StringUtils.isEmpty(workloadGroup) && 
!Env.getCurrentEnv().getAccessManager()
+                        .checkWorkloadGroupPriv(ConnectContext.get(), 
workloadGroup, PrivPredicate.USAGE)) {
+                    String message = String
+                            .format("Access denied; you need (at least one of) 
"
+                                            + "the %s privilege(s) to use 
workload group '%s'.",
+                                    "USAGE/ADMIN", workloadGroup);
+                    throw new AnalysisException(message);
+                }
             } else {
                 throw new 
org.apache.doris.nereids.exceptions.AnalysisException("illegal key:" + key);
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
index 3070bb19636..db9abb8f02c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
@@ -202,6 +202,19 @@ public class CreateMTMVInfo {
             
mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, 
excludedTriggerTables);
             
properties.remove(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
         }
+        if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP)) {
+            String workloadGroup = 
properties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP);
+            if (!Env.getCurrentEnv().getAccessManager()
+                    .checkWorkloadGroupPriv(ConnectContext.get(), 
workloadGroup, PrivPredicate.USAGE)) {
+                String message = String
+                        .format("Access denied;"
+                                        + " you need (at least one of) the %s 
privilege(s) to use workload group '%s'.",
+                                "USAGE/ADMIN", workloadGroup);
+                throw new AnalysisException(message);
+            }
+            mvProperties.put(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP, 
workloadGroup);
+            properties.remove(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP);
+        }
     }
 
     /**
diff --git a/regression-test/data/mtmv_p0/test_workload_group_mtmv.out 
b/regression-test/data/mtmv_p0/test_workload_group_mtmv.out
new file mode 100644
index 00000000000..0e9c45c20d4
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_workload_group_mtmv.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !create --
+{workload_group=g1}
+
+-- !alter --
+{workload_group=mv_test_not_exist_group}
+
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index ef4a459485e..97dcb879b04 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -926,6 +926,27 @@ class Suite implements GroovyInterceptable {
         Assert.assertEquals("SUCCESS", status)
     }
 
+    void waitingMTMVTaskFinishedNotNeedSuccess(String jobName) { // polls the job's newest task until it leaves PENDING/RUNNING or ~5 min pass; only logs a non-SUCCESS outcome
+        Thread.sleep(2000);
+        String showTasks = "select TaskId,JobId,JobName,MvId,Status from tasks('type'='mv') where JobName = '${jobName}' order by CreateTime ASC"
+        String status = "NULL" // sentinel used while no task row has been returned yet
+        long timeoutTimestamp = System.currentTimeMillis() + 5 * 60 * 1000 // 5 min
+        while (true) {
+            List<List<Object>> result = sql(showTasks)
+            logger.info("result: " + result.toString())
+            if (!result.isEmpty()) {
+                status = result.last().get(4) // Status column of the last row (rows are ordered by CreateTime ASC)
+            }
+            logger.info("The state of ${showTasks} is ${status}")
+            Thread.sleep(1000);
+            boolean unfinished = status == 'PENDING' || status == 'RUNNING' || status == 'NULL'
+            if (timeoutTimestamp <= System.currentTimeMillis() || !unfinished) break
+        }
+        if (status != "SUCCESS") {
+            logger.info("status is not success")
+        }
+    }
+
     String getJobName(String dbName, String mtmvName) {
         String showMTMV = "select JobName from 
mv_infos('database'='${dbName}') where Name = '${mtmvName}'";
            logger.info(showMTMV)
diff --git a/regression-test/suites/mtmv_p0/test_workload_group_mtmv.groovy 
b/regression-test/suites/mtmv_p0/test_workload_group_mtmv.groovy
new file mode 100644
index 00000000000..d08ff938f28
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_workload_group_mtmv.groovy
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert;
+
+suite("test_workload_group_mtmv") { // checks the 'workload_group' MTMV property after CREATE, after ALTER, and in the refresh task's error message
+    def tableName = "t_test_workload_group_mtmv_user"
+    def mvName = "multi_mv_test_workload_group_mtmv"
+    def dbName = "regression_test_mtmv_p0"
+    sql """drop table if exists `${tableName}`"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS `${tableName}` (
+            event_day DATE,
+            id BIGINT,
+            username VARCHAR(20)
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 10 
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+        """
+    sql """ insert into ${tableName} values('2020-10-01',1,"a");"""
+    sql """drop materialized view if exists ${mvName};"""
+
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+        BUILD DEFERRED REFRESH COMPLETE ON MANUAL
+        DISTRIBUTED BY RANDOM BUCKETS 2
+        PROPERTIES ('replication_num' = '1','workload_group' = 'g1')
+        AS 
+        SELECT * FROM ${tableName};
+    """
+    order_qt_create "select MvProperties from mv_infos('database'='${dbName}') where Name='${mvName}'"
+    sql """
+            alter MATERIALIZED VIEW ${mvName} set ('workload_group'='mv_test_not_exist_group');
+        """
+    order_qt_alter "select MvProperties from mv_infos('database'='${dbName}') where Name='${mvName}'"
+    sql """
+            refresh MATERIALIZED VIEW ${mvName};
+        """
+    def jobName = getJobName(dbName, mvName); // declared with def so it stays local, like the other variables in this suite
+    logger.info(jobName)
+    waitingMTMVTaskFinishedNotNeedSuccess(jobName) // the refresh is not required to succeed; only its completion is awaited
+    def errors = sql """select ErrorMsg from tasks('type'='mv') where MvName='${mvName}' and MvDatabaseName='${dbName}';"""
+    logger.info("errors: " + errors.toString())
+    assertTrue(errors.toString().contains("mv_test_not_exist_group")) // the task error must name the workload group set by the ALTER above
+    sql """
+        DROP MATERIALIZED VIEW ${mvName}
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to