This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0f3c5442605 [feature](mtmv)mtmv support partition (#28144)
0f3c5442605 is described below
commit 0f3c5442605305aae0e50bc4ffe7c66fdd33be5a
Author: zhangdong <[email protected]>
AuthorDate: Sun Dec 17 18:28:03 2023 +0800
[feature](mtmv)mtmv support partition (#28144)
- create MTMV support partition and `AUTO` refresh method
- refresh mtmv support support specified partitions
- MTMV support incremental updates
- add property `EXCLUDED_TRIGGER_TABLES` for mv
- Maintain MTMVCache after successful task refresh for plan
rewrite(MTMV.getOrGenerateCache)
- show partitions add "SyncWithBaseTables"
- drop job before drop MTMV
- task tvf add
"MvId,MvDatabaseId,ErrorMsg,TaskContext,RefreshMode,RefreshPartitions"
- add `NotAllowFallback` for mtmv not fallback to old planner
- add `MTMVUtils.getMTMVCanRewritePartitions() `and
`Env.getCurrentEnv().getMtmvService().getRelationManager().getAvailableMTMVs()`
for plan rewrite
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 11 +-
.../org/apache/doris/analysis/CreateMTMVStmt.java | 18 +-
.../main/java/org/apache/doris/catalog/MTMV.java | 56 ++-
.../java/org/apache/doris/catalog/OlapTable.java | 22 +
.../org/apache/doris/catalog/OlapTableFactory.java | 22 +
.../java/org/apache/doris/catalog/Partition.java | 12 +
.../org/apache/doris/catalog/PartitionInfo.java | 11 +
.../doris/common/proc/PartitionsProcDir.java | 20 +-
.../apache/doris/common/util/PropertyAnalyzer.java | 1 +
.../apache/doris/datasource/InternalCatalog.java | 6 +-
.../apache/doris/job/extensions/mtmv/MTMVJob.java | 35 +-
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 175 ++++++--
.../doris/job/extensions/mtmv/MTMVTaskContext.java | 58 +++
.../org/apache/doris/job/task/AbstractTask.java | 1 +
.../java/org/apache/doris/mtmv/BaseTableInfo.java | 14 +
.../doris/mtmv/{MVCache.java => MTMVCache.java} | 10 +-
.../java/org/apache/doris/mtmv/MTMVJobManager.java | 9 +-
.../org/apache/doris/mtmv/MTMVPartitionInfo.java | 110 +++++
.../java/org/apache/doris/mtmv/MTMVPlanUtil.java | 108 +++++
.../org/apache/doris/mtmv/MTMVRefreshEnum.java | 3 +-
...VCacheManager.java => MTMVRelationManager.java} | 150 +------
.../java/org/apache/doris/mtmv/MTMVService.java | 17 +-
.../main/java/org/apache/doris/mtmv/MTMVUtil.java | 489 +++++++++++++++++++++
.../doris/nereids/parser/LogicalPlanBuilder.java | 58 ++-
.../mv/AbstractMaterializedViewRule.java | 2 +-
.../mv/InitMaterializationContextHook.java | 6 +-
.../exploration/mv/MaterializationContext.java | 14 +-
.../trees/plans/commands/AlterMTMVCommand.java | 2 +-
.../trees/plans/commands/CreateMTMVCommand.java | 2 +-
.../trees/plans/commands/DropMTMVCommand.java | 2 +-
.../trees/plans/commands/NotAllowFallback.java} | 48 +-
.../trees/plans/commands/RefreshMTMVCommand.java | 2 +-
.../plans/commands/UpdateMvByPartitionCommand.java | 47 +-
.../trees/plans/commands/info/CreateMTMVInfo.java | 111 ++++-
.../trees/plans/commands/info/RefreshMTMVInfo.java | 45 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 13 +
.../doris/tablefunction/MetadataGenerator.java | 3 +
.../tablefunction/MvInfosTableValuedFunction.java | 4 +-
38 files changed, 1410 insertions(+), 307 deletions(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 674e4550976..be683f5ee4b 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -90,10 +90,11 @@ statement
(REFRESH refreshMethod? refreshTrigger?)?
(KEY keys=identifierList)?
(COMMENT STRING_LITERAL)?
+ (PARTITION BY LEFT_PAREN partitionKey = identifier RIGHT_PAREN)?
(DISTRIBUTED BY (HASH hashKeys=identifierList | RANDOM) (BUCKETS
(INTEGER_VALUE | AUTO))?)?
propertyClause?
AS query
#createMTMV
- | REFRESH MATERIALIZED VIEW mvName=multipartIdentifier
#refreshMTMV
+ | REFRESH MATERIALIZED VIEW mvName=multipartIdentifier (partitionSpec |
COMPLETE)? #refreshMTMV
| ALTER MATERIALIZED VIEW mvName=multipartIdentifier ((RENAME
newName=identifier)
| (REFRESH (refreshMethod | refreshTrigger | refreshMethod
refreshTrigger))
| (SET LEFT_PAREN fileProperties=propertyItemList RIGHT_PAREN))
#alterMTMV
@@ -158,15 +159,11 @@ refreshTrigger
;
refreshSchedule
- : EVERY INTEGER_VALUE mvRefreshUnit (STARTS STRING_LITERAL)?
- ;
-
-mvRefreshUnit
- : SECOND | MINUTE | HOUR | DAY | WEEK
+ : EVERY INTEGER_VALUE refreshUnit = identifier (STARTS STRING_LITERAL)?
;
refreshMethod
- : COMPLETE
+ : COMPLETE | AUTO
;
identifierOrText
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java
index bb2efdd6281..9421bb047c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java
@@ -20,7 +20,9 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Index;
import org.apache.doris.mtmv.EnvInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRelation;
import java.util.ArrayList;
import java.util.List;
@@ -31,17 +33,21 @@ public class CreateMTMVStmt extends CreateTableStmt {
private final String querySql;
private final EnvInfo envInfo;
private Map<String, String> mvProperties;
+ private MTMVPartitionInfo mvPartitionInfo;
+ private MTMVRelation relation;
public CreateMTMVStmt(boolean ifNotExists, TableName mvName, List<Column>
columns,
MTMVRefreshInfo refreshInfo, KeysDesc keyDesc, DistributionDesc
distributionDesc,
Map<String, String> properties, Map<String, String> mvProperties,
String querySql, String comment,
- EnvInfo envInfo) {
- super(ifNotExists, false, mvName, columns, new ArrayList<Index>(),
DEFAULT_ENGINE_NAME, keyDesc, null,
+ EnvInfo envInfo, PartitionDesc partitionDesc, MTMVPartitionInfo
mvPartitionInfo, MTMVRelation relation) {
+ super(ifNotExists, false, mvName, columns, new ArrayList<Index>(),
DEFAULT_ENGINE_NAME, keyDesc, partitionDesc,
distributionDesc, properties, null, comment, null, null);
this.refreshInfo = refreshInfo;
this.querySql = querySql;
this.envInfo = envInfo;
this.mvProperties = mvProperties;
+ this.mvPartitionInfo = mvPartitionInfo;
+ this.relation = relation;
}
public MTMVRefreshInfo getRefreshInfo() {
@@ -59,4 +65,12 @@ public class CreateMTMVStmt extends CreateTableStmt {
public Map<String, String> getMvProperties() {
return mvProperties;
}
+
+ public MTMVPartitionInfo getMvPartitionInfo() {
+ return mvPartitionInfo;
+ }
+
+ public MTMVRelation getRelation() {
+ return relation;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
index af0691d94fb..a2c87581fc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
@@ -18,21 +18,25 @@
package org.apache.doris.catalog;
import org.apache.doris.catalog.OlapTableFactory.MTMVParams;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.EnvInfo;
+import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVJobInfo;
import org.apache.doris.mtmv.MTMVJobManager;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
-import org.apache.doris.mtmv.MVCache;
import org.apache.doris.persist.gson.GsonUtils;
+import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -41,6 +45,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -62,8 +67,10 @@ public class MTMV extends OlapTable {
private Map<String, String> mvProperties;
@SerializedName("r")
private MTMVRelation relation;
- // Should update after every fresh
- private MVCache mvCache;
+ @SerializedName("mpi")
+ private MTMVPartitionInfo mvPartitionInfo;
+ // Should update after every fresh, not persist
+ private MTMVCache cache;
// For deserialization
public MTMV() {
@@ -87,6 +94,8 @@ public class MTMV extends OlapTable {
this.status = new MTMVStatus();
this.jobInfo = new MTMVJobInfo(MTMVJobManager.MTMV_JOB_PREFIX +
params.tableId);
this.mvProperties = params.mvProperties;
+ this.mvPartitionInfo = params.mvPartitionInfo;
+ this.relation = params.relation;
mvRwLock = new ReentrantReadWriteLock(true);
}
@@ -119,12 +128,12 @@ public class MTMV extends OlapTable {
return relation;
}
- public MVCache getMvCache() {
- return mvCache;
+ public MTMVCache getCache() {
+ return cache;
}
- public void setMvCache(MVCache mvCache) {
- this.mvCache = mvCache;
+ public void setCache(MTMVCache cache) {
+ this.cache = cache;
}
public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) {
@@ -148,6 +157,12 @@ public class MTMV extends OlapTable {
this.status.setSchemaChangeDetail(null);
this.status.setRefreshState(MTMVRefreshState.SUCCESS);
this.relation = relation;
+ try {
+ this.cache = MTMVCache.from(this,
MTMVPlanUtil.createMTMVContext(this));
+ } catch (Throwable e) {
+ this.cache = null;
+ LOG.warn("generate cache failed", e);
+ }
} else {
this.status.setRefreshState(MTMVRefreshState.FAIL);
}
@@ -170,10 +185,36 @@ public class MTMV extends OlapTable {
}
}
+ public Set<String> getExcludedTriggerTables() {
+ if
(!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES))
{
+ return Sets.newHashSet();
+ }
+ String[] split =
mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(",");
+ return Sets.newHashSet(split);
+ }
+
+ public MTMVCache getOrGenerateCache() throws AnalysisException {
+ if (cache == null) {
+ writeMvLock();
+ try {
+ if (cache == null) {
+ this.cache = MTMVCache.from(this,
MTMVPlanUtil.createMTMVContext(this));
+ }
+ } finally {
+ writeMvUnlock();
+ }
+ }
+ return cache;
+ }
+
public Map<String, String> getMvProperties() {
return mvProperties;
}
+ public MTMVPartitionInfo getMvPartitionInfo() {
+ return mvPartitionInfo;
+ }
+
public void readMvLock() {
this.mvRwLock.readLock().lock();
}
@@ -207,6 +248,7 @@ public class MTMV extends OlapTable {
jobInfo = materializedView.jobInfo;
mvProperties = materializedView.mvProperties;
relation = materializedView.relation;
+ mvPartitionInfo = materializedView.mvPartitionInfo;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 05a12ed387e..0ce2df5e6f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -947,6 +947,17 @@ public class OlapTable extends Table {
return getPartition(partitionName, true);
}
+ public Partition getPartitionOrAnalysisException(String partitionName)
throws AnalysisException {
+ Partition partition = getPartition(partitionName, false);
+ if (partition == null) {
+ partition = getPartition(partitionName, true);
+ }
+ if (partition == null) {
+ throw new AnalysisException("partition not found: " +
partitionName);
+ }
+ return partition;
+ }
+
// get partition by name
public Partition getPartition(String partitionName, boolean
isTempPartition) {
if (isTempPartition) {
@@ -965,6 +976,17 @@ public class OlapTable extends Table {
return partition;
}
+ public Partition getPartitionOrAnalysisException(long partitionId) throws
AnalysisException {
+ Partition partition = idToPartition.get(partitionId);
+ if (partition == null) {
+ partition = tempPartitions.getPartition(partitionId);
+ }
+ if (partition == null) {
+ throw new AnalysisException("partition not found: " + partitionId);
+ }
+ return partition;
+ }
+
// select the non-empty partition ids belonging to this table.
//
// ATTN: partitions not belonging to this table will be filtered.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java
index af87ec47991..7d11ed7bdd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java
@@ -22,7 +22,9 @@ import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DdlStmt;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.mtmv.EnvInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRelation;
import com.google.common.base.Preconditions;
@@ -49,6 +51,8 @@ public class OlapTableFactory {
public EnvInfo envInfo;
public String querySql;
public Map<String, String> mvProperties;
+ public MTMVPartitionInfo mvPartitionInfo;
+ public MTMVRelation relation;
}
private BuildParams params;
@@ -158,6 +162,22 @@ public class OlapTableFactory {
return this;
}
+ private OlapTableFactory withMvPartitionInfo(MTMVPartitionInfo
mvPartitionInfo) {
+ Preconditions.checkState(params instanceof MTMVParams, "Invalid
argument for "
+ + params.getClass().getSimpleName());
+ MTMVParams mtmvParams = (MTMVParams) params;
+ mtmvParams.mvPartitionInfo = mvPartitionInfo;
+ return this;
+ }
+
+ private OlapTableFactory withMvRelation(MTMVRelation relation) {
+ Preconditions.checkState(params instanceof MTMVParams, "Invalid
argument for "
+ + params.getClass().getSimpleName());
+ MTMVParams mtmvParams = (MTMVParams) params;
+ mtmvParams.relation = relation;
+ return this;
+ }
+
public OlapTableFactory withExtraParams(DdlStmt stmt) {
boolean isMaterializedView = stmt instanceof CreateMTMVStmt;
if (!isMaterializedView) {
@@ -168,6 +188,8 @@ public class OlapTableFactory {
return withRefreshInfo(createMTMVStmt.getRefreshInfo())
.withQuerySql(createMTMVStmt.getQuerySql())
.withMvProperties(createMTMVStmt.getMvProperties())
+ .withMvPartitionInfo(createMTMVStmt.getMvPartitionInfo())
+ .withMvRelation(createMTMVStmt.getRelation())
.withEnvInfo(createMTMVStmt.getEnvInfo());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 3a1905bd8e7..76c3097a033 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -168,6 +168,18 @@ public class Partition extends MetaObject implements
Writable {
return visibleVersionTime;
}
+ /**
+ * if visibleVersion is 1, do not return creation time but 0
+ *
+ * @return
+ */
+ public long getVisibleVersionTimeIgnoreInit() {
+ if (visibleVersion == 1) {
+ return 0L;
+ }
+ return visibleVersionTime;
+ }
+
// The method updateVisibleVersionAndVersionHash is called when fe
restart, the visibleVersionTime is updated
private void setVisibleVersion(long visibleVersion) {
this.visibleVersion = visibleVersion;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
index 721bf0ebaef..e61a7a1070f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -147,6 +147,17 @@ public class PartitionInfo implements Writable {
return item;
}
+ public PartitionItem getItemOrAnalysisException(long partitionId) throws
AnalysisException {
+ PartitionItem item = idToItem.get(partitionId);
+ if (item == null) {
+ item = idToTempItem.get(partitionId);
+ }
+ if (item == null) {
+ throw new AnalysisException("PartitionItem not found: " +
partitionId);
+ }
+ return item;
+ }
+
public void setItem(long partitionId, boolean isTemp, PartitionItem item) {
setItemInternal(partitionId, isTemp, item);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 9319c96e040..01feaf23683 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
@@ -37,17 +38,20 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.mtmv.MTMVUtil;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
@@ -68,7 +72,7 @@ public class PartitionsProcDir implements ProcDirInterface {
.add("State").add("PartitionKey").add("Range").add("DistributionKey")
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime").add("RemoteStoragePolicy")
.add("LastConsistencyCheckTime").add("DataSize").add("IsInMemory").add("ReplicaAllocation")
- .add("IsMutable")
+ .add("IsMutable").add("SyncWithBaseTables").add("UnsyncTables")
.build();
private Database db;
@@ -303,6 +307,20 @@ public class PartitionsProcDir implements ProcDirInterface
{
partitionInfo.add(tblPartitionInfo.getReplicaAllocation(partitionId).toCreateStmt());
partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId));
+ if (olapTable instanceof MTMV) {
+ try {
+ List<String> partitionUnSyncTables = MTMVUtil
+ .getPartitionUnSyncTables((MTMV) olapTable,
partitionId);
+
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
+ partitionInfo.add(partitionUnSyncTables.toString());
+ } catch (AnalysisException e) {
+ partitionInfo.add(false);
+ partitionInfo.add(e.getMessage());
+ }
+ } else {
+ partitionInfo.add(true);
+ partitionInfo.add(FeConstants.null_string);
+ }
partitionInfos.add(partitionInfo);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 75dc07e915d..32ba1c8306d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -158,6 +158,7 @@ public class PropertyAnalyzer {
public static final String
PROPERTIES_ENABLE_DUPLICATE_WITHOUT_KEYS_BY_DEFAULT =
"enable_duplicate_without_keys_by_default";
public static final String PROPERTIES_GRACE_PERIOD = "grace_period";
+ public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES =
"excluded_trigger_tables";
// For unique key data model, the feature Merge-on-Write will leverage a
primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load
stage,
// which can avoid the merging cost in read stage, and accelerate the
aggregation
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 78b02a477f7..8d8e76c3cf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -886,6 +886,9 @@ public class InternalCatalog implements CatalogIf<Database>
{
+ " please use \"DROP table FORCE\".");
}
}
+ if (table.getType() == TableType.MATERIALIZED_VIEW) {
+ Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV)
table);
+ }
unprotectDropTable(db, table, stmt.isForceDrop(), false, 0);
if (!stmt.isForceDrop()) {
recycleTime =
Env.getCurrentRecycleBin().getRecycleTimeById(table.getId());
@@ -898,9 +901,6 @@ public class InternalCatalog implements CatalogIf<Database>
{
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
db.getId(), table.getId());
Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId());
- if (table.getType() == TableType.MATERIALIZED_VIEW) {
- Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) table);
- }
Env.getCurrentEnv().getMtmvService().dropTable(table);
} catch (UserException e) {
throw new DdlException(e.getMessage(), e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
index 6321679b6b2..5ee9b43fe1f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
+import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
@@ -39,7 +40,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -47,9 +47,8 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-public class MTMVJob extends AbstractJob<MTMVTask, Map> {
+public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
private static final ShowResultSetMetaData JOB_META_DATA =
ShowResultSetMetaData.builder()
@@ -98,6 +97,9 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> {
@SerializedName(value = "mi")
private long mtmvId;
+ public MTMVJob() {
+ }
+
public MTMVJob(long dbId, long mtmvId) {
this.dbId = dbId;
this.mtmvId = mtmvId;
@@ -110,8 +112,11 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> {
}
@Override
- public List<MTMVTask> createTasks(TaskType taskType, Map taskContext) {
- MTMVTask task = new MTMVTask(dbId, mtmvId);
+ public List<MTMVTask> createTasks(TaskType taskType, MTMVTaskContext
taskContext) {
+ if (taskContext == null) {
+ taskContext = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
+ }
+ MTMVTask task = new MTMVTask(dbId, mtmvId, taskContext);
task.setTaskType(taskType);
ArrayList<MTMVTask> tasks = new ArrayList<>();
tasks.add(task);
@@ -119,9 +124,25 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> {
return tasks;
}
+ /**
+ * if user trigger, return true
+ * if system trigger, Check if there are any system triggered tasks, and
if so, return false
+ *
+ * @param taskContext
+ * @return
+ */
@Override
- public boolean isReadyForScheduling(Map taskContext) {
- return CollectionUtils.isEmpty(getRunningTasks());
+ public boolean isReadyForScheduling(MTMVTaskContext taskContext) {
+ if (taskContext != null) {
+ return true;
+ }
+ List<MTMVTask> runningTasks = getRunningTasks();
+ for (MTMVTask task : runningTasks) {
+ if (task.getTaskContext() == null ||
task.getTaskContext().getTriggerMode() == MTMVTaskTriggerMode.SYSTEM) {
+ return false;
+ }
+ }
+ return true;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 804113d9892..8b2d2835127 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -17,24 +17,26 @@
package org.apache.doris.job.extensions.mtmv;
-import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf.TableType;
-import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.mtmv.MTMVCacheManager;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.mtmv.MTMVPlanUtil;
+import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRelation;
-import org.apache.doris.mysql.privilege.Auth;
+import org.apache.doris.mtmv.MTMVUtil;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import
org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
@@ -44,10 +46,18 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
public class MTMVTask extends AbstractTask {
@@ -58,12 +68,17 @@ public class MTMVTask extends AbstractTask {
new Column("TaskId", ScalarType.createStringType()),
new Column("JobId", ScalarType.createStringType()),
new Column("JobName", ScalarType.createStringType()),
+ new Column("MvId", ScalarType.createStringType()),
+ new Column("MvDatabaseId", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
+ new Column("ErrorMsg", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
new Column("StartTime", ScalarType.createStringType()),
new Column("FinishTime", ScalarType.createStringType()),
new Column("DurationMs", ScalarType.createStringType()),
- new Column("ExecuteSql", ScalarType.createStringType()));
+ new Column("TaskContext", ScalarType.createStringType()),
+ new Column("RefreshMode", ScalarType.createStringType()),
+ new Column("RefreshPartitions", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -75,34 +90,66 @@ public class MTMVTask extends AbstractTask {
COLUMN_TO_INDEX = builder.build();
}
+ public enum MTMVTaskTriggerMode {
+ MANUAL,
+ SYSTEM
+ }
+
+ public enum MTMVTaskRefreshMode {
+ COMPLETE,
+ PARTITION,
+ NOT_REFRESH
+ }
+
@SerializedName(value = "di")
private long dbId;
@SerializedName(value = "mi")
private long mtmvId;
- @SerializedName("sql")
- private String sql;
+ @SerializedName("taskContext")
+ private MTMVTaskContext taskContext;
+ @SerializedName("refreshPartitions")
+ List<String> refreshPartitions;
+ @SerializedName("refreshMode")
+ MTMVTaskRefreshMode refreshMode;
private MTMV mtmv;
private MTMVRelation relation;
private StmtExecutor executor;
+ private Set<Long> refreshPartitionIds = Sets.newHashSet();
+
+ public MTMVTask() {
+ }
- public MTMVTask(long dbId, long mtmvId) {
- this.dbId = dbId;
- this.mtmvId = mtmvId;
+ public MTMVTask(long dbId, long mtmvId, MTMVTaskContext taskContext) {
+ this.dbId = Objects.requireNonNull(dbId);
+ this.mtmvId = Objects.requireNonNull(mtmvId);
+ this.taskContext = Objects.requireNonNull(taskContext);
}
@Override
public void run() throws JobException {
try {
- ConnectContext ctx = createContext();
+ ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
TUniqueId queryId = generateQueryId();
// Every time a task is run, the relation is regenerated because
baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
- relation = MTMVCacheManager.generateMTMVRelation(mtmv, ctx);
- executor = new StmtExecutor(ctx, sql);
- executor.execute(queryId);
+ relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
+ calculateRefreshInfo();
+ Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
+ if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
+ return;
+ } else if (refreshMode == MTMVTaskRefreshMode.PARTITION) {
+ OlapTable relatedTable = (OlapTable)
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+ tableWithPartKey.put(relatedTable,
mtmv.getMvPartitionInfo().getRelatedCol());
+ }
+ refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv,
refreshPartitionIds);
+ UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
+ .from(mtmv, refreshPartitionIds, tableWithPartKey);
+ executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command,
ctx.getStatementContext()));
+ ctx.setQueryId(queryId);
+ command.run(ctx, executor);
} catch (Throwable e) {
- LOG.warn(e);
+ LOG.warn("run task failed: ", e);
throw new JobException(e);
}
}
@@ -134,9 +181,12 @@ public class MTMVTask extends AbstractTask {
try {
Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
mtmv = (MTMV) db.getTableOrMetaException(mtmvId,
TableType.MATERIALIZED_VIEW);
- sql = generateSql(mtmv);
+ if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
+ OlapTable relatedTable = (OlapTable)
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+ MTMVUtil.alignMvPartition(mtmv, relatedTable);
+ }
} catch (UserException e) {
- LOG.warn(e);
+ LOG.warn("before task failed:", e);
throw new JobException(e);
}
}
@@ -147,46 +197,27 @@ public class MTMVTask extends AbstractTask {
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(super.getTaskId())));
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(super.getJobId())));
trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(mtmvId)));
+ trow.addToColumnValue(new TCell().setStringVal(String.valueOf(dbId)));
trow.addToColumnValue(new TCell()
.setStringVal(super.getStatus() == null ?
FeConstants.null_string : super.getStatus().toString()));
+ trow.addToColumnValue(new TCell().setStringVal(super.getErrMsg()));
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs())));
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(super.getStartTimeMs())));
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(super.getFinishTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(
(super.getFinishTimeMs() == null || super.getFinishTimeMs() ==
0) ? FeConstants.null_string
: String.valueOf(super.getFinishTimeMs() -
super.getStartTimeMs())));
- trow.addToColumnValue(new TCell().setStringVal(sql));
+ trow.addToColumnValue(new TCell()
+ .setStringVal(taskContext == null ? FeConstants.null_string :
new Gson().toJson(taskContext)));
+ trow.addToColumnValue(
+ new TCell().setStringVal(refreshMode == null ?
FeConstants.null_string : refreshMode.toString()));
+ trow.addToColumnValue(
+ new TCell().setStringVal(
+ refreshPartitions == null ? FeConstants.null_string :
new Gson().toJson(refreshPartitions)));
return trow;
}
- private static String generateSql(MTMV mtmv) {
- StringBuilder builder = new StringBuilder();
- builder.append("INSERT OVERWRITE TABLE ");
- builder.append(mtmv.getDatabase().getCatalog().getName());
- builder.append(".");
-
builder.append(ClusterNamespace.getNameFromFullName(mtmv.getQualifiedDbName()));
- builder.append(".");
- builder.append(mtmv.getName());
- builder.append(" ");
- builder.append(mtmv.getQuerySql());
- return builder.toString();
- }
-
- private ConnectContext createContext() throws AnalysisException {
- ConnectContext ctx = new ConnectContext();
- ctx.setEnv(Env.getCurrentEnv());
- ctx.setQualifiedUser(Auth.ADMIN_USER);
- ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
- ctx.getState().reset();
- ctx.setThreadLocalInfo();
- CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr()
- .getCatalogOrAnalysisException(mtmv.getEnvInfo().getCtlId());
- ctx.changeDefaultCatalog(catalog.getName());
-
ctx.setDatabase(catalog.getDbOrAnalysisException(mtmv.getEnvInfo().getDbId()).getFullName());
- ctx.getSessionVariable().enableFallbackToOriginalPlanner = false;
- return ctx;
- }
-
private TUniqueId generateQueryId() {
UUID taskId = UUID.randomUUID();
return new TUniqueId(taskId.getMostSignificantBits(),
taskId.getLeastSignificantBits());
@@ -198,5 +229,55 @@ public class MTMVTask extends AbstractTask {
mtmv = null;
relation = null;
executor = null;
+ refreshPartitionIds = null;
+ }
+
+ private void calculateRefreshInfo() throws AnalysisException {
+ // check whether the user manually triggers it
+ if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
+ if (taskContext.isComplete()) {
+ this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
+ return;
+ } else if (!CollectionUtils
+ .isEmpty(taskContext.getPartitions())) {
+ this.refreshMode = MTMVTaskRefreshMode.PARTITION;
+ this.refreshPartitionIds =
MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
+ return;
+ }
+ }
+ // check if data is fresh
+ Set<String> excludedTriggerTables = mtmv.getExcludedTriggerTables();
+ boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(),
excludedTriggerTables, 0L);
+ if (fresh) {
+ this.refreshMode = MTMVTaskRefreshMode.NOT_REFRESH;
+ return;
+ }
+ // current, if partitionType is SELF_MANAGE, we can only FULL refresh
+ if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.SELF_MANAGE) {
+ this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
+ return;
+ }
+ // if refreshMethod is COMPLETE, we only FULL refresh
+ if (mtmv.getRefreshInfo().getRefreshMethod() ==
RefreshMethod.COMPLETE) {
+ this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
+ return;
+ }
+ OlapTable relatedTable = (OlapTable)
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+ excludedTriggerTables.add(relatedTable.getName());
+ // check if every table except relatedTable is fresh
+ Set<Long> mtmvNeedRefreshPartitions =
MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
+ // if true, we can use `Partition`, otherwise must `FULL`
+ if (mtmvNeedRefreshPartitions.size() != mtmv.getPartitionNum()) {
+ this.refreshMode = MTMVTaskRefreshMode.PARTITION;
+ this.refreshPartitionIds = mtmvNeedRefreshPartitions;
+ return;
+ } else {
+ this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
+ return;
+ }
+ }
+
+ public MTMVTaskContext getTaskContext() {
+ return taskContext;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java
new file mode 100644
index 00000000000..adb6fd5ef71
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.mtmv;
+
+import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.List;
+
+public class MTMVTaskContext {
+
+ @SerializedName(value = "triggerMode")
+ private MTMVTaskTriggerMode triggerMode;
+
+ @SerializedName(value = "partitions")
+ private List<String> partitions;
+
+ @SerializedName(value = "isComplete")
+ private boolean isComplete;
+
+ public MTMVTaskContext(MTMVTaskTriggerMode triggerMode) {
+ this.triggerMode = triggerMode;
+ }
+
+ public MTMVTaskContext(MTMVTaskTriggerMode triggerMode, List<String>
partitions, boolean isComplete) {
+ this.triggerMode = triggerMode;
+ this.partitions = partitions;
+ this.isComplete = isComplete;
+ }
+
+ public List<String> getPartitions() {
+ return partitions;
+ }
+
+ public MTMVTaskTriggerMode getTriggerMode() {
+ return triggerMode;
+ }
+
+ public boolean isComplete() {
+ return isComplete;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 654ee4fbc2f..f887961230f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -120,6 +120,7 @@ public abstract class AbstractTask implements Task {
run();
onSuccess();
} catch (Exception e) {
+ this.errMsg = e.getMessage();
onFail();
log.warn("execute task error, job id is {}, task id is {}", jobId,
taskId, e);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
index 575552cb61e..9b3b6be04f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
@@ -19,13 +19,18 @@ package org.apache.doris.mtmv;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import com.google.common.base.Objects;
import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class BaseTableInfo {
+ private static final Logger LOG =
LogManager.getLogger(BaseTableInfo.class);
+
@SerializedName("ti")
private Long tableId;
@SerializedName("di")
@@ -88,4 +93,13 @@ public class BaseTableInfo {
+ ", ctlId=" + ctlId
+ '}';
}
+
+ public String getTableName() {
+ try {
+ return MTMVUtil.getTable(this).getName();
+ } catch (AnalysisException e) {
+ LOG.warn("can not get table: " + this);
+ return "";
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MVCache.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
similarity index 90%
rename from fe/fe-core/src/main/java/org/apache/doris/mtmv/MVCache.java
rename to fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
index b5cf92f87e4..e4aa36d9b61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MVCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
@@ -36,14 +36,14 @@ import java.util.stream.Collectors;
/**
* The cache for materialized view cache
*/
-public class MVCache {
+public class MTMVCache {
// the materialized view plan which should be optimized by the same rules
to query
private final Plan logicalPlan;
// this should be shuttle expression with lineage
private final List<NamedExpression> mvOutputExpressions;
- public MVCache(MTMV materializedView, Plan logicalPlan,
List<NamedExpression> mvOutputExpressions) {
+ public MTMVCache(MTMV materializedView, Plan logicalPlan,
List<NamedExpression> mvOutputExpressions) {
this.logicalPlan = logicalPlan;
this.mvOutputExpressions = mvOutputExpressions;
}
@@ -56,12 +56,12 @@ public class MVCache {
return mvOutputExpressions;
}
- public MVCache(Plan logicalPlan, List<NamedExpression>
mvOutputExpressions) {
+ public MTMVCache(Plan logicalPlan, List<NamedExpression>
mvOutputExpressions) {
this.logicalPlan = logicalPlan;
this.mvOutputExpressions = mvOutputExpressions;
}
- public static MVCache from(MTMV mtmv, ConnectContext connectContext) {
+ public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) {
LogicalPlan unboundMvPlan = new
NereidsParser().parseSingle(mtmv.getQuerySql());
// TODO: connect context set current db when create mv by use database
StatementContext mvSqlStatementContext = new
StatementContext(connectContext,
@@ -77,6 +77,6 @@ public class MVCache {
List<NamedExpression> mvOutputExpressions =
mvRewrittenPlan.getExpressions().stream()
.map(NamedExpression.class::cast)
.collect(Collectors.toList());
- return new MVCache(mvPlan, mvOutputExpressions);
+ return new MTMVCache(mvPlan, mvOutputExpressions);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index 416f7f764b6..bdbc3231181 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -33,6 +33,8 @@ import org.apache.doris.job.common.JobType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
+import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
@@ -48,7 +50,7 @@ import java.util.List;
* when do some operation, do something about job
*/
public class MTMVJobManager implements MTMVHookService {
- public static final String MTMV_JOB_PREFIX = "mtmv_";
+ public static final String MTMV_JOB_PREFIX = "inner_mtmv_";
/**
* create MTMVJob
@@ -174,7 +176,10 @@ public class MTMVJobManager implements MTMVHookService {
throw new DdlException("jobs not normal,should have one job,but
job num is: " + jobs.size());
}
try {
-
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), null);
+ MTMVTaskContext mtmvTaskContext = new
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
+ info.isComplete());
+
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(),
mtmvTaskContext);
+
} catch (JobException e) {
e.printStackTrace();
throw new DdlException(e.getMessage());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
new file mode 100644
index 00000000000..2b862bfab23
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * MTMVPartitionInfo
+ */
+public class MTMVPartitionInfo {
+
+ public enum MTMVPartitionType {
+ FOLLOW_BASE_TABLE,
+ SELF_MANAGE
+ }
+
+ @SerializedName("pt")
+ MTMVPartitionType partitionType;
+ @SerializedName("rt")
+ BaseTableInfo relatedTable;
+ @SerializedName("rc")
+ String relatedCol;
+ @SerializedName("pc")
+ String partitionCol;
+
+ public MTMVPartitionInfo() {
+ }
+
+ public MTMVPartitionInfo(MTMVPartitionType partitionType) {
+ this.partitionType = partitionType;
+ }
+
+ public MTMVPartitionInfo(MTMVPartitionType partitionType,
+ String partitionCol) {
+ this.partitionType = partitionType;
+ this.partitionCol = partitionCol;
+ }
+
+ public MTMVPartitionType getPartitionType() {
+ return partitionType;
+ }
+
+ public void setPartitionType(MTMVPartitionType partitionType) {
+ this.partitionType = partitionType;
+ }
+
+ public BaseTableInfo getRelatedTable() {
+ return relatedTable;
+ }
+
+ public void setRelatedTable(BaseTableInfo relatedTable) {
+ this.relatedTable = relatedTable;
+ }
+
+ public String getRelatedCol() {
+ return relatedCol;
+ }
+
+ public void setRelatedCol(String relatedCol) {
+ this.relatedCol = relatedCol;
+ }
+
+ public String getPartitionCol() {
+ return partitionCol;
+ }
+
+ public void setPartitionCol(String partitionCol) {
+ this.partitionCol = partitionCol;
+ }
+
+ @Override
+ public String toString() {
+ return "MTMVPartitionInfo{"
+ + "partitionType=" + partitionType
+ + ", relatedTable=" + relatedTable
+ + ", relatedCol='" + relatedCol + '\''
+ + ", partitionCol='" + partitionCol + '\''
+ + '}';
+ }
+
+ public String toNameString() {
+ if (partitionType == MTMVPartitionType.SELF_MANAGE) {
+ return "MTMVPartitionInfo{"
+ + "partitionType=" + partitionType
+ + '}';
+ } else {
+ return "MTMVPartitionInfo{"
+ + "partitionType=" + partitionType
+ + ", relatedTable=" + relatedTable.getTableName()
+ + ", relatedCol='" + relatedCol + '\''
+ + ", partitionCol='" + partitionCol + '\''
+ + '}';
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
new file mode 100644
index 00000000000..a8bc43a159e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.mysql.privilege.Auth;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.exceptions.ParseException;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
+import
org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.List;
+import java.util.Set;
+
+public class MTMVPlanUtil {
+
+ public static ConnectContext createMTMVContext(MTMV mtmv) throws
AnalysisException {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setEnv(Env.getCurrentEnv());
+ ctx.setQualifiedUser(Auth.ADMIN_USER);
+ ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
+ ctx.getState().reset();
+ ctx.setThreadLocalInfo();
+ CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr()
+ .getCatalogOrAnalysisException(mtmv.getEnvInfo().getCtlId());
+ ctx.changeDefaultCatalog(catalog.getName());
+
ctx.setDatabase(catalog.getDbOrAnalysisException(mtmv.getEnvInfo().getDbId()).getFullName());
+ ctx.getSessionVariable().enableFallbackToOriginalPlanner = false;
+ return ctx;
+ }
+
+ public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext
ctx) {
+ Plan plan = getPlanBySql(mtmv.getQuerySql(), ctx);
+ return generateMTMVRelation(plan);
+ }
+
+ public static MTMVRelation generateMTMVRelation(Plan plan) {
+ return new MTMVRelation(getBaseTables(plan), getBaseViews(plan));
+ }
+
+ private static Set<BaseTableInfo> getBaseTables(Plan plan) {
+ TableCollectorContext collectorContext =
+ new TableCollector.TableCollectorContext(
+
com.google.common.collect.Sets.newHashSet(TableType.MATERIALIZED_VIEW,
TableType.OLAP));
+ plan.accept(TableCollector.INSTANCE, collectorContext);
+ List<TableIf> collectedTables = collectorContext.getCollectedTables();
+ return transferTableIfToInfo(collectedTables);
+ }
+
+ private static Set<BaseTableInfo> getBaseViews(Plan plan) {
+ TableCollectorContext collectorContext =
+ new TableCollector.TableCollectorContext(
+
com.google.common.collect.Sets.newHashSet(TableType.VIEW));
+ plan.accept(TableCollector.INSTANCE, collectorContext);
+ List<TableIf> collectedTables = collectorContext.getCollectedTables();
+ return transferTableIfToInfo(collectedTables);
+ }
+
+ private static Set<BaseTableInfo> transferTableIfToInfo(List<TableIf>
tables) {
+ Set<BaseTableInfo> result =
com.google.common.collect.Sets.newHashSet();
+ for (TableIf table : tables) {
+ result.add(new BaseTableInfo(table));
+ }
+ return result;
+ }
+
+ private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
+ List<StatementBase> statements;
+ try {
+ statements = new NereidsParser().parseSQL(querySql);
+ } catch (Exception e) {
+ throw new ParseException("Nereids parse failed. " +
e.getMessage());
+ }
+ StatementBase parsedStmt = statements.get(0);
+ LogicalPlan logicalPlan = ((LogicalPlanAdapter)
parsedStmt).getLogicalPlan();
+ NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
+ return planner.plan(logicalPlan, PhysicalProperties.ANY,
ExplainLevel.NONE);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
index ed2f0f709f4..0f4f904c573 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
@@ -26,7 +26,8 @@ public class MTMVRefreshEnum {
* RefreshMethod
*/
public enum RefreshMethod {
- COMPLETE //complete
+ COMPLETE, //complete
+ AUTO //try to update incrementally, if not possible, update in full
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCacheManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
similarity index 50%
rename from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCacheManager.java
rename to
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
index 950fc79ef7d..92149b3b465 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCacheManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
@@ -17,38 +17,19 @@
package org.apache.doris.mtmv;
-import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
-import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
-import org.apache.doris.nereids.NereidsPlanner;
-import org.apache.doris.nereids.exceptions.ParseException;
-import org.apache.doris.nereids.glue.LogicalPlanAdapter;
-import org.apache.doris.nereids.parser.NereidsParser;
-import org.apache.doris.nereids.properties.PhysicalProperties;
-import org.apache.doris.nereids.trees.plans.Plan;
-import
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
-import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
-import
org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
import org.apache.doris.persist.AlterMTMV;
-import org.apache.doris.qe.ConnectContext;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
@@ -63,126 +44,34 @@ import java.util.Set;
/**
* when do some operation, do something about cache
*/
-public class MTMVCacheManager implements MTMVHookService {
- private static final Logger LOG =
LogManager.getLogger(MTMVCacheManager.class);
+public class MTMVRelationManager implements MTMVHookService {
+ private static final Logger LOG =
LogManager.getLogger(MTMVRelationManager.class);
private Map<BaseTableInfo, Set<BaseTableInfo>> tableMTMVs =
Maps.newConcurrentMap();
public Set<BaseTableInfo> getMtmvsByBaseTable(BaseTableInfo table) {
return tableMTMVs.get(table);
}
- // TODO Implement the method which getting materialized view by tables
- public List<MTMV> getAvailableMaterializedView(List<BaseTableInfo> tables)
{
- return ImmutableList.of();
- }
-
- public boolean isAvailableMTMV(MTMV mtmv, ConnectContext ctx) throws
AnalysisException, DdlException {
- // check session variable if enable rewrite
- if (!ctx.getSessionVariable().isEnableMvRewrite()) {
- return false;
- }
- MTMVRelation mtmvRelation = mtmv.getRelation();
- if (mtmvRelation == null) {
- return false;
- }
- // chaek mv is normal
- if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
- && mtmv.getStatus().getRefreshState() ==
MTMVRefreshState.SUCCESS)) {
- return false;
- }
- // check external table
- boolean containsExternalTable =
containsExternalTable(mtmvRelation.getBaseTables());
- if (containsExternalTable) {
- return ctx.getSessionVariable().isEnableExternalMvRewrite();
- }
- // check gracePeriod
- Long gracePeriod = mtmv.getGracePeriod();
- // do not care data is delayed
- if (gracePeriod < 0) {
- return true;
- }
- // compare with base table
- Long mtmvLastTime = getTableLastVisibleVersionTime(mtmv);
- Long maxAvailableTime = mtmvLastTime + gracePeriod;
- for (BaseTableInfo baseTableInfo : mtmvRelation.getBaseTables()) {
- long tableLastVisibleVersionTime =
getTableLastVisibleVersionTime(baseTableInfo);
- if (tableLastVisibleVersionTime > maxAvailableTime) {
- return false;
- }
- }
- return true;
- }
-
- private long getTableLastVisibleVersionTime(BaseTableInfo baseTableInfo)
throws AnalysisException, DdlException {
- Table table = Env.getCurrentEnv().getInternalCatalog()
- .getDbOrAnalysisException(baseTableInfo.getDbId())
- .getTableOrDdlException(baseTableInfo.getTableId(),
TableType.OLAP);
- return getTableLastVisibleVersionTime((OlapTable) table);
- }
-
- private long getTableLastVisibleVersionTime(OlapTable table) {
- long result = 0L;
- long visibleVersionTime;
- for (Partition partition : table.getAllPartitions()) {
- visibleVersionTime = partition.getVisibleVersionTime();
- if (visibleVersionTime > result) {
- result = visibleVersionTime;
- }
- }
- return result;
- }
-
- private boolean containsExternalTable(Set<BaseTableInfo> baseTableInfos) {
- for (BaseTableInfo baseTableInfo : baseTableInfos) {
- if (InternalCatalog.INTERNAL_CATALOG_ID !=
baseTableInfo.getCtlId()) {
- return true;
+ public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos) {
+ Set<MTMV> res = Sets.newHashSet();
+ Set<BaseTableInfo> mvInfos = getAvailableMTMVInfos(tableInfos);
+ for (BaseTableInfo tableInfo : mvInfos) {
+ try {
+ res.add((MTMV) MTMVUtil.getTable(tableInfo));
+ } catch (AnalysisException e) {
+ // not throw exception to client, just ignore it
+ LOG.warn("getTable failed: {}", tableInfo.toString(), e);
}
}
- return false;
- }
-
- public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext
ctx) {
- Plan plan = getPlanBySql(mtmv.getQuerySql(), ctx);
- return new MTMVRelation(getBaseTables(plan), getBaseViews(plan));
- }
-
- private static Set<BaseTableInfo> getBaseTables(Plan plan) {
- TableCollectorContext collectorContext =
- new TableCollector.TableCollectorContext(
- Sets.newHashSet(TableType.MATERIALIZED_VIEW,
TableType.OLAP));
- plan.accept(TableCollector.INSTANCE, collectorContext);
- List<TableIf> collectedTables = collectorContext.getCollectedTables();
- return transferTableIfToInfo(collectedTables);
- }
-
- private static Set<BaseTableInfo> getBaseViews(Plan plan) {
- TableCollectorContext collectorContext =
- new TableCollector.TableCollectorContext(
- Sets.newHashSet(TableType.VIEW));
- plan.accept(TableCollector.INSTANCE, collectorContext);
- List<TableIf> collectedTables = collectorContext.getCollectedTables();
- return transferTableIfToInfo(collectedTables);
- }
-
- private static Set<BaseTableInfo> transferTableIfToInfo(List<TableIf>
tables) {
- Set<BaseTableInfo> result = Sets.newHashSet();
- for (TableIf table : tables) {
- result.add(new BaseTableInfo(table));
- }
- return result;
+ return res;
}
- private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
- List<StatementBase> statements;
- try {
- statements = new NereidsParser().parseSQL(querySql);
- } catch (Exception e) {
- throw new ParseException("Nereids parse failed. " +
e.getMessage());
+ public Set<BaseTableInfo> getAvailableMTMVInfos(List<BaseTableInfo>
tableInfos) {
+ Set<BaseTableInfo> mvInfos = Sets.newHashSet();
+ for (BaseTableInfo tableInfo : tableInfos) {
+ mvInfos.addAll(getMtmvsByBaseTable(tableInfo));
}
- StatementBase parsedStmt = statements.get(0);
- LogicalPlan logicalPlan = ((LogicalPlanAdapter)
parsedStmt).getLogicalPlan();
- NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
- return planner.plan(logicalPlan, PhysicalProperties.ANY,
ExplainLevel.NONE);
+ return mvInfos;
}
private Set<BaseTableInfo> getOrCreateMTMVs(BaseTableInfo baseTableInfo) {
@@ -233,6 +122,7 @@ public class MTMVCacheManager implements MTMVHookService {
/**
* modify `tableMTMVs` by MTMVRelation
+ *
* @param mtmv
* @param dbId
*/
@@ -243,6 +133,7 @@ public class MTMVCacheManager implements MTMVHookService {
/**
* remove cache of mtmv
+ *
* @param mtmv
*/
@Override
@@ -262,6 +153,7 @@ public class MTMVCacheManager implements MTMVHookService {
/**
* modify `tableMTMVs` by MTMVRelation
+ *
* @param mtmv
* @param relation
* @param task
@@ -276,6 +168,7 @@ public class MTMVCacheManager implements MTMVHookService {
/**
* update mtmv status to `SCHEMA_CHANGE`
+ *
* @param table
*/
@Override
@@ -285,6 +178,7 @@ public class MTMVCacheManager implements MTMVHookService {
/**
* update mtmv status to `SCHEMA_CHANGE`
+ *
* @param table
*/
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
index e83ca4e9496..9530467dee7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
@@ -18,10 +18,13 @@
package org.apache.doris.mtmv;
import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.persist.AlterMTMV;
@@ -36,16 +39,16 @@ public class MTMVService {
private static final Logger LOG = LogManager.getLogger(MTMVService.class);
private Map<String, MTMVHookService> hooks = Maps.newConcurrentMap();
- private MTMVCacheManager cacheManager = new MTMVCacheManager();
+ private MTMVRelationManager relationManager = new MTMVRelationManager();
private MTMVJobManager jobManager = new MTMVJobManager();
public MTMVService() {
registerHook("MTMVJobManager", jobManager);
- registerHook("MTMVCacheManager", cacheManager);
+ registerHook("MTMVRelationManager", relationManager);
}
- public MTMVCacheManager getCacheManager() {
- return cacheManager;
+ public MTMVRelationManager getRelationManager() {
+ return relationManager;
}
public void registerHook(String name, MTMVHookService mtmvHookService) {
@@ -76,8 +79,12 @@ public class MTMVService {
}
}
- public void createMTMV(MTMV mtmv) throws DdlException {
+ public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException {
Objects.requireNonNull(mtmv);
+ if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
+ OlapTable relatedTable = (OlapTable)
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+ MTMVUtil.alignMvPartition(mtmv, relatedTable);
+ }
LOG.info("createMTMV: " + mtmv.getName());
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.createMTMV(mtmv);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
new file mode 100644
index 00000000000..81da2946ff1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
@@ -0,0 +1,489 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.analysis.AddPartitionClause;
+import org.apache.doris.analysis.DropPartitionClause;
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.PartitionKeyDesc;
+import org.apache.doris.analysis.SinglePartitionDesc;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+
+public class MTMVUtil {
+ private static final Logger LOG = LogManager.getLogger(MTMVUtil.class);
+
+ /**
+ * get Table by BaseTableInfo
+ *
+ * @param baseTableInfo
+ * @return
+ * @throws AnalysisException
+ */
+ public static TableIf getTable(BaseTableInfo baseTableInfo) throws
AnalysisException {
+ TableIf table = Env.getCurrentEnv().getCatalogMgr()
+ .getCatalogOrAnalysisException(baseTableInfo.getCtlId())
+ .getDbOrAnalysisException(baseTableInfo.getDbId())
+ .getTableOrAnalysisException(baseTableInfo.getTableId());
+ return table;
+ }
+
+ /**
+ * Determine whether the mtmv is sync with tables
+ *
+ * @param mtmv
+ * @param tables
+ * @param excludedTriggerTables
+ * @param gracePeriod
+ * @return
+ */
+ public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
+ Set<String> excludedTriggerTables, Long gracePeriod) {
+ return isSync(getTableMinVisibleVersionTime(mtmv), tables,
excludedTriggerTables, gracePeriod);
+ }
+
+ /**
+ * Determine whether the partition is sync with retated partition and
other baseTables
+ *
+ * @param mtmv
+ * @param partitionId
+ * @param tables
+ * @param excludedTriggerTables
+ * @param gracePeriod
+ * @return
+ * @throws AnalysisException
+ */
+ public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId,
Set<BaseTableInfo> tables,
+ Set<String> excludedTriggerTables, Long gracePeriod) throws
AnalysisException {
+ boolean isSyncWithPartition = true;
+ if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
+ OlapTable relatedTable = (OlapTable)
getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+ // if follow base table, not need compare with related table, only
should compare with related partition
+ excludedTriggerTables.add(relatedTable.getName());
+ PartitionItem item =
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
+ long relatedPartitionId = getExistPartitionId(item,
+ relatedTable.getPartitionInfo().getIdToItem(false));
+ if (relatedPartitionId == -1L) {
+ LOG.warn("can not found related partition: " + partitionId);
+ return false;
+ }
+ isSyncWithPartition = isSyncWithPartition(mtmv, partitionId,
relatedTable, relatedPartitionId);
+ }
+ return isSyncWithPartition && isSync(
+
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(),
tables,
+ excludedTriggerTables, gracePeriod);
+
+ }
+
+ /**
+ * Align the partitions of mtmv and related tables, delete more and add
less
+ *
+ * @param mtmv
+ * @param relatedTable
+ * @throws DdlException
+ * @throws AnalysisException
+ */
+ public static void alignMvPartition(MTMV mtmv, OlapTable relatedTable)
+ throws DdlException, AnalysisException {
+ Map<Long, PartitionItem> relatedTableItems =
relatedTable.getPartitionInfo().getIdToItem(false);
+ Map<Long, PartitionItem> mtmvItems =
mtmv.getPartitionInfo().getIdToItem(false);
+ // drop partition of mtmv
+ for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
+ long partitionId = getExistPartitionId(entry.getValue(),
relatedTableItems);
+ if (partitionId == -1L) {
+ dropPartition(mtmv, entry.getKey());
+ }
+ }
+ // add partition for mtmv
+ for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
+ long partitionId = getExistPartitionId(entry.getValue(),
mtmvItems);
+ if (partitionId == -1L) {
+ addPartition(mtmv, relatedTable, entry.getKey());
+ }
+ }
+ }
+
+ /**
+ * get mv.partitions which not sync with relatedTable
+ * <p>
+ * Comparing the time of mtmv and relatedTable partitioning,
+ * if the visibleVersionTime of the base table is later,
+ * then the partitioning of this mtmv is considered stale
+ *
+ * @param mtmv
+ * @param relatedTable
+ * @return partitionIds
+ * @throws DdlException when partition can not found
+ */
+ public static Set<Long> getMTMVStalePartitions(MTMV mtmv, OlapTable
relatedTable)
+ throws AnalysisException {
+ Set<Long> ids = Sets.newHashSet();
+ Map<Long, Set<Long>> mvToBasePartitions = getMvToBasePartitions(mtmv,
relatedTable);
+ for (Entry<Long, Set<Long>> entry : mvToBasePartitions.entrySet()) {
+ for (Long relatedPartitionId : entry.getValue()) {
+ boolean syncWithRelatedPartition = isSyncWithPartition(mtmv,
entry.getKey(), relatedTable,
+ relatedPartitionId);
+ if (!syncWithRelatedPartition) {
+ ids.add(entry.getKey());
+ break;
+ }
+ }
+ }
+ return ids;
+ }
+
+ public static List<String> getPartitionNamesByIds(MTMV mtmv, Set<Long>
ids) throws AnalysisException {
+ List<String> res = Lists.newArrayList();
+ for (Long partitionId : ids) {
+
res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName());
+ }
+ return res;
+ }
+
+ public static Set<Long> getPartitionsIdsByNames(MTMV mtmv, List<String>
partitions) throws AnalysisException {
+ Set<Long> res = Sets.newHashSet();
+ for (String partitionName : partitions) {
+ Partition partition =
mtmv.getPartitionOrAnalysisException(partitionName);
+ res.add(partition.getId());
+ }
+ return res;
+ }
+
+ /**
+ * check if table is sync with all baseTables
+ *
+ * @param mtmv
+ * @return
+ */
+ public static boolean isMTMVSync(MTMV mtmv) {
+ MTMVRelation mtmvRelation = mtmv.getRelation();
+ if (mtmvRelation == null) {
+ return false;
+ }
+ return isMTMVSync(mtmv, mtmv.getRelation().getBaseTables(),
Sets.newHashSet(), 0L);
+ }
+
+ /**
+ * get not sync tables
+ *
+ * @param mtmv
+ * @param partitionId
+ * @return
+ * @throws AnalysisException
+ */
+ public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long
partitionId) throws AnalysisException {
+ List<String> res = Lists.newArrayList();
+ long maxAvailableTime =
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
+ for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables())
{
+ TableIf table = getTable(baseTableInfo);
+ if (!(table instanceof OlapTable)) {
+ continue;
+ }
+ OlapTable olapTable = (OlapTable) table;
+ if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
+
.getMvPartitionInfo().getRelatedTable().equals(baseTableInfo)) {
+ PartitionItem item =
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
+ long relatedPartitionId = getExistPartitionId(item,
+ olapTable.getPartitionInfo().getIdToItem(false));
+ if (relatedPartitionId == -1L) {
+ throw new AnalysisException("can not found related
partition");
+ }
+ boolean isSyncWithPartition = isSyncWithPartition(mtmv,
partitionId, olapTable, relatedPartitionId);
+ if (!isSyncWithPartition) {
+ res.add(olapTable.getName());
+ }
+ } else {
+ long tableLastVisibleVersionTime =
getTableMaxVisibleVersionTime((OlapTable) table);
+ if (tableLastVisibleVersionTime > maxAvailableTime) {
+ res.add(table.getName());
+ }
+ }
+ }
+ return res;
+ }
+
+ /**
+ * Determine which partition of mtmv can be rewritten
+ *
+ * @param mtmv
+ * @param ctx
+ * @return
+ */
+ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv,
ConnectContext ctx) {
+ List<Partition> res = Lists.newArrayList();
+ Collection<Partition> allPartitions = mtmv.getPartitions();
+ // check session variable if enable rewrite
+ if (!ctx.getSessionVariable().isEnableMvRewrite()) {
+ return res;
+ }
+ MTMVRelation mtmvRelation = mtmv.getRelation();
+ if (mtmvRelation == null) {
+ return res;
+ }
+ // check mv is normal
+ if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
+ && mtmv.getStatus().getRefreshState() ==
MTMVRefreshState.SUCCESS)) {
+ return res;
+ }
+ // check gracePeriod
+ Long gracePeriod = mtmv.getGracePeriod();
+ // do not care data is delayed
+ if (gracePeriod < 0) {
+ return allPartitions;
+ }
+
+ for (Partition partition : allPartitions) {
+ try {
+ if (isMTMVPartitionSync(mtmv, partition.getId(),
mtmvRelation.getBaseTables(), Sets.newHashSet(),
+ gracePeriod)) {
+ res.add(partition);
+ }
+ } catch (AnalysisException e) {
+ // ignore it
+ LOG.warn("check isMTMVPartitionSync failed", e);
+ }
+ }
+ return res;
+ }
+
+ public static Set<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
+ Collection<Partition> allPartitions = mtmv.getPartitions();
+ Set<Long> res = Sets.newHashSet();
+ for (Partition partition : allPartitions) {
+ try {
+ if (!isMTMVPartitionSync(mtmv, partition.getId(),
mtmv.getRelation().getBaseTables(),
+ mtmv.getExcludedTriggerTables(),
+ 0L)) {
+ res.add(partition.getId());
+ }
+ } catch (AnalysisException e) {
+ res.add(partition.getId());
+ LOG.warn("check isMTMVPartitionSync failed", e);
+ }
+ }
+ return res;
+ }
+
+ /**
+ * compare last update time of mtmvPartition and tablePartition
+ *
+ * @param mtmv
+ * @param mtmvPartitionId
+ * @param relatedTable
+ * @param relatedTablePartitionId
+ * @return
+ * @throws AnalysisException
+ */
+ private static boolean isSyncWithPartition(MTMV mtmv, Long
mtmvPartitionId, OlapTable relatedTable,
+ Long relatedTablePartitionId) throws AnalysisException {
+ return
mtmv.getPartitionOrAnalysisException(mtmvPartitionId).getVisibleVersionTimeIgnoreInit()
>= relatedTable
+
.getPartitionOrAnalysisException(relatedTablePartitionId).getVisibleVersionTimeIgnoreInit();
+ }
+
+ /**
+ * like p_00000101_20170201
+ *
+ * @param desc
+ * @return
+ */
+ private static String generatePartitionName(PartitionKeyDesc desc) {
+ String partitionName = "p_";
+ partitionName +=
desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "")
+ .replaceAll("\\(|\\)|\\,|\\[|\\]", "_");
+ if (partitionName.length() > 50) {
+ partitionName = partitionName.substring(0, 30) +
Math.abs(Objects.hash(partitionName))
+ + "_" + System.currentTimeMillis();
+ }
+ return partitionName;
+ }
+
+ /**
+ * drop partition of mtmv
+ *
+ * @param mtmv
+ * @param partitionId
+ */
+ private static void dropPartition(MTMV mtmv, Long partitionId) throws
AnalysisException, DdlException {
+ Partition partition =
mtmv.getPartitionOrAnalysisException(partitionId);
+ DropPartitionClause dropPartitionClause = new
DropPartitionClause(false, partition.getName(), false, false);
+ Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv,
dropPartitionClause);
+ }
+
+ /**
+ * add partition for mtmv like relatedPartitionId of relatedTable
+ *
+ * @param mtmv
+ * @param relatedTable
+ * @param relatedPartitionId
+ * @throws AnalysisException
+ * @throws DdlException
+ */
+ private static void addPartition(MTMV mtmv, OlapTable relatedTable, Long
relatedPartitionId)
+ throws AnalysisException, DdlException {
+ PartitionDesc partitionDesc =
relatedTable.getPartitionInfo().toPartitionDesc(relatedTable);
+ Partition partition =
relatedTable.getPartitionOrAnalysisException(relatedPartitionId);
+ SinglePartitionDesc oldPartitionDesc =
partitionDesc.getSinglePartitionDescByName(partition.getName());
+
+ Map<String, String> partitionProperties = Maps.newHashMap();
+ SinglePartitionDesc singleRangePartitionDesc = new
SinglePartitionDesc(true,
+ generatePartitionName(oldPartitionDesc.getPartitionKeyDesc()),
+ oldPartitionDesc.getPartitionKeyDesc(), partitionProperties);
+
+ AddPartitionClause addPartitionClause = new
AddPartitionClause(singleRangePartitionDesc,
+ mtmv.getDefaultDistributionInfo().toDistributionDesc(),
partitionProperties, false);
+ Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(),
mtmv.getName(), addPartitionClause);
+ }
+
+ /**
+ * compare PartitionItem and return equals partitionId
+ * if not found, return -1L
+ *
+ * @param target
+ * @param sources
+ * @return
+ */
+ private static long getExistPartitionId(PartitionItem target, Map<Long,
PartitionItem> sources) {
+ for (Entry<Long, PartitionItem> entry : sources.entrySet()) {
+ if (target.equals(entry.getValue())) {
+ return entry.getKey();
+ }
+ }
+ return -1L;
+ }
+
+ /**
+ * Get the maximum update time among all partitions
+ *
+ * @param table
+ * @return
+ */
+ private static long getTableMaxVisibleVersionTime(OlapTable table) {
+ long result = 0L;
+ long visibleVersionTime;
+ for (Partition partition : table.getAllPartitions()) {
+ visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
+ if (visibleVersionTime > result) {
+ result = visibleVersionTime;
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Get the minimum update time among all partitions
+ *
+ * @param table
+ * @return
+ */
+ private static long getTableMinVisibleVersionTime(OlapTable table) {
+ long result = Long.MAX_VALUE;
+ long visibleVersionTime;
+ for (Partition partition : table.getAllPartitions()) {
+ visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
+ if (visibleVersionTime < result) {
+ result = visibleVersionTime;
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Obtain the partition correspondence between materialized views and base
tables
+ * Currently, there is a one-to-one correspondence between the partitions
of materialized views and base tables,
+ * but for scalability reasons, Set is used
+ * <p>
+ * before use this method,should call `alignMvPartition`
+ *
+ * @param mtmv
+ * @param relatedTable
+ * @return mv.partitionId ==> relatedTable.partitionId
+ */
+ private static Map<Long, Set<Long>> getMvToBasePartitions(MTMV mtmv,
OlapTable relatedTable)
+ throws AnalysisException {
+ HashMap<Long, Set<Long>> res = Maps.newHashMap();
+ Map<Long, PartitionItem> relatedTableItems =
relatedTable.getPartitionInfo().getIdToItem(false);
+ Map<Long, PartitionItem> mtmvItems =
mtmv.getPartitionInfo().getIdToItem(false);
+ for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
+ long partitionId = getExistPartitionId(entry.getValue(),
relatedTableItems);
+ if (partitionId == -1L) {
+ throw new AnalysisException("partition not found: " +
entry.getValue().toString());
+ }
+ res.put(entry.getKey(), Sets.newHashSet(partitionId));
+ }
+ return res;
+ }
+
+ /**
+ * Determine is sync, ignoring excludedTriggerTables and non OlapTanle
+ *
+ * @param visibleVersionTime
+ * @param tables
+ * @param excludedTriggerTables
+ * @param gracePeriod
+ * @return
+ */
+ private static boolean isSync(long visibleVersionTime, Set<BaseTableInfo>
tables,
+ Set<String> excludedTriggerTables, Long gracePeriod) {
+ long maxAvailableTime = visibleVersionTime + gracePeriod;
+ for (BaseTableInfo baseTableInfo : tables) {
+ TableIf table = null;
+ try {
+ table = getTable(baseTableInfo);
+ } catch (AnalysisException e) {
+ e.printStackTrace();
+ return false;
+ }
+ if (excludedTriggerTables.contains(table.getName())) {
+ continue;
+ }
+ if (!(table instanceof OlapTable)) {
+ continue;
+ }
+ long tableLastVisibleVersionTime =
getTableMaxVisibleVersionTime((OlapTable) table);
+ if (tableLastVisibleVersionTime > maxAvailableTime) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 1c4d847e3d1..d0e731b6237 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -30,6 +30,8 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
@@ -89,6 +91,7 @@ import
org.apache.doris.nereids.DorisParser.GroupingSetContext;
import org.apache.doris.nereids.DorisParser.HavingClauseContext;
import org.apache.doris.nereids.DorisParser.HintAssignmentContext;
import org.apache.doris.nereids.DorisParser.HintStatementContext;
+import org.apache.doris.nereids.DorisParser.IdentifierContext;
import org.apache.doris.nereids.DorisParser.IdentifierListContext;
import org.apache.doris.nereids.DorisParser.IdentifierOrTextContext;
import org.apache.doris.nereids.DorisParser.IdentifierSeqContext;
@@ -112,7 +115,6 @@ import
org.apache.doris.nereids.DorisParser.LogicalNotContext;
import org.apache.doris.nereids.DorisParser.MapLiteralContext;
import org.apache.doris.nereids.DorisParser.MultiStatementsContext;
import org.apache.doris.nereids.DorisParser.MultipartIdentifierContext;
-import org.apache.doris.nereids.DorisParser.MvRefreshUnitContext;
import org.apache.doris.nereids.DorisParser.NamedExpressionContext;
import org.apache.doris.nereids.DorisParser.NamedExpressionSeqContext;
import org.apache.doris.nereids.DorisParser.NullLiteralContext;
@@ -550,10 +552,25 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
comment,
desc, properties, logicalPlan, querySql,
new MTMVRefreshInfo(buildMode, refreshMethod,
refreshTriggerInfo),
- ctx.cols == null ? Lists.newArrayList() :
visitSimpleColumnDefs(ctx.cols)
+ ctx.cols == null ? Lists.newArrayList() :
visitSimpleColumnDefs(ctx.cols),
+ visitMTMVPartitionInfo(ctx.partitionKey)
));
}
+ /**
+ * get MTMVPartitionInfo
+ *
+ * @param ctx IdentifierContext
+ * @return MTMVPartitionInfo
+ */
+ public MTMVPartitionInfo visitMTMVPartitionInfo(IdentifierContext ctx) {
+ if (ctx == null) {
+ return new MTMVPartitionInfo(MTMVPartitionType.SELF_MANAGE);
+ } else {
+ return new MTMVPartitionInfo(MTMVPartitionType.FOLLOW_BASE_TABLE,
ctx.getText());
+ }
+ }
+
@Override
public List<SimpleColumnDefinition>
visitSimpleColumnDefs(SimpleColumnDefsContext ctx) {
if (ctx == null) {
@@ -601,19 +618,32 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
int interval = Integer.parseInt(ctx.INTEGER_VALUE().getText());
String startTime = ctx.STARTS() == null ? null
: ctx.STRING_LITERAL().getText().substring(1,
ctx.STRING_LITERAL().getText().length() - 1);
- IntervalUnit unit = visitMvRefreshUnit(ctx.mvRefreshUnit());
+ IntervalUnit unit = visitMvRefreshUnit(ctx.refreshUnit);
return new MTMVRefreshSchedule(startTime, interval, unit);
}
- @Override
- public IntervalUnit visitMvRefreshUnit(MvRefreshUnitContext ctx) {
- return IntervalUnit.valueOf(ctx.getText().toUpperCase());
+ /**
+ * get IntervalUnit,only enable_job_schedule_second_for_test is true, can
use second
+ *
+ * @param ctx ctx
+ * @return IntervalUnit
+ */
+ public IntervalUnit visitMvRefreshUnit(IdentifierContext ctx) {
+ IntervalUnit intervalUnit =
IntervalUnit.fromString(ctx.getText().toUpperCase());
+ if (null == intervalUnit) {
+ throw new AnalysisException("interval time unit can not be " +
ctx.getText());
+ }
+ if (intervalUnit.equals(IntervalUnit.SECOND)
+ && !Config.enable_job_schedule_second_for_test) {
+ throw new AnalysisException("interval time unit can not be
second");
+ }
+ return intervalUnit;
}
@Override
public RefreshMethod visitRefreshMethod(RefreshMethodContext ctx) {
if (ctx == null) {
- return RefreshMethod.COMPLETE;
+ return RefreshMethod.AUTO;
}
return RefreshMethod.valueOf(ctx.getText().toUpperCase());
}
@@ -631,7 +661,19 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
@Override
public RefreshMTMVCommand visitRefreshMTMV(RefreshMTMVContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
- return new RefreshMTMVCommand(new RefreshMTMVInfo(new
TableNameInfo(nameParts)));
+ List<String> partitions = ImmutableList.of();
+ if (ctx.partitionSpec() != null) {
+ if (ctx.partitionSpec().TEMPORARY() != null) {
+ throw new AnalysisException("Not allowed to specify TEMPORARY
");
+ }
+ if (ctx.partitionSpec().partition != null) {
+ partitions =
ImmutableList.of(ctx.partitionSpec().partition.getText());
+ } else {
+ partitions =
visitIdentifierList(ctx.partitionSpec().partitions);
+ }
+ }
+ return new RefreshMTMVCommand(new RefreshMTMVInfo(new
TableNameInfo(nameParts),
+ partitions, ctx.COMPLETE() != null));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 3139d98f906..483e8e517a0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -82,7 +82,7 @@ public abstract class AbstractMaterializedViewRule {
queryPlan.getGroupExpression().get().getOwnerGroup().getGroupId())) {
continue;
}
- Plan mvPlan =
materializationContext.getMtmv().getMvCache().getLogicalPlan();
+ Plan mvPlan =
materializationContext.getMtmv().getCache().getLogicalPlan();
List<StructInfo> viewStructInfos = extractStructInfo(mvPlan,
cascadesContext);
if (viewStructInfos.size() > 1) {
// view struct info should only have one
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
index 8061b1835e6..b6c7234d5bf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
@@ -24,7 +24,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.mtmv.BaseTableInfo;
-import org.apache.doris.mtmv.MTMVCacheManager;
+import org.apache.doris.mtmv.MTMVRelationManager;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.PlannerHook;
@@ -70,10 +70,10 @@ public class InitMaterializationContextHook implements
PlannerHook {
}
List<BaseTableInfo> baseTableUsed =
collectedTables.stream().map(BaseTableInfo::new).collect(Collectors.toList());
- // TODO the logic should be move to MTMVCacheManager later when
getAvailableMaterializedView is ready in
+ // TODO the logic should be move to MTMVRelationManager later when
getAvailableMaterializedView is ready in
// MV Cache manager
Env env = cascadesContext.getConnectContext().getEnv();
- MTMVCacheManager cacheManager = env.getMtmvService().getCacheManager();
+ MTMVRelationManager cacheManager =
env.getMtmvService().getRelationManager();
Set<BaseTableInfo> materializedViews = new HashSet<>();
for (BaseTableInfo baseTableInfo : baseTableUsed) {
Set<BaseTableInfo> mtmvsByBaseTable =
cacheManager.getMtmvsByBaseTable(baseTableInfo);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
index 3e1fe99c9c8..4f0b63d2ee7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
@@ -19,7 +19,7 @@ package org.apache.doris.nereids.rules.exploration.mv;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Table;
-import org.apache.doris.mtmv.MVCache;
+import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.GroupId;
import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
@@ -59,17 +59,17 @@ public class MaterializationContext {
this.mvScanPlan = mvScanPlan;
this.baseTables = baseTables;
this.baseViews = baseViews;
- MVCache mvCache = mtmv.getMvCache();
+ MTMVCache mtmvCache = mtmv.getCache();
// TODO This logic should move to materialized view cache manager
- if (mvCache == null) {
- mvCache = MVCache.from(mtmv, cascadesContext.getConnectContext());
- mtmv.setMvCache(mvCache);
+ if (mtmvCache == null) {
+ mtmvCache = mtmvCache.from(mtmv,
cascadesContext.getConnectContext());
+ mtmv.setCache(mtmvCache);
}
// mv output expression shuttle, this will be used to expression
rewrite
this.mvExprToMvScanExprMapping = ExpressionMapping.generate(
ExpressionUtils.shuttleExpressionWithLineage(
- mvCache.getMvOutputExpressions(),
- mvCache.getLogicalPlan()),
+ mtmvCache.getMvOutputExpressions(),
+ mtmvCache.getLogicalPlan()),
mvScanPlan.getExpressions());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java
index 9aadf0f1cf0..ab9d6e35c9d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java
@@ -31,7 +31,7 @@ import java.util.Objects;
/**
* alter multi table materialized view
*/
-public class AlterMTMVCommand extends Command implements ForwardWithSync {
+public class AlterMTMVCommand extends Command implements ForwardWithSync,
NotAllowFallback {
public static final Logger LOG =
LogManager.getLogger(AlterMTMVCommand.class);
private final AlterMTMVInfo alterMTMVInfo;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java
index c246991e618..814804b8ce9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java
@@ -32,7 +32,7 @@ import java.util.Objects;
/**
* create multi table materialized view
*/
-public class CreateMTMVCommand extends Command implements ForwardWithSync {
+public class CreateMTMVCommand extends Command implements ForwardWithSync,
NotAllowFallback {
public static final Logger LOG =
LogManager.getLogger(CreateMTMVCommand.class);
private final CreateMTMVInfo createMTMVInfo;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java
index a0d614b163c..19eedb82746 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java
@@ -29,7 +29,7 @@ import java.util.Objects;
/**
* refresh mtmv
*/
-public class DropMTMVCommand extends Command implements ForwardWithSync {
+public class DropMTMVCommand extends Command implements ForwardWithSync,
NotAllowFallback {
private final DropMTMVInfo dropMTMVInfo;
public DropMTMVCommand(DropMTMVInfo dropMTMVInfo) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NotAllowFallback.java
similarity index 54%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
copy to
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NotAllowFallback.java
index ed2f0f709f4..72d8c82e599 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NotAllowFallback.java
@@ -15,51 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.mtmv;
+package org.apache.doris.nereids.trees.plans.commands;
/**
- * refresh enum
+ * The class that implements this interface does not allow fallback to
OriginalPlanner,
+ * for example, some new features are not implemented by the old parser
*/
-public class MTMVRefreshEnum {
-
- /**
- * RefreshMethod
- */
- public enum RefreshMethod {
- COMPLETE //complete
- }
-
- /**
- * BuildMode
- */
- public enum BuildMode {
- IMMEDIATE, //right now
- DEFERRED // deferred
- }
-
- /**
- * RefreshTrigger
- */
- public enum RefreshTrigger {
- MANUAL, //manual
- SCHEDULE // schedule
- }
-
- /**
- * MTMVState
- */
- public enum MTMVState {
- INIT,
- NORMAL,
- SCHEMA_CHANGE
- }
-
- /**
- * MTMVRefreshState
- */
- public enum MTMVRefreshState {
- INIT,
- FAIL,
- SUCCESS
- }
+public interface NotAllowFallback {
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java
index 982e8d86257..a918112555e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java
@@ -29,7 +29,7 @@ import java.util.Objects;
/**
* refresh mtmv
*/
-public class RefreshMTMVCommand extends Command implements ForwardWithSync {
+public class RefreshMTMVCommand extends Command implements ForwardWithSync,
NotAllowFallback {
private final RefreshMTMVInfo refreshMTMVInfo;
public RefreshMTMVCommand(RefreshMTMVInfo refreshMTMVInfo) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index b221e507b82..ae4e048f1c6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -17,12 +17,15 @@
package org.apache.doris.nereids.trees.plans.commands;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
+import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.parser.NereidsParser;
@@ -33,11 +36,13 @@ import org.apache.doris.nereids.trees.expressions.LessThan;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
-import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -62,38 +67,41 @@ public class UpdateMvByPartitionCommand extends
InsertOverwriteTableCommand {
* Construct command
*
* @param mv materialize view
- * @param partitions update partitions in mv and tables
+ * @param partitionIds update partitions in mv and tables
* @param tableWithPartKey the partitions key for different table
* @return command
*/
- public static UpdateMvByPartitionCommand from(MTMV mv, Set<PartitionItem>
partitions,
+ public static UpdateMvByPartitionCommand from(MTMV mv, Set<Long>
partitionIds,
Map<OlapTable, String> tableWithPartKey) {
NereidsParser parser = new NereidsParser();
Map<OlapTable, Set<Expression>> predicates =
- constructTableWithPredicates(partitions, tableWithPartKey);
- List<String> parts = constructPartsForMv(mv, partitions);
+ constructTableWithPredicates(mv, partitionIds,
tableWithPartKey);
+ List<String> parts = constructPartsForMv(mv, partitionIds);
Plan plan = parser.parseSingle(mv.getQuerySql());
plan = plan.accept(new PredicateAdder(), predicates);
+ if (plan instanceof Sink) {
+ plan = plan.child(0);
+ }
UnboundTableSink<? extends Plan> sink =
new UnboundTableSink<>(mv.getFullQualifiers(),
ImmutableList.of(), ImmutableList.of(),
parts, plan);
return new UpdateMvByPartitionCommand(sink);
}
- private static List<String> constructPartsForMv(MTMV mv,
Set<PartitionItem> partitions) {
- return mv.getPartitionNames().stream()
- .filter(name -> {
- PartitionItem mvPartItem =
mv.getPartitionInfo().getItem(mv.getPartition(name).getId());
- return partitions.stream().anyMatch(p ->
p.getIntersect(mvPartItem) != null);
- })
+ private static List<String> constructPartsForMv(MTMV mv, Set<Long>
partitionIds) {
+ return partitionIds.stream()
+ .map(id -> mv.getPartition(id).getName())
.collect(ImmutableList.toImmutableList());
}
- private static Map<OlapTable, Set<Expression>>
constructTableWithPredicates(Set<PartitionItem> partitions,
- Map<OlapTable, String> tableWithPartKey) {
+ private static Map<OlapTable, Set<Expression>>
constructTableWithPredicates(MTMV mv,
+ Set<Long> partitionIds, Map<OlapTable, String> tableWithPartKey) {
+ Set<PartitionItem> items = partitionIds.stream()
+ .map(id -> mv.getPartitionInfo().getItem(id))
+ .collect(ImmutableSet.toImmutableSet());
ImmutableMap.Builder<OlapTable, Set<Expression>> builder = new
ImmutableMap.Builder<>();
tableWithPartKey.forEach((table, colName) ->
- builder.put(table, constructPredicates(partitions, colName))
+ builder.put(table, constructPredicates(items, colName))
);
return builder.build();
}
@@ -131,8 +139,15 @@ public class UpdateMvByPartitionCommand extends
InsertOverwriteTableCommand {
static class PredicateAdder extends DefaultPlanRewriter<Map<OlapTable,
Set<Expression>>> {
@Override
- public Plan visitLogicalOlapScan(LogicalOlapScan scan, Map<OlapTable,
Set<Expression>> predicates) {
- return new LogicalFilter<>(predicates.get(scan.getTable()), scan);
+ public Plan visitUnboundRelation(UnboundRelation unboundRelation,
Map<OlapTable, Set<Expression>> predicates) {
+ List<String> tableQualifier =
RelationUtil.getQualifierName(ConnectContext.get(),
+ unboundRelation.getNameParts());
+ TableIf table = RelationUtil.getTable(tableQualifier,
Env.getCurrentEnv());
+ if (table instanceof OlapTable && predicates.containsKey(table)) {
+ return new
LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))),
+ unboundRelation);
+ }
+ return unboundRelation;
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
index 4ea2a3b6982..66bc1b22107 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
@@ -19,28 +19,41 @@ package org.apache.doris.nereids.trees.plans.commands.info;
import org.apache.doris.analysis.CreateMTMVStmt;
import org.apache.doris.analysis.KeysDesc;
+import org.apache.doris.analysis.ListPartitionDesc;
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.mtmv.EnvInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRelation;
+import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
+import
org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
import
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import
org.apache.doris.nereids.trees.plans.visitor.NondeterministicFunctionCollector;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
import
org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
@@ -56,6 +69,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
@@ -78,6 +92,9 @@ public class CreateMTMVInfo {
private final List<ColumnDefinition> columns = Lists.newArrayList();
private final List<SimpleColumnDefinition> simpleColumnDefinitions;
private final EnvInfo envInfo;
+ private final MTMVPartitionInfo mvPartitionInfo;
+ private PartitionDesc partitionDesc;
+ private MTMVRelation relation;
/**
* constructor for create MTMV
@@ -87,7 +104,8 @@ public class CreateMTMVInfo {
DistributionDescriptor distribution, Map<String, String>
properties,
LogicalPlan logicalQuery, String querySql,
MTMVRefreshInfo refreshInfo,
- List<SimpleColumnDefinition> simpleColumnDefinitions) {
+ List<SimpleColumnDefinition> simpleColumnDefinitions,
+ MTMVPartitionInfo mvPartitionInfo) {
this.ifNotExists = Objects.requireNonNull(ifNotExists, "require
ifNotExists object");
this.mvName = Objects.requireNonNull(mvName, "require mvName object");
this.keys = Utils.copyRequiredList(keys);
@@ -101,6 +119,8 @@ public class CreateMTMVInfo {
.requireNonNull(simpleColumnDefinitions, "require
simpleColumnDefinitions object");
this.envInfo = new
EnvInfo(ConnectContext.get().getCurrentCatalog().getId(),
ConnectContext.get().getCurrentDbId());
+ this.mvPartitionInfo = Objects
+ .requireNonNull(mvPartitionInfo, "require mtmvPartitionInfo
object");
}
/**
@@ -154,6 +174,11 @@ public class CreateMTMVInfo {
mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD,
gracePeriod);
properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
}
+ if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
+ String excludedTriggerTables =
properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
+
mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES,
excludedTriggerTables);
+
properties.remove(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
+ }
}
/**
@@ -163,22 +188,93 @@ public class CreateMTMVInfo {
// create table as select
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
Plan plan = planner.plan(logicalQuery, PhysicalProperties.ANY,
ExplainLevel.ALL_PLAN);
+ if (plan.anyMatch(node -> node instanceof OneRowRelation)) {
+ throw new AnalysisException("at least contain one table");
+ }
+ // can not contain VIEW or MTMV
analyzeBaseTables(plan);
- analyzeExpressions((PhysicalPlan) plan);
+ // can not contain Random function
+ analyzeExpressions(planner.getAnalyzedPlan());
+ // can not contain partition or tablets
+ boolean containTableQueryOperator =
MaterializedViewUtils.containTableQueryOperator(planner.getAnalyzedPlan());
+ if (containTableQueryOperator) {
+ throw new AnalysisException("can not contain invalid expression");
+ }
+ getRelation(planner);
getColumns(plan);
+ analyzePartition(planner);
+ }
+
+ private void getRelation(NereidsPlanner planner) {
+ Plan plan = planner.plan(logicalQuery, PhysicalProperties.ANY,
ExplainLevel.NONE);
+ this.relation = MTMVPlanUtil.generateMTMVRelation(plan);
+ }
+
+ private void analyzePartition(NereidsPlanner planner) {
+ if (mvPartitionInfo.getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
+ Plan mvRewrittenPlan =
+ planner.plan(logicalQuery, PhysicalProperties.ANY,
ExplainLevel.REWRITTEN_PLAN);
+ Optional<RelatedTableInfo> relatedTableInfo = MaterializedViewUtils
+ .getRelatedTableInfo(mvPartitionInfo.getPartitionCol(),
mvRewrittenPlan);
+ if (!relatedTableInfo.isPresent() ||
!relatedTableInfo.get().isPctPossible()) {
+ throw new AnalysisException("Unable to find a suitable base
table for partitioning");
+ }
+ TableIf followTable = null;
+ try {
+ followTable =
MTMVUtil.getTable(relatedTableInfo.get().getTableInfo());
+ } catch (org.apache.doris.common.AnalysisException e) {
+ throw new AnalysisException(e.getMessage(), e);
+ }
+ if (!(followTable instanceof OlapTable)) {
+ throw new AnalysisException("base table for partitioning only
can be OlapTable.");
+ }
+ Set<String> partitionColumnNames;
+ try {
+ partitionColumnNames = ((OlapTable)
followTable).getPartitionColumnNames();
+ } catch (DdlException e) {
+ throw new AnalysisException(e.getMessage(), e);
+ }
+
+ if
(!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) {
+ throw new AnalysisException("error related column: " +
relatedTableInfo.get().getColumn());
+ }
+ if (partitionColumnNames.size() != 1) {
+ throw new AnalysisException("base table for partitioning only
support single column.");
+ }
+
mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo());
+ mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn());
+ partitionDesc = generatePartitionDesc((OlapTable) followTable);
+ }
+ }
+
+ private PartitionDesc generatePartitionDesc(OlapTable relatedTable) {
+ PartitionType type = relatedTable.getPartitionInfo().getType();
+ try {
+ if (type == PartitionType.RANGE) {
+ return new
RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
+ Lists.newArrayList());
+ } else if (type == PartitionType.LIST) {
+ return new
ListPartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
+ Lists.newArrayList());
+ } else {
+ return null;
+ }
+ } catch (org.apache.doris.common.AnalysisException e) {
+ throw new AnalysisException("can not generate partitionDesc", e);
+ }
}
private void analyzeBaseTables(Plan plan) {
TableCollectorContext collectorContext =
- new
TableCollector.TableCollectorContext(Sets.newHashSet(TableType.MATERIALIZED_VIEW));
+ new
TableCollector.TableCollectorContext(Sets.newHashSet(TableType.MATERIALIZED_VIEW,
TableType.VIEW));
plan.accept(TableCollector.INSTANCE, collectorContext);
List<TableIf> collectedTables = collectorContext.getCollectedTables();
if (!CollectionUtils.isEmpty(collectedTables)) {
- throw new AnalysisException("can not contain MATERIALIZED_VIEW");
+ throw new AnalysisException("can not contain MATERIALIZED_VIEW or
VIEW");
}
}
- private void analyzeExpressions(PhysicalPlan plan) {
+ private void analyzeExpressions(Plan plan) {
List<TreeNode<Expression>> functionCollectResult = new ArrayList<>();
plan.accept(NondeterministicFunctionCollector.INSTANCE,
functionCollectResult);
if (!CollectionUtils.isEmpty(functionCollectResult)) {
@@ -225,7 +321,8 @@ public class CreateMTMVInfo {
.map(ColumnDefinition::translateToCatalogStyle)
.collect(Collectors.toList());
return new CreateMTMVStmt(ifNotExists, tableName, catalogColumns,
refreshInfo, keysDesc,
- distribution.translateToCatalogStyle(), properties,
mvProperties, querySql, comment, envInfo);
+ distribution.translateToCatalogStyle(), properties,
mvProperties, querySql, comment, envInfo,
+ partitionDesc, mvPartitionInfo, relation);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
index 422b9697c7c..e6a4368a850 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
@@ -17,12 +17,22 @@
package org.apache.doris.nereids.trees.plans.commands.info;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.List;
import java.util.Objects;
/**
@@ -30,13 +40,18 @@ import java.util.Objects;
*/
public class RefreshMTMVInfo {
private final TableNameInfo mvName;
+ private List<String> partitions;
+ private boolean isComplete;
- public RefreshMTMVInfo(TableNameInfo mvName) {
+ public RefreshMTMVInfo(TableNameInfo mvName, List<String> partitions,
boolean isComplete) {
this.mvName = Objects.requireNonNull(mvName, "require mvName object");
+ this.partitions = Utils.copyRequiredList(partitions);
+ this.isComplete = Objects.requireNonNull(isComplete, "require
isComplete object");
}
/**
* analyze refresh info
+ *
* @param ctx ConnectContext
*/
public void analyze(ConnectContext ctx) {
@@ -48,13 +63,41 @@ public class RefreshMTMVInfo {
mvName.getDb() + ": " + mvName.getTbl());
throw new AnalysisException(message);
}
+ try {
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
+ MTMV mtmv = (MTMV) db.getTableOrMetaException(mvName.getTbl(),
TableType.MATERIALIZED_VIEW);
+ if (!CollectionUtils.isEmpty(partitions)) {
+ MTMVUtil.getPartitionsIdsByNames(mtmv, partitions);
+ }
+ } catch (org.apache.doris.common.AnalysisException |
MetaNotFoundException | DdlException e) {
+ throw new AnalysisException(e.getMessage());
+ }
}
/**
* getMvName
+ *
* @return TableNameInfo
*/
public TableNameInfo getMvName() {
return mvName;
}
+
+ /**
+ * getPartitions
+ *
+ * @return partitionNames
+ */
+ public List<String> getPartitions() {
+ return partitions;
+ }
+
+ /**
+ * isComplete
+ *
+ * @return isComplete
+ */
+ public boolean isComplete() {
+ return isComplete;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 92d1d47e5e5..ff837283056 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -127,6 +127,7 @@ import
org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.Forward;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapScanNode;
@@ -439,6 +440,14 @@ public class StmtExecutor {
execute(queryId);
}
+ public boolean notAllowFallback() {
+ if (parsedStmt instanceof LogicalPlanAdapter) {
+ LogicalPlan logicalPlan = ((LogicalPlanAdapter)
parsedStmt).getLogicalPlan();
+ return logicalPlan instanceof NotAllowFallback;
+ }
+ return false;
+ }
+
public void execute(TUniqueId queryId) throws Exception {
SessionVariable sessionVariable = context.getSessionVariable();
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
@@ -457,6 +466,10 @@ public class StmtExecutor {
// try to fall back to legacy planner
LOG.debug("nereids cannot process statement\n" +
originStmt.originStmt
+ "\n because of " + e.getMessage(), e);
+ if (notAllowFallback()) {
+ LOG.warn("Analyze failed. {}",
context.getQueryIdentifier(), e);
+ throw ((NereidsException) e).getException();
+ }
boolean isInsertIntoCommand = parsedStmt != null &&
parsedStmt instanceof LogicalPlanAdapter
&& ((LogicalPlanAdapter)
parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand;
if (e instanceof NereidsException
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index f2a7bccdb00..e8620b105b6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
import org.apache.doris.qe.ConnectContext;
@@ -537,6 +538,8 @@ public class MetadataGenerator {
trow.addToColumnValue(new
TCell().setStringVal(mv.getQuerySql()));
trow.addToColumnValue(new
TCell().setStringVal(mv.getEnvInfo().toString()));
trow.addToColumnValue(new
TCell().setStringVal(mv.getMvProperties().toString()));
+ trow.addToColumnValue(new
TCell().setStringVal(mv.getMvPartitionInfo().toNameString()));
+ trow.addToColumnValue(new
TCell().setBoolVal(MTMVUtil.isMTMVSync(mv)));
dataBatch.add(trow);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
index af0efcc1ce5..44e3ed58400 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
@@ -54,7 +54,9 @@ public class MvInfosTableValuedFunction extends
MetadataTableValuedFunction {
new Column("RefreshInfo", ScalarType.createStringType()),
new Column("QuerySql", ScalarType.createStringType()),
new Column("EnvInfo", ScalarType.createStringType()),
- new Column("MvProperties", ScalarType.createStringType()));
+ new Column("MvProperties", ScalarType.createStringType()),
+ new Column("MvPartitionInfo", ScalarType.createStringType()),
+ new Column("SyncWithBaseTables",
ScalarType.createType(PrimitiveType.BOOLEAN)));
private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]