This is an automated email from the ASF dual-hosted git repository.
adonisling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1146bde695 [feature-wip](MTMV) Support refresh mtmv (#16218)
1146bde695 is described below
commit 1146bde695fbe7224ced2910b863d9fa18834ce9
Author: huangzhaowei <[email protected]>
AuthorDate: Sat Feb 4 20:17:45 2023 +0800
[feature-wip](MTMV) Support refresh mtmv (#16218)
Support using the following SQL statement to refresh an MTMV manually. It generates an
MTMV task immediately.
```
REFRESH MATERIALIZED VIEW test_mv_view [complete];
```
You can use `show mtmv task` to show the latest task.
In this PR, I also try to clear the MTMV's tasks when the MTMV is dropped, to make
sure the test suite produces the right results.
---
.../main/java/org/apache/doris/common/Config.java | 3 +++
.../main/java/org/apache/doris/alter/Alter.java | 14 ++++++++++-
.../doris/analysis/AlterMaterializedViewStmt.java | 8 +++++++
.../java/org/apache/doris/mtmv/MTMVJobManager.java | 13 ++++++++++
.../org/apache/doris/mtmv/MTMVTaskManager.java | 19 +++++++++++++++
.../main/java/org/apache/doris/mtmv/MTMVUtils.java | 2 +-
.../suites/mtmv_p0/test_create_mtmv.groovy | 28 +++++++++++++++++++---
7 files changed, 82 insertions(+), 5 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 7e9526d8da..51b4d4073c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1812,6 +1812,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static long scheduler_mtmv_task_expired = 24 * 60 * 60L; // 1day
+ @ConfField(mutable = true, masterOnly = true)
+ public static boolean keep_scheduler_mtmv_task_when_job_deleted = false;
+
/**
* The candidate of the backend node for federation query such as hive
table and es table query.
* If the backend of computation role is less than this value, it will
acquire some mix backend.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 5bb3323d82..3ea07a3579 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -29,6 +29,7 @@ import
org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.MVRefreshInfo.RefreshMethod;
import org.apache.doris.analysis.ModifyColumnCommentClause;
import org.apache.doris.analysis.ModifyDistributionClause;
import org.apache.doris.analysis.ModifyEngineClause;
@@ -65,6 +66,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.mtmv.MTMVUtils.TaskSubmitStatus;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
@@ -157,7 +159,15 @@ public class Alter {
}
public void processRefreshMaterializedView(RefreshMaterializedViewStmt
stmt) throws DdlException {
- throw new DdlException("Refresh materialized view is not implemented:
" + stmt.toSql());
+ if (stmt.getRefreshMethod() != RefreshMethod.COMPLETE) {
+ throw new DdlException("Now only support REFRESH COMPLETE.");
+ }
+ String db = stmt.getMvName().getDb();
+ String tbl = stmt.getMvName().getTbl();
+ TaskSubmitStatus status =
Env.getCurrentEnv().getMTMVJobManager().refreshMTMVTask(db, tbl);
+ if (status != TaskSubmitStatus.SUBMITTED) {
+ throw new DdlException("Refresh MaterializedView with " +
status.toString());
+ }
}
private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable
olapTable, List<AlterClause> alterClauses,
@@ -497,6 +507,8 @@ public class Alter {
}
public void processAlterMaterializedView(AlterMaterializedViewStmt stmt)
throws UserException {
+ TableName tbl = stmt.getTable();
+ Env.getCurrentEnv().getInternalCatalog().getDb(tbl.getDb());
throw new DdlException("ALTER MATERIALIZED VIEW is not implemented: "
+ stmt.toSql());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java
index a4331d9f5b..a16860cfac 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java
@@ -33,6 +33,14 @@ public class AlterMaterializedViewStmt extends DdlStmt {
this.info = info;
}
+ public TableName getTable() {
+ return mvName;
+ }
+
+ public MVRefreshInfo getRefreshInfo() {
+ return info;
+ }
+
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
mvName.analyze(analyzer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index fd9f55ce10..13941c49a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -239,6 +239,16 @@ public class MTMVJobManager {
return taskManager.killTask(job.getId(), clearPending);
}
+ public MTMVUtils.TaskSubmitStatus refreshMTMVTask(String dbName, String
mvName) throws DdlException {
+ for (String jobName : nameToJobMap.keySet()) {
+ MTMVJob job = nameToJobMap.get(jobName);
+ if (job.getMVName().equals(mvName) &&
job.getDBName().equals(dbName)) {
+ return submitJobTask(jobName);
+ }
+ }
+ throw new DdlException("No job find for the MaterializedView " +
dbName + "." + mvName + " .");
+ }
+
public MTMVUtils.TaskSubmitStatus submitJobTask(String jobName) {
return submitJobTask(jobName, new MTMVTaskExecuteParams());
}
@@ -292,6 +302,9 @@ public class MTMVJobManager {
periodFutureMap.remove(job.getId());
}
killJobTask(job.getName(), true);
+ if (!Config.keep_scheduler_mtmv_task_when_job_deleted) {
+ taskManager.clearTasksByJobName(job.getName(), isReplay);
+ }
idToJobMap.remove(job.getId());
nameToJobMap.remove(job.getName());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index 55508b18c6..79f15ae4fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -399,6 +399,25 @@ public class MTMVTaskManager {
}
}
+ public void clearTasksByJobName(String jobName, boolean isReplay) {
+ List<String> clearTasks = Lists.newArrayList();
+
+ if (!tryLock()) {
+ return;
+ }
+ try {
+ Deque<MTMVTask> taskHistory = getAllHistory();
+ for (MTMVTask task : taskHistory) {
+ if (task.getJobName().equals(jobName)) {
+ clearTasks.add(task.getTaskId());
+ }
+ }
+ } finally {
+ unlock();
+ }
+ dropTasks(clearTasks, isReplay);
+ }
+
public void removeExpiredTasks() {
long currentTime = MTMVUtils.getNowTimeStamp();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
index d421d2f50a..ffaf5af888 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
@@ -63,7 +63,7 @@ public class MTMVUtils {
PENDING, RUNNING, FAILURE, SUCCESS,
}
- enum TaskSubmitStatus {
+ public enum TaskSubmitStatus {
SUBMITTED, REJECTED, FAILED
}
diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
index 61534f7758..c506519912 100644
--- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
@@ -66,10 +66,12 @@ suite("test_create_mtmv") {
def index = show_task_meta.indexOf(['State', 'CHAR'])
def query = "SHOW MTMV TASK ON ${mvName}"
def show_task_result
- def state
+ def state = "PENDING"
do {
show_task_result = sql "${query}"
- state = show_task_result.last().get(index)
+ if (!show_task_result.isEmpty()) {
+ state = show_task_result.last().get(index)
+ }
println "The state of ${query} is ${state}"
Thread.sleep(1000);
} while (state.equals('PENDING') || state.equals('RUNNING'))
@@ -93,9 +95,12 @@ suite("test_create_mtmv") {
SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName},
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
"""
// wait task to be finished to avoid task leak in suite.
+ state = "PENDING"
do {
show_task_result = sql "${query}"
- state = show_task_result.last().get(index)
+ if (!show_task_result.isEmpty()) {
+ state = show_task_result.last().get(index)
+ }
println "The state of ${query} is ${state}"
Thread.sleep(1000);
} while (state.equals('PENDING') || state.equals('RUNNING'))
@@ -103,6 +108,23 @@ suite("test_create_mtmv") {
def show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
assertEquals 1, show_job_result.size()
+ // test REFRESH make sure only define one mv and already run a task.
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} COMPLETE
+ """
+ state = "PENDING"
+ do {
+ show_task_result = sql "${query}"
+ if (!show_task_result.isEmpty()) {
+ state = show_task_result.last().get(index)
+ }
+ println "The state of ${query} is ${state}"
+ Thread.sleep(1000);
+ } while (state.equals('PENDING') || state.equals('RUNNING'))
+
+ assertEquals 'SUCCESS', state, show_task_result.last().toString()
+ assertEquals 2, show_task_result.size()
+
sql """
DROP MATERIALIZED VIEW ${mvName}
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]