This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 623257d02b1 [feature](mtmv)MTMV pause and resume (#28887)
623257d02b1 is described below
commit 623257d02b1c91edfeffee03192776f93b92b75e
Author: zhangdong <[email protected]>
AuthorDate: Sat Dec 23 14:30:54 2023 +0800
[feature](mtmv)MTMV pause and resume (#28887)
- PAUSE MATERIALIZED VIEW JOB ON mv1
- RESUME MATERIALIZED VIEW JOB ON mv1
- fix when drop db,not drop job
- add lock for one materialized view can only run one task at a time
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 2 +
.../apache/doris/datasource/InternalCatalog.java | 5 ++
.../apache/doris/job/extensions/mtmv/MTMVJob.java | 21 +++++++
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 17 ++++-
.../org/apache/doris/job/task/AbstractTask.java | 8 +++
.../org/apache/doris/mtmv/MTMVHookService.java | 17 ++++-
.../java/org/apache/doris/mtmv/MTMVJobManager.java | 48 ++++++++++-----
.../org/apache/doris/mtmv/MTMVRelationManager.java | 13 ++++
.../java/org/apache/doris/mtmv/MTMVService.java | 21 ++++++-
.../doris/nereids/parser/LogicalPlanBuilder.java | 18 ++++++
.../apache/doris/nereids/trees/plans/PlanType.java | 2 +
.../trees/plans/commands/PauseMTMVCommand.java | 50 +++++++++++++++
.../trees/plans/commands/ResumeMTMVCommand.java | 50 +++++++++++++++
.../trees/plans/commands/info/PauseMTMVInfo.java | 72 ++++++++++++++++++++++
.../trees/plans/commands/info/ResumeMTMVInfo.java | 72 ++++++++++++++++++++++
.../trees/plans/visitor/CommandVisitor.java | 10 +++
regression-test/data/mtmv_p0/test_db_mtmv.out | 7 +++
regression-test/data/mtmv_p0/test_pause_mtmv.out | 10 +++
regression-test/suites/mtmv_p0/test_db_mtmv.groovy | 52 ++++++++++++++++
.../suites/mtmv_p0/test_pause_mtmv.groovy | 60 ++++++++++++++++++
20 files changed, 534 insertions(+), 21 deletions(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index f8acaccb10f..abef9ffa6d7 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -99,6 +99,8 @@ statement
| (REFRESH (refreshMethod | refreshTrigger | refreshMethod
refreshTrigger))
| (SET LEFT_PAREN fileProperties=propertyItemList RIGHT_PAREN))
#alterMTMV
| DROP MATERIALIZED VIEW (IF EXISTS)? mvName=multipartIdentifier
#dropMTMV
+ | PAUSE MATERIALIZED VIEW JOB ON mvName=multipartIdentifier #pauseMTMV
+ | RESUME MATERIALIZED VIEW JOB ON mvName=multipartIdentifier
#resumeMTMV
| ALTER TABLE table=relation
ADD CONSTRAINT constraintName=errorCapturingIdentifier
constraint
#addConstraint
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index a5bca6e46a2..dea018b6425 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -519,6 +519,11 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
}
}
+ for (Table table : tableList) {
+ if (table.getType() == TableType.MATERIALIZED_VIEW) {
+
Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) table);
+ }
+ }
unprotectDropDb(db, stmt.isForceDrop(), false, 0);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
index 5ee9b43fe1f..c500e693295 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
@@ -47,9 +47,12 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
+ private ReentrantReadWriteLock jobRwLock;
+
private static final ShowResultSetMetaData JOB_META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("JobId",
ScalarType.createVarchar(20)))
@@ -98,12 +101,14 @@ public class MTMVJob extends AbstractJob<MTMVTask,
MTMVTaskContext> {
private long mtmvId;
public MTMVJob() {
+ jobRwLock = new ReentrantReadWriteLock(true);
}
public MTMVJob(long dbId, long mtmvId) {
this.dbId = dbId;
this.mtmvId = mtmvId;
super.setCreateTimeMs(System.currentTimeMillis());
+ jobRwLock = new ReentrantReadWriteLock(true);
}
@Override
@@ -203,6 +208,22 @@ public class MTMVJob extends AbstractJob<MTMVTask,
MTMVTaskContext> {
return (MTMV) db.getTableOrMetaException(mtmvId,
TableType.MATERIALIZED_VIEW);
}
+ public void readLock() {
+ this.jobRwLock.readLock().lock();
+ }
+
+ public void readUnlock() {
+ this.jobRwLock.readLock().unlock();
+ }
+
+ public void writeLock() {
+ this.jobRwLock.writeLock().lock();
+ }
+
+ public void writeUnlock() {
+ this.jobRwLock.writeLock().unlock();
+ }
+
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 115d4eba303..ab9b6f9fa94 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -220,6 +220,17 @@ public class MTMVTask extends AbstractTask {
}
}
+ @Override
+ public void runTask() throws JobException {
+ MTMVJob job = (MTMVJob) getJobOrJobException();
+ try {
+ job.writeLock();
+ super.runTask();
+ } finally {
+ job.writeUnlock();
+ }
+ }
+
@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
@@ -276,8 +287,10 @@ public class MTMVTask extends AbstractTask {
}
private void after() {
- Env.getCurrentEnv()
- .addMTMVTaskResult(new
TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation);
+ if (mtmv != null) {
+ Env.getCurrentEnv()
+ .addMTMVTaskResult(new
TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation);
+ }
mtmv = null;
relation = null;
executor = null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index f887961230f..7327183e95e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -135,4 +135,12 @@ public abstract class AbstractTask implements Task {
return job == null ? "" : job.getJobName();
}
+ public Job getJobOrJobException() throws JobException {
+ AbstractJob job = Env.getCurrentEnv().getJobManager().getJob(jobId);
+ if (job == null) {
+ throw new JobException("job not exist, jobId:" + jobId);
+ }
+ return job;
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java
index 63ae15fef1b..41bc506f5c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java
@@ -21,8 +21,11 @@ import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.persist.AlterMTMV;
/**
@@ -77,7 +80,7 @@ public interface MTMVHookService {
* @throws DdlException
* @throws MetaNotFoundException
*/
- void refreshMTMV(RefreshMTMVInfo info) throws DdlException,
MetaNotFoundException;
+ void refreshMTMV(RefreshMTMVInfo info) throws DdlException,
MetaNotFoundException, JobException;
/**
* triggered when mtmv task finish
@@ -101,4 +104,16 @@ public interface MTMVHookService {
* @param table
*/
void alterTable(Table table);
+
+ /**
+ * Triggered when pause mtmv
+ * @param info
+ */
+ void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException,
DdlException, JobException;
+
+ /**
+ * Triggered when resume mtmv
+ * @param info
+ */
+ void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException,
DdlException, JobException;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index bdbc3231181..8cf225c59c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -37,7 +37,10 @@ import
org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
+import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.qe.ConnectContext;
@@ -167,23 +170,11 @@ public class MTMVJobManager implements MTMVHookService {
* @throws MetaNotFoundException
*/
@Override
- public void refreshMTMV(RefreshMTMVInfo info) throws DdlException,
MetaNotFoundException {
- Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(info.getMvName().getDb());
- MTMV mtmv = (MTMV)
db.getTableOrMetaException(info.getMvName().getTbl(),
TableType.MATERIALIZED_VIEW);
- List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
- .queryJobs(JobType.MV, mtmv.getJobInfo().getJobName());
- if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
- throw new DdlException("jobs not normal,should have one job,but
job num is: " + jobs.size());
- }
- try {
- MTMVTaskContext mtmvTaskContext = new
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
- info.isComplete());
-
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(),
mtmvTaskContext);
-
- } catch (JobException e) {
- e.printStackTrace();
- throw new DdlException(e.getMessage());
- }
+ public void refreshMTMV(RefreshMTMVInfo info) throws DdlException,
MetaNotFoundException, JobException {
+ MTMVJob job = getJobByTableNameInfo(info.getMvName());
+ MTMVTaskContext mtmvTaskContext = new
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
+ info.isComplete());
+ Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(),
mtmvTaskContext);
}
@Override
@@ -201,4 +192,27 @@ public class MTMVJobManager implements MTMVHookService {
}
+ @Override
+ public void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException,
DdlException, JobException {
+ MTMVJob job = getJobByTableNameInfo(info.getMvName());
+ Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(),
JobStatus.PAUSED);
+ }
+
+ @Override
+ public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException,
DdlException, JobException {
+ MTMVJob job = getJobByTableNameInfo(info.getMvName());
+ Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(),
JobStatus.RUNNING);
+ }
+
+ private MTMVJob getJobByTableNameInfo(TableNameInfo info) throws
DdlException, MetaNotFoundException {
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb());
+ MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(),
TableType.MATERIALIZED_VIEW);
+ List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
+ .queryJobs(JobType.MV, mtmv.getJobInfo().getJobName());
+ if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
+ throw new DdlException("jobs not normal,should have one job,but
job num is: " + jobs.size());
+ }
+ return jobs.get(0);
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
index 11da5104255..7be43771e44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
@@ -24,9 +24,12 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
+import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;
@@ -187,6 +190,16 @@ public class MTMVRelationManager implements
MTMVHookService {
processBaseTableChange(table, "The base table has been updated:");
}
+ @Override
+ public void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException,
DdlException, JobException {
+
+ }
+
+ @Override
+ public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException,
DdlException, JobException {
+
+ }
+
private void processBaseTableChange(Table table, String msgPrefix) {
BaseTableInfo baseTableInfo = new BaseTableInfo(table);
Set<BaseTableInfo> mtmvsByBaseTable =
getMtmvsByBaseTable(baseTableInfo);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
index 9530467dee7..3ab05c92a46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
@@ -23,9 +23,12 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.persist.AlterMTMV;
import com.google.common.collect.Maps;
@@ -108,7 +111,7 @@ public class MTMVService {
}
}
- public void refreshMTMV(RefreshMTMVInfo info) throws DdlException,
MetaNotFoundException {
+ public void refreshMTMV(RefreshMTMVInfo info) throws DdlException,
MetaNotFoundException, JobException {
Objects.requireNonNull(info);
LOG.info("refreshMTMV, RefreshMTMVInfo: {}", info);
for (MTMVHookService mtmvHookService : hooks.values()) {
@@ -140,4 +143,20 @@ public class MTMVService {
mtmvHookService.refreshComplete(mtmv, cache, task);
}
}
+
+ public void pauseMTMV(PauseMTMVInfo info) throws DdlException,
MetaNotFoundException, JobException {
+ Objects.requireNonNull(info);
+ LOG.info("pauseMTMV, PauseMTMVInfo: {}", info);
+ for (MTMVHookService mtmvHookService : hooks.values()) {
+ mtmvHookService.pauseMTMV(info);
+ }
+ }
+
+ public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException,
DdlException, JobException {
+ Objects.requireNonNull(info);
+ LOG.info("resumeMTMV, ResumeMTMVInfo: {}", info);
+ for (MTMVHookService mtmvHookService : hooks.values()) {
+ mtmvHookService.resumeMTMV(info);
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index d230fa85d9c..dcbfb9ef3f0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -123,6 +123,7 @@ import
org.apache.doris.nereids.DorisParser.ParenthesizedExpressionContext;
import org.apache.doris.nereids.DorisParser.PartitionSpecContext;
import org.apache.doris.nereids.DorisParser.PartitionValueDefContext;
import org.apache.doris.nereids.DorisParser.PartitionsDefContext;
+import org.apache.doris.nereids.DorisParser.PauseMTMVContext;
import org.apache.doris.nereids.DorisParser.PlanTypeContext;
import org.apache.doris.nereids.DorisParser.PredicateContext;
import org.apache.doris.nereids.DorisParser.PredicatedContext;
@@ -142,6 +143,7 @@ import
org.apache.doris.nereids.DorisParser.RefreshScheduleContext;
import org.apache.doris.nereids.DorisParser.RefreshTriggerContext;
import org.apache.doris.nereids.DorisParser.RegularQuerySpecificationContext;
import org.apache.doris.nereids.DorisParser.RelationContext;
+import org.apache.doris.nereids.DorisParser.ResumeMTMVContext;
import org.apache.doris.nereids.DorisParser.RollupDefContext;
import org.apache.doris.nereids.DorisParser.RollupDefsContext;
import org.apache.doris.nereids.DorisParser.RowConstructorContext;
@@ -339,7 +341,9 @@ import
org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import
org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
+import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
+import org.apache.doris.nereids.trees.plans.commands.ResumeMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVInfo;
import
org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
@@ -360,7 +364,9 @@ import
org.apache.doris.nereids.trees.plans.commands.info.IndexDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition;
import
org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition.MaxValue;
+import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RollupDefinition;
import
org.apache.doris.nereids.trees.plans.commands.info.SimpleColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.StepPartition;
@@ -688,6 +694,18 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
return new DropMTMVCommand(new DropMTMVInfo(new
TableNameInfo(nameParts), ctx.EXISTS() != null));
}
+ @Override
+ public PauseMTMVCommand visitPauseMTMV(PauseMTMVContext ctx) {
+ List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
+ return new PauseMTMVCommand(new PauseMTMVInfo(new
TableNameInfo(nameParts)));
+ }
+
+ @Override
+ public ResumeMTMVCommand visitResumeMTMV(ResumeMTMVContext ctx) {
+ List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
+ return new ResumeMTMVCommand(new ResumeMTMVInfo(new
TableNameInfo(nameParts)));
+ }
+
@Override
public AlterMTMVCommand visitAlterMTMV(AlterMTMVContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index d75115a648f..3db4a438345 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -133,5 +133,7 @@ public enum PlanType {
DROP_CONSTRAINT_COMMAND,
REFRESH_MTMV_COMMAND,
DROP_MTMV_COMMAND,
+ PAUSE_MTMV_COMMAND,
+ RESUME_MTMV_COMMAND,
CALL_COMMAND
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseMTMVCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseMTMVCommand.java
new file mode 100644
index 00000000000..f2a8cb23360
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseMTMVCommand.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+import java.util.Objects;
+
+/**
+ * pause mtmv
+ */
+public class PauseMTMVCommand extends Command implements ForwardWithSync,
NotAllowFallback {
+ private final PauseMTMVInfo pauseMTMVInfo;
+
+ public PauseMTMVCommand(PauseMTMVInfo pauseMTMVInfo) {
+ super(PlanType.PAUSE_MTMV_COMMAND);
+ this.pauseMTMVInfo = Objects.requireNonNull(pauseMTMVInfo, "require
pauseMTMVInfo object");
+ }
+
+ @Override
+ public void run(ConnectContext ctx, StmtExecutor executor) throws
Exception {
+ pauseMTMVInfo.analyze(ctx);
+ Env.getCurrentEnv().getMtmvService().pauseMTMV(pauseMTMVInfo);
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPauseMTMVCommand(this, context);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeMTMVCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeMTMVCommand.java
new file mode 100644
index 00000000000..a47c65eeb41
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeMTMVCommand.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+import java.util.Objects;
+
+/**
+ * resume mtmv
+ */
+public class ResumeMTMVCommand extends Command implements ForwardWithSync,
NotAllowFallback {
+ private final ResumeMTMVInfo resumeMTMVInfo;
+
+ public ResumeMTMVCommand(ResumeMTMVInfo resumeMTMVInfo) {
+ super(PlanType.RESUME_MTMV_COMMAND);
+ this.resumeMTMVInfo = Objects.requireNonNull(resumeMTMVInfo, "require
resumeMTMVInfo object");
+ }
+
+ @Override
+ public void run(ConnectContext ctx, StmtExecutor executor) throws
Exception {
+ resumeMTMVInfo.analyze(ctx);
+ Env.getCurrentEnv().getMtmvService().resumeMTMV(resumeMTMVInfo);
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitResumeMTMVCommand(this, context);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java
new file mode 100644
index 00000000000..15bbcff6f72
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Objects;
+
+/**
+ * pause mtmv info
+ */
+public class PauseMTMVInfo {
+ private final TableNameInfo mvName;
+
+ public PauseMTMVInfo(TableNameInfo mvName) {
+ this.mvName = Objects.requireNonNull(mvName, "require mvName object");
+ }
+
+ /**
+ * analyze pause info
+ *
+ * @param ctx ConnectContext
+ */
+ public void analyze(ConnectContext ctx) {
+ mvName.analyze(ctx);
+ if
(!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
mvName.getDb(),
+ mvName.getTbl(), PrivPredicate.CREATE)) {
+ String message =
ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("CREATE",
+ ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
+ mvName.getDb() + ": " + mvName.getTbl());
+ throw new AnalysisException(message);
+ }
+ try {
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
+ db.getTableOrMetaException(mvName.getTbl(),
TableType.MATERIALIZED_VIEW);
+ } catch (MetaNotFoundException | DdlException e) {
+ throw new AnalysisException(e.getMessage());
+ }
+ }
+
+ /**
+ * getMvName
+ *
+ * @return TableNameInfo
+ */
+ public TableNameInfo getMvName() {
+ return mvName;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java
new file mode 100644
index 00000000000..a7f23bedfc7
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Objects;
+
+/**
+ * resume mtmv info
+ */
+public class ResumeMTMVInfo {
+ private final TableNameInfo mvName;
+
+ public ResumeMTMVInfo(TableNameInfo mvName) {
+ this.mvName = Objects.requireNonNull(mvName, "require mvName object");
+ }
+
+ /**
+ * analyze resume info
+ *
+ * @param ctx ConnectContext
+ */
+ public void analyze(ConnectContext ctx) {
+ mvName.analyze(ctx);
+ if
(!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
mvName.getDb(),
+ mvName.getTbl(), PrivPredicate.CREATE)) {
+ String message =
ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("CREATE",
+ ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
+ mvName.getDb() + ": " + mvName.getTbl());
+ throw new AnalysisException(message);
+ }
+ try {
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
+ db.getTableOrMetaException(mvName.getTbl(),
TableType.MATERIALIZED_VIEW);
+ } catch (MetaNotFoundException | DdlException e) {
+ throw new AnalysisException(e.getMessage());
+ }
+ }
+
+ /**
+ * getMvName
+ *
+ * @return TableNameInfo
+ */
+ public TableNameInfo getMvName() {
+ return mvName;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index e67b78bb4b1..a48a8aaf981 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -34,7 +34,9 @@ import
org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import
org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
+import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
+import org.apache.doris.nereids.trees.plans.commands.ResumeMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
/** CommandVisitor. */
@@ -113,6 +115,14 @@ public interface CommandVisitor<R, C> {
return visitCommand(dropMTMVCommand, context);
}
+ default R visitPauseMTMVCommand(PauseMTMVCommand pauseMTMVCommand, C
context) {
+ return visitCommand(pauseMTMVCommand, context);
+ }
+
+ default R visitResumeMTMVCommand(ResumeMTMVCommand resumeMTMVCommand, C
context) {
+ return visitCommand(resumeMTMVCommand, context);
+ }
+
default R visitCallCommand(CallCommand callCommand, C context) {
return visitCommand(callCommand, context);
}
diff --git a/regression-test/data/mtmv_p0/test_db_mtmv.out
b/regression-test/data/mtmv_p0/test_db_mtmv.out
new file mode 100644
index 00000000000..ac76df5cf04
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_db_mtmv.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !count_init --
+1
+
+-- !count_dropped --
+0
+
diff --git a/regression-test/data/mtmv_p0/test_pause_mtmv.out
b/regression-test/data/mtmv_p0/test_pause_mtmv.out
new file mode 100644
index 00000000000..bf257d83508
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_pause_mtmv.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !status_init --
+RUNNING
+
+-- !status_pause --
+PAUSED
+
+-- !status_resume --
+RUNNING
+
diff --git a/regression-test/suites/mtmv_p0/test_db_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_db_mtmv.groovy
new file mode 100644
index 00000000000..fa19c2f7f97
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_db_mtmv.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_db_mtmv") {
+ def tableName = "t_test_db_mtmv_user"
+ def mvName = "multi_mv_test_db_mtmv"
+ def dbName = "regression_test_mtmv_db"
+ sql """drop database if exists `${dbName}`"""
+ sql """create database `${dbName}`"""
+ sql """use `${dbName}`"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS `${tableName}` (
+ event_day DATE,
+ id BIGINT,
+ username VARCHAR(20)
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 10
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH COMPLETE ON MANUAL
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM ${tableName};
+ """
+ def jobName = getJobName(dbName, mvName);
+ order_qt_count_init "select count(*) from jobs('type'='mv') where
Name='${jobName}'"
+ sql """
+ drop database `${dbName}`
+ """
+ order_qt_count_dropped "select count(*) from jobs('type'='mv') where
Name='${jobName}'"
+}
diff --git a/regression-test/suites/mtmv_p0/test_pause_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_pause_mtmv.groovy
new file mode 100644
index 00000000000..dcd9d56ff77
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_pause_mtmv.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_pause_mtmv") {
+ def tableName = "t_test_pause_mtmv_user"
+ def mvName = "multi_mv_test_pause_mtmv"
+ def dbName = "regression_test_mtmv_p0"
+ sql """drop table if exists `${tableName}`"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS `${tableName}` (
+ event_day DATE,
+ id BIGINT,
+ username VARCHAR(20)
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 10
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql """drop materialized view if exists ${mvName};"""
+
+ // IMMEDIATE MANUAL
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH COMPLETE ON MANUAL
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM ${tableName};
+ """
+ def jobName = getJobName("regression_test_mtmv_p0", mvName);
+ order_qt_status_init "select Status from jobs('type'='mv') where
Name='${jobName}'"
+ sql """
+ PAUSE MATERIALIZED VIEW JOB ON ${mvName}
+ """
+ order_qt_status_pause "select Status from jobs('type'='mv') where
Name='${jobName}'"
+ sql """
+ RESUME MATERIALIZED VIEW JOB ON ${mvName}
+ """
+ order_qt_status_resume "select Status from jobs('type'='mv') where
Name='${jobName}'"
+ sql """
+ DROP MATERIALIZED VIEW ${mvName}
+ """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]