This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 77270a04797 [enhance](mtmv)Improve the performance of obtaining
partition/table v… (#39478)
77270a04797 is described below
commit 77270a0479752f7595649ed4da583cf74bd0c98a
Author: zhangdong <[email protected]>
AuthorDate: Thu Aug 22 00:07:52 2024 +0800
[enhance](mtmv)Improve the performance of obtaining partition/table v…
(#39478)
…ersions (#39301)
pick: https://github.com/apache/doris/pull/39301
---
.../java/org/apache/doris/catalog/OlapTable.java | 13 +-
.../java/org/apache/doris/catalog/Partition.java | 13 ++
.../doris/datasource/hive/HMSExternalTable.java | 6 +-
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 18 ++-
.../org/apache/doris/mtmv/MTMVBaseVersions.java | 38 ++++++
.../org/apache/doris/mtmv/MTMVPartitionUtil.java | 142 +++++++++++++++------
.../org/apache/doris/mtmv/MTMVRefreshContext.java | 54 ++++++++
.../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 4 +-
.../org/apache/doris/mtmv/MTMVRewriteUtil.java | 12 +-
.../apache/doris/mtmv/MTMVPartitionUtilTest.java | 27 +++-
.../org/apache/doris/mtmv/MTMVRewriteUtilTest.java | 10 +-
.../java/org/apache/doris/mtmv/MTMVTaskTest.java | 24 ++--
12 files changed, 272 insertions(+), 89 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index db5c756772e..9bbb76f0b6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -47,6 +47,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVVersionSnapshot;
@@ -2836,14 +2837,18 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
}
@Override
- public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws
AnalysisException {
- long visibleVersion =
getPartitionOrAnalysisException(partitionName).getVisibleVersion();
+ public MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context)
+ throws AnalysisException {
+ Map<String, Long> partitionVersions =
context.getBaseVersions().getPartitionVersions();
+ long visibleVersion = partitionVersions.containsKey(partitionName) ?
partitionVersions.get(partitionName)
+ :
getPartitionOrAnalysisException(partitionName).getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion);
}
@Override
- public MTMVSnapshotIf getTableSnapshot() {
- long visibleVersion = getVisibleVersion();
+ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) {
+ Map<Long, Long> tableVersions =
context.getBaseVersions().getTableVersions();
+ long visibleVersion = tableVersions.containsKey(id) ?
tableVersions.get(id) : getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 4a829c204ad..2d0e8079cb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -20,9 +20,11 @@ package org.apache.doris.catalog;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
+import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.rpc.RpcException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -37,6 +39,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* Internal representation of partition-related metadata.
@@ -168,6 +171,16 @@ public class Partition extends MetaObject implements
Writable {
return visibleVersionTime;
}
+ public static List<Long> getVisibleVersions(List<? extends Partition>
partitions) throws RpcException {
+ if (Config.isCloudMode()) {
+ // Throwing RPC exceptions is to ensure compatibility with the
caller's code
+ // and avoid different implementations in different versions
+ throw new RpcException("127.0.0.1", "not implement cloud in
current version");
+ } else {
+ return
partitions.stream().map(Partition::getVisibleVersion).collect(Collectors.toList());
+ }
+ }
+
/**
* if visibleVersion is 1, do not return creation time but 0
*
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 2a1129e5ee5..6bb2be58743 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
@@ -749,13 +750,14 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
@Override
- public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws
AnalysisException {
+ public MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context)
+ throws AnalysisException {
long partitionLastModifyTime =
getPartitionLastModifyTime(partitionName);
return new MTMVTimestampSnapshot(partitionLastModifyTime);
}
@Override
- public MTMVSnapshotIf getTableSnapshot() throws AnalysisException {
+ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws
AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index ae49759b54c..1fa42236c61 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -37,6 +37,7 @@ import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
+import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
@@ -177,8 +178,8 @@ public class MTMVTask extends AbstractTask {
if (mtmv.getMvPartitionInfo().getPartitionType() !=
MTMVPartitionType.SELF_MANAGE) {
MTMVPartitionUtil.alignMvPartition(mtmv);
}
- Map<String, Set<String>> partitionMappings =
mtmv.calculatePartitionMappings();
- this.needRefreshPartitions =
calculateNeedRefreshPartitions(partitionMappings);
+ MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv);
+ this.needRefreshPartitions =
calculateNeedRefreshPartitions(context);
this.refreshMode = generateRefreshMode(needRefreshPartitions);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
@@ -196,8 +197,7 @@ public class MTMVTask extends AbstractTask {
.subList(start, end > needRefreshPartitions.size() ?
needRefreshPartitions.size() : end));
// need get names before exec
Map<String, MTMVRefreshPartitionSnapshot>
execPartitionSnapshots = MTMVPartitionUtil
- .generatePartitionSnapshots(mtmv,
relation.getBaseTablesOneLevel(), execPartitionNames,
- partitionMappings);
+ .generatePartitionSnapshots(context,
relation.getBaseTablesOneLevel(), execPartitionNames);
exec(ctx, execPartitionNames, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
partitionSnapshots.putAll(execPartitionSnapshots);
@@ -432,7 +432,7 @@ public class MTMVTask extends AbstractTask {
}
}
- public List<String> calculateNeedRefreshPartitions(Map<String,
Set<String>> partitionMappings)
+ public List<String> calculateNeedRefreshPartitions(MTMVRefreshContext
context)
throws AnalysisException {
// check whether the user manually triggers it
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
@@ -450,9 +450,8 @@ public class MTMVTask extends AbstractTask {
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve
it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the
tableId
- boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv,
relation.getBaseTablesOneLevel(),
- mtmv.getExcludedTriggerTables(),
- partitionMappings);
+ boolean fresh = MTMVPartitionUtil.isMTMVSync(context,
relation.getBaseTablesOneLevel(),
+ mtmv.getExcludedTriggerTables());
if (fresh) {
return Lists.newArrayList();
}
@@ -462,8 +461,7 @@ public class MTMVTask extends AbstractTask {
}
// We need to use a newly generated relationship and cannot retrieve
it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the
tableId
- return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv,
relation.getBaseTablesOneLevel(),
- partitionMappings);
+ return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(context,
relation.getBaseTablesOneLevel());
}
public MTMVTaskContext getTaskContext() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVBaseVersions.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVBaseVersions.java
new file mode 100644
index 00000000000..7f83389a953
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVBaseVersions.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import java.util.Map;
+
+public class MTMVBaseVersions {
+ private final Map<Long, Long> tableVersions;
+ private final Map<String, Long> partitionVersions;
+
+ public MTMVBaseVersions(Map<Long, Long> tableVersions, Map<String, Long>
partitionVersions) {
+ this.tableVersions = tableVersions;
+ this.partitionVersions = partitionVersions;
+ }
+
+ public Map<Long, Long> getTableVersions() {
+ return tableVersions;
+ }
+
+ public Map<String, Long> getPartitionVersions() {
+ return partitionVersions;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index 5625c695b47..54f8374d9a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -26,13 +26,16 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.rpc.RpcException;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -71,17 +74,18 @@ public class MTMVPartitionUtil {
/**
* Determine whether the partition is sync with retated partition and
other baseTables
*
- * @param mtmv
+ * @param refreshContext
* @param partitionName
- * @param relatedPartitionNames
* @param tables
* @param excludedTriggerTables
* @return
* @throws AnalysisException
*/
- public static boolean isMTMVPartitionSync(MTMV mtmv, String partitionName,
Set<String> relatedPartitionNames,
+ public static boolean isMTMVPartitionSync(MTMVRefreshContext
refreshContext, String partitionName,
Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
+ MTMV mtmv = refreshContext.getMtmv();
+ Set<String> relatedPartitionNames =
refreshContext.getPartitionMappings().get(partitionName);
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() !=
MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable =
mtmv.getMvPartitionInfo().getRelatedTable();
@@ -92,9 +96,10 @@ public class MTMVPartitionUtil {
partitionName, mtmv.getName(), relatedTable.getName());
return false;
}
- isSyncWithPartition = isSyncWithPartitions(mtmv, partitionName,
relatedTable, relatedPartitionNames);
+ isSyncWithPartition = isSyncWithPartitions(refreshContext,
partitionName, relatedPartitionNames);
}
- return isSyncWithPartition && isSyncWithAllBaseTables(mtmv,
partitionName, tables, excludedTriggerTables);
+ return isSyncWithPartition && isSyncWithAllBaseTables(refreshContext,
partitionName, tables,
+ excludedTriggerTables);
}
@@ -192,8 +197,8 @@ public class MTMVPartitionUtil {
return false;
}
try {
- return isMTMVSync(mtmv, mtmvRelation.getBaseTablesOneLevel(),
Sets.newHashSet(),
- mtmv.calculatePartitionMappings());
+ return isMTMVSync(MTMVRefreshContext.buildContext(mtmv),
mtmvRelation.getBaseTablesOneLevel(),
+ Sets.newHashSet());
} catch (AnalysisException e) {
LOG.warn("isMTMVSync failed: ", e);
return false;
@@ -203,19 +208,18 @@ public class MTMVPartitionUtil {
/**
* Determine whether the mtmv is sync with tables
*
- * @param mtmv
+ * @param context
* @param tables
* @param excludeTables
- * @param partitionMappings
* @return
* @throws AnalysisException
*/
- public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
Set<String> excludeTables,
- Map<String, Set<String>> partitionMappings)
+ public static boolean isMTMVSync(MTMVRefreshContext context,
Set<BaseTableInfo> tables, Set<String> excludeTables)
throws AnalysisException {
+ MTMV mtmv = context.getMtmv();
Set<String> partitionNames = mtmv.getPartitionNames();
for (String partitionName : partitionNames) {
- if (!isMTMVPartitionSync(mtmv, partitionName,
partitionMappings.get(partitionName), tables,
+ if (!isMTMVPartitionSync(context, partitionName, tables,
excludeTables)) {
return false;
}
@@ -234,17 +238,18 @@ public class MTMVPartitionUtil {
public static Map<Long, List<String>> getPartitionsUnSyncTables(MTMV mtmv,
List<Long> partitionIds)
throws AnalysisException {
Map<Long, List<String>> res = Maps.newHashMap();
- Map<String, Set<String>> partitionMappings =
mtmv.calculatePartitionMappings();
+ MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv);
for (Long partitionId : partitionIds) {
String partitionName =
mtmv.getPartitionOrAnalysisException(partitionId).getName();
- res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionName,
partitionMappings.get(partitionName)));
+ res.put(partitionId, getPartitionUnSyncTables(context,
partitionName));
}
return res;
}
- private static List<String> getPartitionUnSyncTables(MTMV mtmv, String
partitionName,
- Set<String> relatedPartitionNames)
+ private static List<String> getPartitionUnSyncTables(MTMVRefreshContext
context, String partitionName)
throws AnalysisException {
+ MTMV mtmv = context.getMtmv();
+ Set<String> relatedPartitionNames =
context.getPartitionMappings().get(partitionName);
List<String> res = Lists.newArrayList();
for (BaseTableInfo baseTableInfo :
mtmv.getRelation().getBaseTablesOneLevel()) {
TableIf table = MTMVUtil.getTable(baseTableInfo);
@@ -262,13 +267,13 @@ public class MTMVPartitionUtil {
res.add(mtmvRelatedTableIf.getName());
continue;
}
- boolean isSyncWithPartition = isSyncWithPartitions(mtmv,
partitionName, mtmvRelatedTableIf,
+ boolean isSyncWithPartition = isSyncWithPartitions(context,
partitionName,
relatedPartitionNames);
if (!isSyncWithPartition) {
res.add(mtmvRelatedTableIf.getName());
}
} else {
- if (!isSyncWithBaseTable(mtmv, partitionName, baseTableInfo)) {
+ if (!isSyncWithBaseTable(context, partitionName,
baseTableInfo)) {
res.add(table.getName());
}
}
@@ -279,17 +284,17 @@ public class MTMVPartitionUtil {
/**
* Get the partitions that need to be refreshed
*
- * @param mtmv
+ * @param context
* @param baseTables
* @return
*/
- public static List<String> getMTMVNeedRefreshPartitions(MTMV mtmv,
Set<BaseTableInfo> baseTables,
- Map<String, Set<String>> partitionMappings) {
+ public static List<String> getMTMVNeedRefreshPartitions(MTMVRefreshContext
context, Set<BaseTableInfo> baseTables) {
+ MTMV mtmv = context.getMtmv();
Set<String> partitionNames = mtmv.getPartitionNames();
List<String> res = Lists.newArrayList();
for (String partitionName : partitionNames) {
try {
- if (!isMTMVPartitionSync(mtmv, partitionName,
partitionMappings.get(partitionName), baseTables,
+ if (!isMTMVPartitionSync(context, partitionName, baseTables,
mtmv.getExcludedTriggerTables())) {
res.add(partitionName);
}
@@ -304,16 +309,16 @@ public class MTMVPartitionUtil {
/**
* Compare the current and last updated partition (or table) snapshot of
the associated partition (or table)
*
- * @param mtmv
+ * @param context
* @param mtmvPartitionName
- * @param relatedTable
* @param relatedPartitionNames
* @return
* @throws AnalysisException
*/
- public static boolean isSyncWithPartitions(MTMV mtmv, String
mtmvPartitionName,
- MTMVRelatedTableIf relatedTable,
+ public static boolean isSyncWithPartitions(MTMVRefreshContext context,
String mtmvPartitionName,
Set<String> relatedPartitionNames) throws AnalysisException {
+ MTMV mtmv = context.getMtmv();
+ MTMVRelatedTableIf relatedTable =
mtmv.getMvPartitionInfo().getRelatedTable();
if (!relatedTable.needAutoRefresh()) {
return true;
}
@@ -324,7 +329,7 @@ public class MTMVPartitionUtil {
}
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
- .getPartitionSnapshot(relatedPartitionName);
+ .getPartitionSnapshot(relatedPartitionName, context);
if (!mtmv.getRefreshSnapshot()
.equalsWithRelatedPartition(mtmvPartitionName,
relatedPartitionName,
relatedPartitionCurrentSnapshot)) {
@@ -397,7 +402,8 @@ public class MTMVPartitionUtil {
* @param excludedTriggerTables
* @return
*/
- private static boolean isSyncWithAllBaseTables(MTMV mtmv, String
mtmvPartitionName, Set<BaseTableInfo> tables,
+ private static boolean isSyncWithAllBaseTables(MTMVRefreshContext context,
String mtmvPartitionName,
+ Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
for (BaseTableInfo baseTableInfo : tables) {
TableIf table = null;
@@ -410,7 +416,7 @@ public class MTMVPartitionUtil {
if (excludedTriggerTables.contains(table.getName())) {
continue;
}
- boolean syncWithBaseTable = isSyncWithBaseTable(mtmv,
mtmvPartitionName, baseTableInfo);
+ boolean syncWithBaseTable = isSyncWithBaseTable(context,
mtmvPartitionName, baseTableInfo);
if (!syncWithBaseTable) {
return false;
}
@@ -418,8 +424,10 @@ public class MTMVPartitionUtil {
return true;
}
- private static boolean isSyncWithBaseTable(MTMV mtmv, String
mtmvPartitionName, BaseTableInfo baseTableInfo)
+ private static boolean isSyncWithBaseTable(MTMVRefreshContext context,
String mtmvPartitionName,
+ BaseTableInfo baseTableInfo)
throws AnalysisException {
+ MTMV mtmv = context.getMtmv();
TableIf table = null;
try {
table = MTMVUtil.getTable(baseTableInfo);
@@ -437,7 +445,7 @@ public class MTMVPartitionUtil {
if (!baseTable.needAutoRefresh()) {
return true;
}
- MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot();
+ MTMVSnapshotIf baseTableCurrentSnapshot =
baseTable.getTableSnapshot(context);
return mtmv.getRefreshSnapshot()
.equalsWithBaseTable(mtmvPartitionName, baseTable.getId(),
baseTableCurrentSnapshot);
}
@@ -445,35 +453,35 @@ public class MTMVPartitionUtil {
/**
* Generate updated snapshots of partitions to determine if they are
synchronized
*
- * @param mtmv
+ * @param context
* @param baseTables
* @param partitionNames
- * @param partitionMappings
* @return
* @throws AnalysisException
*/
- public static Map<String, MTMVRefreshPartitionSnapshot>
generatePartitionSnapshots(MTMV mtmv,
- Set<BaseTableInfo> baseTables, Set<String> partitionNames,
- Map<String, Set<String>> partitionMappings)
+ public static Map<String, MTMVRefreshPartitionSnapshot>
generatePartitionSnapshots(MTMVRefreshContext context,
+ Set<BaseTableInfo> baseTables, Set<String> partitionNames)
throws AnalysisException {
Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
for (String partitionName : partitionNames) {
res.put(partitionName,
- generatePartitionSnapshot(mtmv, baseTables,
partitionMappings.get(partitionName)));
+ generatePartitionSnapshot(context, baseTables,
+
context.getPartitionMappings().get(partitionName)));
}
return res;
}
- private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV
mtmv,
+ private static MTMVRefreshPartitionSnapshot
generatePartitionSnapshot(MTMVRefreshContext context,
Set<BaseTableInfo> baseTables, Set<String> relatedPartitionNames)
throws AnalysisException {
+ MTMV mtmv = context.getMtmv();
MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new
MTMVRefreshPartitionSnapshot();
if (mtmv.getMvPartitionInfo().getPartitionType() !=
MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable =
mtmv.getMvPartitionInfo().getRelatedTable();
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf partitionSnapshot = relatedTable
- .getPartitionSnapshot(relatedPartitionName);
+ .getPartitionSnapshot(relatedPartitionName, context);
refreshPartitionSnapshot.getPartitions()
.put(relatedPartitionName, partitionSnapshot);
}
@@ -487,7 +495,8 @@ public class MTMVPartitionUtil {
if (!(table instanceof MTMVRelatedTableIf)) {
continue;
}
- refreshPartitionSnapshot.getTables().put(table.getId(),
((MTMVRelatedTableIf) table).getTableSnapshot());
+ refreshPartitionSnapshot.getTables()
+ .put(table.getId(), ((MTMVRelatedTableIf)
table).getTableSnapshot(context));
}
return refreshPartitionSnapshot;
}
@@ -501,4 +510,57 @@ public class MTMVPartitionUtil {
}
throw new AnalysisException("can not getPartitionColumnType by:" +
col);
}
+
+ public static MTMVBaseVersions getBaseVersions(MTMV mtmv) throws
AnalysisException {
+ return new MTMVBaseVersions(getTableVersions(mtmv),
getPartitionVersions(mtmv));
+ }
+
+ private static Map<String, Long> getPartitionVersions(MTMV mtmv) throws
AnalysisException {
+ Map<String, Long> res = Maps.newHashMap();
+ if
(mtmv.getMvPartitionInfo().getPartitionType().equals(MTMVPartitionType.SELF_MANAGE))
{
+ return res;
+ }
+ MTMVRelatedTableIf relatedTable =
mtmv.getMvPartitionInfo().getRelatedTable();
+ if (!(relatedTable instanceof OlapTable)) {
+ return res;
+ }
+ List<Partition> partitions = Lists.newArrayList(((OlapTable)
relatedTable).getPartitions());
+ List<Long> versions = null;
+ try {
+ versions = Partition.getVisibleVersions(partitions);
+ } catch (RpcException e) {
+ throw new AnalysisException("getVisibleVersions failed.", e);
+ }
+ Preconditions.checkState(partitions.size() == versions.size());
+ for (int i = 0; i < partitions.size(); i++) {
+ res.put(partitions.get(i).getName(), versions.get(i));
+ }
+ return res;
+ }
+
+ private static Map<Long, Long> getTableVersions(MTMV mtmv) {
+ Map<Long, Long> res = Maps.newHashMap();
+ if (mtmv.getRelation() == null ||
mtmv.getRelation().getBaseTablesOneLevel() == null) {
+ return res;
+ }
+ List<OlapTable> olapTables = Lists.newArrayList();
+ for (BaseTableInfo baseTableInfo :
mtmv.getRelation().getBaseTablesOneLevel()) {
+ TableIf table = null;
+ try {
+ table = MTMVUtil.getTable(baseTableInfo);
+ } catch (AnalysisException e) {
+ LOG.info(e);
+ continue;
+ }
+ if (table instanceof OlapTable) {
+ olapTables.add((OlapTable) table);
+ }
+ }
+ List<Long> versions = OlapTable.getVisibleVersionInBatch(olapTables);
+ Preconditions.checkState(olapTables.size() == versions.size());
+ for (int i = 0; i < olapTables.size(); i++) {
+ res.put(olapTables.get(i).getId(), versions.get(i));
+ }
+ return res;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshContext.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshContext.java
new file mode 100644
index 00000000000..3d611b5e852
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshContext.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.common.AnalysisException;
+
+import java.util.Map;
+import java.util.Set;
+
+public class MTMVRefreshContext {
+ private MTMV mtmv;
+ private Map<String, Set<String>> partitionMappings;
+ private MTMVBaseVersions baseVersions;
+
+ public MTMVRefreshContext(MTMV mtmv) {
+ this.mtmv = mtmv;
+ }
+
+ public MTMV getMtmv() {
+ return mtmv;
+ }
+
+ public Map<String, Set<String>> getPartitionMappings() {
+ return partitionMappings;
+ }
+
+ public MTMVBaseVersions getBaseVersions() {
+ return baseVersions;
+ }
+
+ public static MTMVRefreshContext buildContext(MTMV mtmv) throws
AnalysisException {
+ MTMVRefreshContext context = new MTMVRefreshContext(mtmv);
+ context.partitionMappings = mtmv.calculatePartitionMappings();
+ context.baseVersions = MTMVPartitionUtil.getBaseVersions(mtmv);
+ return context;
+ }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
index c34df750de5..516eb904e58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
@@ -69,7 +69,7 @@ public interface MTMVRelatedTableIf extends TableIf {
* @return partition snapshot at current time
* @throws AnalysisException
*/
- MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws
AnalysisException;
+ MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context) throws AnalysisException;
/**
* getTableSnapshot
@@ -77,7 +77,7 @@ public interface MTMVRelatedTableIf extends TableIf {
* @return table snapshot at current time
* @throws AnalysisException
*/
- MTMVSnapshotIf getTableSnapshot() throws AnalysisException;
+ MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws
AnalysisException;
/**
* Does the current type of table allow timed triggering
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
index 5392313ba62..5e17673a068 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
@@ -31,8 +31,6 @@ import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
public class MTMVRewriteUtil {
private static final Logger LOG =
LogManager.getLogger(MTMVRewriteUtil.class);
@@ -57,7 +55,7 @@ public class MTMVRewriteUtil {
|| mtmv.getStatus().getRefreshState() ==
MTMVRefreshState.INIT) {
return res;
}
- Map<String, Set<String>> partitionMappings = null;
+ MTMVRefreshContext refreshContext = null;
// check gracePeriod
long gracePeriodMills = mtmv.getGracePeriod();
for (Partition partition : allPartitions) {
@@ -67,11 +65,11 @@ public class MTMVRewriteUtil {
continue;
}
try {
- if (partitionMappings == null) {
- partitionMappings = mtmv.calculatePartitionMappings();
+ if (refreshContext == null) {
+ refreshContext = MTMVRefreshContext.buildContext(mtmv);
}
- if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv,
partition.getName(),
- partitionMappings.get(partition.getName()),
mtmvRelation.getBaseTablesOneLevel(),
+ if (MTMVPartitionUtil.isMTMVPartitionSync(refreshContext,
partition.getName(),
+ mtmvRelation.getBaseTablesOneLevel(),
Sets.newHashSet())) {
res.add(partition);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
index d6c4a87f224..63a75c72498 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import mockit.Expectations;
import mockit.Mocked;
@@ -55,6 +56,10 @@ public class MTMVPartitionUtilTest {
private MTMVRefreshSnapshot refreshSnapshot;
@Mocked
private MTMVUtil mtmvUtil;
+ @Mocked
+ private MTMVRefreshContext context;
+ @Mocked
+ private MTMVBaseVersions versions;
private Set<BaseTableInfo> baseTables = Sets.newHashSet();
@@ -67,6 +72,18 @@ public class MTMVPartitionUtilTest {
minTimes = 0;
result = relation;
+ context.getMtmv();
+ minTimes = 0;
+ result = mtmv;
+
+ context.getPartitionMappings();
+ minTimes = 0;
+ result = Maps.newHashMap();
+
+ context.getBaseVersions();
+ minTimes = 0;
+ result = versions;
+
mtmv.getPartitions();
minTimes = 0;
result = Lists.newArrayList(p1);
@@ -95,7 +112,7 @@ public class MTMVPartitionUtilTest {
minTimes = 0;
result = true;
- baseOlapTable.getTableSnapshot();
+ baseOlapTable.getTableSnapshot((MTMVRefreshContext) any);
minTimes = 0;
result = baseSnapshotIf;
@@ -115,7 +132,7 @@ public class MTMVPartitionUtilTest {
minTimes = 0;
result = true;
- baseOlapTable.getPartitionSnapshot(anyString);
+ baseOlapTable.getPartitionSnapshot(anyString,
(MTMVRefreshContext) any);
minTimes = 0;
result = baseSnapshotIf;
@@ -152,7 +169,7 @@ public class MTMVPartitionUtilTest {
@Test
public void testIsSyncWithPartition() throws AnalysisException {
boolean isSyncWithPartition = MTMVPartitionUtil
- .isSyncWithPartitions(mtmv, "name1", baseOlapTable,
Sets.newHashSet("name2"));
+ .isSyncWithPartitions(context, "name1",
Sets.newHashSet("name2"));
Assert.assertTrue(isSyncWithPartition);
}
@@ -166,7 +183,7 @@ public class MTMVPartitionUtilTest {
}
};
boolean isSyncWithPartition = MTMVPartitionUtil
- .isSyncWithPartitions(mtmv, "name1", baseOlapTable,
Sets.newHashSet("name2"));
+ .isSyncWithPartitions(context, "name1",
Sets.newHashSet("name2"));
Assert.assertFalse(isSyncWithPartition);
}
@@ -180,7 +197,7 @@ public class MTMVPartitionUtilTest {
}
};
boolean isSyncWithPartition = MTMVPartitionUtil
- .isSyncWithPartitions(mtmv, "name1", baseOlapTable,
Sets.newHashSet("name2"));
+ .isSyncWithPartitions(context, "name1",
Sets.newHashSet("name2"));
Assert.assertFalse(isSyncWithPartition);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
index 2b8c16509af..c2e402adb82 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
@@ -103,7 +103,7 @@ public class MTMVRewriteUtilTest {
minTimes = 0;
result = true;
- MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString,
(Set<String>) any,
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext)
any, anyString,
(Set<BaseTableInfo>) any,
(Set<String>) any);
minTimes = 0;
@@ -124,7 +124,7 @@ public class MTMVRewriteUtilTest {
minTimes = 0;
result = 2L;
- MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString,
(Set<String>) any,
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext)
any, anyString,
(Set<BaseTableInfo>) any,
(Set<String>) any);
minTimes = 0;
@@ -154,7 +154,7 @@ public class MTMVRewriteUtilTest {
minTimes = 0;
result = 2L;
- MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString,
(Set<String>) any,
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext)
any, anyString,
(Set<BaseTableInfo>) any,
(Set<String>) any);
minTimes = 0;
@@ -175,7 +175,7 @@ public class MTMVRewriteUtilTest {
minTimes = 0;
result = 1L;
- MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString,
(Set<String>) any,
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext)
any, anyString,
(Set<BaseTableInfo>) any,
(Set<String>) any);
minTimes = 0;
@@ -208,7 +208,7 @@ public class MTMVRewriteUtilTest {
public void testGetMTMVCanRewritePartitionsNotSync() throws
AnalysisException {
new Expectations() {
{
- MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString,
(Set<String>) any,
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext)
any, anyString,
(Set<BaseTableInfo>) any,
(Set<String>) any);
minTimes = 0;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
index 512bd6099f0..0bcd2f05690 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
@@ -28,7 +28,6 @@ import
org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import mockit.Expectations;
import mockit.Mocked;
@@ -38,7 +37,6 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
-import java.util.Map;
import java.util.Set;
public class MTMVTaskTest {
@@ -84,8 +82,7 @@ public class MTMVTaskTest {
// minTimes = 0;
// result = poneId;
- mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any,
- (Map<String, Set<String>>) any);
+ mtmvPartitionUtil.isMTMVSync((MTMVRefreshContext) any,
(Set<BaseTableInfo>) any, (Set<String>) any);
minTimes = 0;
result = true;
@@ -104,7 +101,7 @@ public class MTMVTaskTest {
public void testCalculateNeedRefreshPartitionsManualComplete() throws
AnalysisException {
MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, null, true);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<String> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
+ List<String> result = task.calculateNeedRefreshPartitions(null);
Assert.assertEquals(allPartitionNames, result);
}
@@ -112,7 +109,7 @@ public class MTMVTaskTest {
public void testCalculateNeedRefreshPartitionsManualPartitions() throws
AnalysisException {
MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName),
false);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<String> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
+ List<String> result = task.calculateNeedRefreshPartitions(null);
Assert.assertEquals(Lists.newArrayList(poneName), result);
}
@@ -127,7 +124,7 @@ public class MTMVTaskTest {
};
MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<String> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
+ List<String> result = task.calculateNeedRefreshPartitions(null);
Assert.assertTrue(CollectionUtils.isEmpty(result));
}
@@ -135,7 +132,7 @@ public class MTMVTaskTest {
public void testCalculateNeedRefreshPartitionsSystemComplete() throws
AnalysisException {
MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<String> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
+ List<String> result = task.calculateNeedRefreshPartitions(null);
Assert.assertEquals(allPartitionNames, result);
}
@@ -143,15 +140,14 @@ public class MTMVTaskTest {
public void testCalculateNeedRefreshPartitionsSystemNotSyncComplete()
throws AnalysisException {
new Expectations() {
{
- mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any,
- (Map<String, Set<String>>) any);
+ mtmvPartitionUtil.isMTMVSync((MTMVRefreshContext) any,
(Set<BaseTableInfo>) any, (Set<String>) any);
minTimes = 0;
result = false;
}
};
MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<String> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
+ List<String> result = task.calculateNeedRefreshPartitions(null);
Assert.assertEquals(allPartitionNames, result);
}
@@ -160,7 +156,7 @@ public class MTMVTaskTest {
new Expectations() {
{
mtmvPartitionUtil
- .isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any, (Map<String, Set<String>>) any);
+ .isMTMVSync((MTMVRefreshContext) any,
(Set<BaseTableInfo>) any, (Set<String>) any);
minTimes = 0;
result = false;
@@ -169,14 +165,14 @@ public class MTMVTaskTest {
result = RefreshMethod.AUTO;
mtmvPartitionUtil
- .getMTMVNeedRefreshPartitions(mtmv,
(Set<BaseTableInfo>) any, (Map<String, Set<String>>) any);
+ .getMTMVNeedRefreshPartitions((MTMVRefreshContext)
any, (Set<BaseTableInfo>) any);
minTimes = 0;
result = Lists.newArrayList(ptwoName);
}
};
MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<String> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
+ List<String> result = task.calculateNeedRefreshPartitions(null);
Assert.assertEquals(Lists.newArrayList(ptwoName), result);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]