This is an automated email from the ASF dual-hosted git repository.

adonisling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1146bde695 [feature-wip](MTMV) Support refresh mtmv (#16218)
1146bde695 is described below

commit 1146bde695fbe7224ced2910b863d9fa18834ce9
Author: huangzhaowei <[email protected]>
AuthorDate: Sat Feb 4 20:17:45 2023 +0800

    [feature-wip](MTMV) Support refresh mtmv (#16218)
    
    Support using the following SQL to refresh an MTMV manually. It generates
    an MTMV task immediately.
    
    ```
    REFRESH MATERIALIZED VIEW test_mv_view [complete];
    ```
    
    You can use `show mtmv task` to show the latest task.
    
    In this PR, I also try to clear the MTMV tasks when the MTMV is dropped,
    to make sure the test suite runs correctly.
---
 .../main/java/org/apache/doris/common/Config.java  |  3 +++
 .../main/java/org/apache/doris/alter/Alter.java    | 14 ++++++++++-
 .../doris/analysis/AlterMaterializedViewStmt.java  |  8 +++++++
 .../java/org/apache/doris/mtmv/MTMVJobManager.java | 13 ++++++++++
 .../org/apache/doris/mtmv/MTMVTaskManager.java     | 19 +++++++++++++++
 .../main/java/org/apache/doris/mtmv/MTMVUtils.java |  2 +-
 .../suites/mtmv_p0/test_create_mtmv.groovy         | 28 +++++++++++++++++++---
 7 files changed, 82 insertions(+), 5 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 7e9526d8da..51b4d4073c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1812,6 +1812,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static long scheduler_mtmv_task_expired = 24 * 60 * 60L; // 1day
 
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean keep_scheduler_mtmv_task_when_job_deleted = false;
+
     /**
      * The candidate of the backend node for federation query such as hive 
table and es table query.
      * If the backend of computation role is less than this value, it will 
acquire some mix backend.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 5bb3323d82..3ea07a3579 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -29,6 +29,7 @@ import 
org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt;
 import org.apache.doris.analysis.DropMaterializedViewStmt;
 import org.apache.doris.analysis.DropPartitionClause;
 import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.MVRefreshInfo.RefreshMethod;
 import org.apache.doris.analysis.ModifyColumnCommentClause;
 import org.apache.doris.analysis.ModifyDistributionClause;
 import org.apache.doris.analysis.ModifyEngineClause;
@@ -65,6 +66,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.mtmv.MTMVUtils.TaskSubmitStatus;
 import org.apache.doris.persist.AlterViewInfo;
 import org.apache.doris.persist.BatchModifyPartitionsInfo;
 import org.apache.doris.persist.ModifyCommentOperationLog;
@@ -157,7 +159,15 @@ public class Alter {
     }
 
     public void processRefreshMaterializedView(RefreshMaterializedViewStmt 
stmt) throws DdlException {
-        throw new DdlException("Refresh materialized view is not implemented: 
" + stmt.toSql());
+        if (stmt.getRefreshMethod() != RefreshMethod.COMPLETE) {
+            throw new DdlException("Now only support REFRESH COMPLETE.");
+        }
+        String db = stmt.getMvName().getDb();
+        String tbl = stmt.getMvName().getTbl();
+        TaskSubmitStatus status = 
Env.getCurrentEnv().getMTMVJobManager().refreshMTMVTask(db, tbl);
+        if (status != TaskSubmitStatus.SUBMITTED) {
+            throw new DdlException("Refresh MaterializedView with " + 
status.toString());
+        }
     }
 
     private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable 
olapTable, List<AlterClause> alterClauses,
@@ -497,6 +507,8 @@ public class Alter {
     }
 
     public void processAlterMaterializedView(AlterMaterializedViewStmt stmt) 
throws UserException {
+        TableName tbl = stmt.getTable();
+        Env.getCurrentEnv().getInternalCatalog().getDb(tbl.getDb());
         throw new DdlException("ALTER MATERIALIZED VIEW is not implemented: " 
+ stmt.toSql());
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java
index a4331d9f5b..a16860cfac 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java
@@ -33,6 +33,14 @@ public class AlterMaterializedViewStmt extends DdlStmt  {
         this.info = info;
     }
 
+    public TableName getTable() {
+        return mvName;
+    }
+
+    public MVRefreshInfo getRefreshInfo() {
+        return info;
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException {
         mvName.analyze(analyzer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index fd9f55ce10..13941c49a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -239,6 +239,16 @@ public class MTMVJobManager {
         return taskManager.killTask(job.getId(), clearPending);
     }
 
+    public MTMVUtils.TaskSubmitStatus refreshMTMVTask(String dbName, String 
mvName) throws DdlException {
+        for (String jobName : nameToJobMap.keySet()) {
+            MTMVJob job = nameToJobMap.get(jobName);
+            if (job.getMVName().equals(mvName) && 
job.getDBName().equals(dbName)) {
+                return submitJobTask(jobName);
+            }
+        }
+        throw new DdlException("No job find for the MaterializedView " + 
dbName + "." + mvName + " .");
+    }
+
     public MTMVUtils.TaskSubmitStatus submitJobTask(String jobName) {
         return submitJobTask(jobName, new MTMVTaskExecuteParams());
     }
@@ -292,6 +302,9 @@ public class MTMVJobManager {
                     periodFutureMap.remove(job.getId());
                 }
                 killJobTask(job.getName(), true);
+                if (!Config.keep_scheduler_mtmv_task_when_job_deleted) {
+                    taskManager.clearTasksByJobName(job.getName(), isReplay);
+                }
                 idToJobMap.remove(job.getId());
                 nameToJobMap.remove(job.getName());
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index 55508b18c6..79f15ae4fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -399,6 +399,25 @@ public class MTMVTaskManager {
         }
     }
 
+    public void clearTasksByJobName(String jobName, boolean isReplay) {
+        List<String> clearTasks = Lists.newArrayList();
+
+        if (!tryLock()) {
+            return;
+        }
+        try {
+            Deque<MTMVTask> taskHistory = getAllHistory();
+            for (MTMVTask task : taskHistory) {
+                if (task.getJobName().equals(jobName)) {
+                    clearTasks.add(task.getTaskId());
+                }
+            }
+        } finally {
+            unlock();
+        }
+        dropTasks(clearTasks, isReplay);
+    }
+
     public void removeExpiredTasks() {
         long currentTime = MTMVUtils.getNowTimeStamp();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
index d421d2f50a..ffaf5af888 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
@@ -63,7 +63,7 @@ public class MTMVUtils {
         PENDING, RUNNING, FAILURE, SUCCESS,
     }
 
-    enum TaskSubmitStatus {
+    public enum TaskSubmitStatus {
         SUBMITTED, REJECTED, FAILED
     }
 
diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy 
b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
index 61534f7758..c506519912 100644
--- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
@@ -66,10 +66,12 @@ suite("test_create_mtmv") {
     def index = show_task_meta.indexOf(['State', 'CHAR'])
     def query = "SHOW MTMV TASK ON ${mvName}"
     def show_task_result
-    def state
+    def state = "PENDING"
     do {
         show_task_result = sql "${query}"
-        state = show_task_result.last().get(index)
+        if (!show_task_result.isEmpty()) {
+            state = show_task_result.last().get(index)
+        }
         println "The state of ${query} is ${state}"
         Thread.sleep(1000);
     } while (state.equals('PENDING') || state.equals('RUNNING'))
@@ -93,9 +95,12 @@ suite("test_create_mtmv") {
         SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, 
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
     """
     // wait task to be finished to avoid task leak in suite.
+    state = "PENDING"
     do {
         show_task_result = sql "${query}"
-        state = show_task_result.last().get(index)
+        if (!show_task_result.isEmpty()) {
+            state = show_task_result.last().get(index)
+        }
         println "The state of ${query} is ${state}"
         Thread.sleep(1000);
     } while (state.equals('PENDING') || state.equals('RUNNING'))
@@ -103,6 +108,23 @@ suite("test_create_mtmv") {
     def show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
     assertEquals 1, show_job_result.size()
 
+    // test REFRESH make sure only define one mv and already run a task.
+    sql """
+        REFRESH MATERIALIZED VIEW ${mvName} COMPLETE
+    """
+    state = "PENDING"
+    do {
+        show_task_result = sql "${query}"
+        if (!show_task_result.isEmpty()) {
+            state = show_task_result.last().get(index)
+        }
+        println "The state of ${query} is ${state}"
+        Thread.sleep(1000);
+    } while (state.equals('PENDING') || state.equals('RUNNING'))
+
+    assertEquals 'SUCCESS', state, show_task_result.last().toString()
+    assertEquals 2, show_task_result.size()
+
     sql """
         DROP MATERIALIZED VIEW ${mvName}
     """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to