This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 26c6ac6055c [improvement](mtmv)mtmv support partition by hms table
(#29989)
26c6ac6055c is described below
commit 26c6ac6055cd7c212c211499dc8dabc426813e1b
Author: zhangdong <[email protected]>
AuthorDate: Mon Jan 29 09:49:23 2024 +0800
[improvement](mtmv)mtmv support partition by hms table (#29989)
---
.../hive/scripts/create_preinstalled_table.hql | 16 ++
.../apache/doris/catalog/ListPartitionItem.java | 11 +
.../java/org/apache/doris/catalog/OlapTable.java | 37 +++-
.../org/apache/doris/catalog/PartitionInfo.java | 2 +-
.../org/apache/doris/catalog/PartitionItem.java | 2 +
.../org/apache/doris/catalog/PartitionKey.java | 4 +
.../apache/doris/catalog/RangePartitionItem.java | 8 +
.../doris/catalog/external/HMSExternalTable.java | 61 +++++-
.../doris/datasource/hive/HiveMetaStoreCache.java | 10 +-
.../doris/datasource/hive/HivePartition.java | 33 ++-
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 49 +++--
.../org/apache/doris/mtmv/MTMVPartitionInfo.java | 8 +-
.../java/org/apache/doris/mtmv/MTMVPlanUtil.java | 3 +-
.../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 82 ++++++++
.../java/org/apache/doris/mtmv/MTMVService.java | 4 +-
.../main/java/org/apache/doris/mtmv/MTMVUtil.java | 225 +++++++--------------
.../mv/AbstractMaterializedViewRule.java | 2 +-
.../exploration/mv/MaterializedViewUtils.java | 51 ++---
.../plans/commands/UpdateMvByPartitionCommand.java | 17 +-
.../trees/plans/commands/info/CreateMTMVInfo.java | 19 +-
.../doris/planner/external/HiveScanNode.java | 3 +-
.../doris/planner/external/hudi/HudiScanNode.java | 6 +-
.../doris/statistics/util/StatisticsUtil.java | 3 +-
.../exploration/mv/MaterializedViewUtilsTest.java | 2 +
regression-test/data/mtmv_p0/test_hive_mtmv.out | 14 ++
.../suites/mtmv_p0/test_hive_mtmv.groovy | 73 +++++++
26 files changed, 516 insertions(+), 229 deletions(-)
diff --git
a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql
b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql
index 8eb898b2e60..cc16a408eec 100644
---
a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql
+++
b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql
@@ -678,6 +678,22 @@ update orc_full_acid_par set value = 'BB' where id = 2;
alter table orc_full_acid_par PARTITION(part_col=20230101) compact 'major';
alter table orc_full_acid_par PARTITION(part_col=20230102) compact 'major';
+create table mtmv_base1 (id INT, value STRING)
+ PARTITIONED BY (part_col INT)
+ CLUSTERED BY (id) INTO 3 BUCKETS
+ STORED AS ORC;
+
+insert into mtmv_base1 PARTITION(part_col=20230101) values
+(1, 'A'),
+(2, 'B'),
+(3, 'C');
+
+insert into mtmv_base1 PARTITION(part_col=20230102) values
+(4, 'D'),
+(5, 'E'),
+(6, 'F');
+
+
CREATE TABLE `test_different_column_orders_orc`(
`name` string,
`id` int,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
index 2c4371a755e..ef23a444965 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
@@ -17,6 +17,9 @@
package org.apache.doris.catalog;
+import org.apache.doris.analysis.PartitionKeyDesc;
+import org.apache.doris.analysis.PartitionValue;
+
import com.google.common.collect.Lists;
import java.io.DataInput;
@@ -24,6 +27,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
public class ListPartitionItem extends PartitionItem {
public static ListPartitionItem DUMMY_ITEM = new
ListPartitionItem(Lists.newArrayList());
@@ -69,6 +73,13 @@ public class ListPartitionItem extends PartitionItem {
return null;
}
+ @Override
+ public PartitionKeyDesc toPartitionKeyDesc() {
+ List<List<PartitionValue>> inValues =
partitionKeys.stream().map(PartitionInfo::toPartitionValue)
+ .collect(Collectors.toList());
+ return PartitionKeyDesc.createIn(inValues);
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(partitionKeys.size());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 937339afb5d..f09f634420f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -47,6 +47,7 @@ import org.apache.doris.common.io.DeepCopy;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.resource.Tag;
@@ -100,7 +101,7 @@ import java.util.stream.Collectors;
* Internal representation of tableFamilyGroup-related metadata. A
OlaptableFamilyGroup contains several tableFamily.
* Note: when you add a new olap table property, you should modify
TableProperty class
*/
-public class OlapTable extends Table {
+public class OlapTable extends Table implements MTMVRelatedTableIf {
private static final Logger LOG = LogManager.getLogger(OlapTable.class);
public enum OlapTableState {
@@ -772,6 +773,7 @@ public class OlapTable extends Table {
return partitionInfo;
}
+ @Override
public Set<String> getPartitionColumnNames() throws DdlException {
Set<String> partitionColumnNames = Sets.newHashSet();
if (partitionInfo instanceof SinglePartitionInfo) {
@@ -2535,4 +2537,37 @@ public class OlapTable extends Table {
}
return tablets;
}
+
+ @Override
+ public PartitionType getPartitionType() {
+ return partitionInfo.getType();
+ }
+
+ @Override
+ public Map<Long, PartitionItem> getPartitionItems() {
+ return getPartitionInfo().getIdToItem(false);
+ }
+
+ @Override
+ public long getPartitionLastModifyTime(long partitionId, PartitionItem
item) throws AnalysisException {
+ return
getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
+ }
+
+ @Override
+ public long getLastModifyTime() {
+ long result = 0L;
+ long visibleVersionTime;
+ for (Partition partition : getAllPartitions()) {
+ visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
+ if (visibleVersionTime > result) {
+ result = visibleVersionTime;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public List<Column> getPartitionColumns() {
+ return getPartitionInfo().getPartitionColumns();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
index e61a7a1070f..243170cad4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -366,7 +366,7 @@ public class PartitionInfo implements Writable {
throw new RuntimeException("Should implement it in derived classes.");
}
- static List<PartitionValue> toPartitionValue(PartitionKey partitionKey) {
+ public static List<PartitionValue> toPartitionValue(PartitionKey
partitionKey) {
return partitionKey.getKeys().stream().map(expr -> {
if (expr == MaxLiteral.MAX_VALUE) {
return PartitionValue.MAX_VALUE;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
index 578eae340c2..8ea754abff4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
@@ -17,6 +17,7 @@
package org.apache.doris.catalog;
+import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.common.io.Writable;
import java.util.Comparator;
@@ -34,4 +35,5 @@ public abstract class PartitionItem implements
Comparable<PartitionItem>, Writab
return false;
}
+ public abstract PartitionKeyDesc toPartitionKeyDesc();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index eee87f3d6b7..1677eb6cb5f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -142,6 +142,10 @@ public class PartitionKey implements
Comparable<PartitionKey>, Writable {
partitionKey.originHiveKeys.add(values.get(i).getStringValue());
}
partitionKey.types.add(types.get(i).getPrimitiveType());
+ //If there is one default value, set `isDefaultListPartitionKey`
to true
+ if (values.get(i).isHiveDefaultPartition()) {
+ partitionKey.setDefaultListPartition(true);
+ }
}
if (values.isEmpty()) {
for (int i = 0; i < types.size(); ++i) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index 603f7682a1b..cadb95ed3d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -17,6 +17,7 @@
package org.apache.doris.catalog;
+import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.common.util.RangeUtils;
import com.google.common.collect.Range;
@@ -45,6 +46,13 @@ public class RangePartitionItem extends PartitionItem {
return false;
}
+ @Override
+ public PartitionKeyDesc toPartitionKeyDesc() {
+ return PartitionKeyDesc.createFixed(
+
PartitionInfo.toPartitionValue(partitionKeyRange.lowerEndpoint()),
+
PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
+ }
+
@Override
public void write(DataOutput out) throws IOException {
RangeUtils.writeRange(out, partitionKeyRange);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 88c1889a80e..e1037ffd025 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -21,6 +21,9 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.HudiUtils;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
@@ -28,6 +31,8 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
@@ -71,6 +76,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -78,7 +84,7 @@ import java.util.stream.Collectors;
/**
* Hive metastore external table.
*/
-public class HMSExternalTable extends ExternalTable {
+public class HMSExternalTable extends ExternalTable implements
MTMVRelatedTableIf {
private static final Logger LOG =
LogManager.getLogger(HMSExternalTable.class);
private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
@@ -257,6 +263,7 @@ public class HMSExternalTable extends ExternalTable {
return partitionColumns.stream().map(c ->
c.getType()).collect(Collectors.toList());
}
+ @Override
public List<Column> getPartitionColumns() {
makeSureInitialized();
getFullSchema();
@@ -778,6 +785,58 @@ public class HMSExternalTable extends ExternalTable {
return
getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
.collect(Collectors.toSet());
}
+
+ @Override
+ public PartitionType getPartitionType() {
+ return getPartitionColumns().size() > 0 ? PartitionType.LIST :
PartitionType.UNPARTITIONED;
+ }
+
+ @Override
+ public Set<String> getPartitionColumnNames() {
+ return getPartitionColumns().stream()
+ .map(c ->
c.getName().toLowerCase()).collect(Collectors.toSet());
+ }
+
+ @Override
+ public Map<Long, PartitionItem> getPartitionItems() {
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog) getCatalog());
+ HiveMetaStoreCache.HivePartitionValues hivePartitionValues =
cache.getPartitionValues(
+ getDbName(), getName(), getPartitionColumnTypes());
+
+ return hivePartitionValues.getIdToPartitionItem().entrySet().stream()
+ .filter(entry -> !entry.getValue().isDefaultPartition())
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
+
+ @Override
+ public long getPartitionLastModifyTime(long partitionId, PartitionItem
item) throws AnalysisException {
+ List<List<String>> partitionValuesList =
Lists.newArrayListWithCapacity(1);
+ partitionValuesList.add(
+ ((ListPartitionItem)
item).getItems().get(0).getPartitionValuesAsStringListForHive());
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog) getCatalog());
+ List<HivePartition> resPartitions =
cache.getAllPartitionsWithCache(getDbName(), getName(),
+ partitionValuesList);
+ if (resPartitions.size() != 1) {
+ throw new AnalysisException("partition not normal, size: " +
resPartitions.size());
+ }
+ return resPartitions.get(0).getLastModifiedTimeIgnoreInit();
+ }
+
+ @Override
+ public long getLastModifyTime() throws AnalysisException {
+
+ long result = 0L;
+ long visibleVersionTime;
+ for (Entry<Long, PartitionItem> entry :
getPartitionItems().entrySet()) {
+ visibleVersionTime = getPartitionLastModifyTime(entry.getKey(),
entry.getValue());
+ if (visibleVersionTime > result) {
+ result = visibleVersionTime;
+ }
+ }
+ return result;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 3e98b887eeb..56fffc41ddd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -300,7 +300,10 @@ public class HiveMetaStoreCache {
}
try {
PartitionKey key =
PartitionKey.createListPartitionKeyWithTypes(values, types, true);
- return new ListPartitionItem(Lists.newArrayList(key));
+ ListPartitionItem listPartitionItem = new
ListPartitionItem(Lists.newArrayList(key));
+ // if `PartitionKey` is default, set `PartitionItem` to default
+
listPartitionItem.setDefaultPartition(key.isHiveDefaultPartition());
+ return listPartitionItem;
} catch (AnalysisException e) {
throw new CacheException("failed to convert hive partition %s to
list partition in catalog %s",
e, partitionName, catalog.getName());
@@ -315,7 +318,8 @@ public class HiveMetaStoreCache {
sd.getInputFormat(), sd.getLocation(), key,
catalog.getName());
}
// TODO: more info?
- return new HivePartition(key.dbName, key.tblName, false,
sd.getInputFormat(), sd.getLocation(), key.values);
+ return new HivePartition(key.dbName, key.tblName, false,
sd.getInputFormat(), sd.getLocation(), key.values,
+ partition.getParameters());
}
private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<?
extends PartitionCacheKey> keys) {
@@ -348,7 +352,7 @@ public class HiveMetaStoreCache {
StorageDescriptor sd = partition.getSd();
ret.put(new PartitionCacheKey(dbName, tblName,
partition.getValues()),
new HivePartition(dbName, tblName, false,
- sd.getInputFormat(), sd.getLocation(),
partition.getValues()));
+ sd.getInputFormat(), sd.getLocation(),
partition.getValues(), partition.getParameters()));
}
return ret;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
index 1c2e4341ea9..0663edb48df 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
@@ -23,18 +23,23 @@ import com.google.common.base.Preconditions;
import lombok.Data;
import java.util.List;
+import java.util.Map;
@Data
public class HivePartition {
+ public static final String LAST_MODIFY_TIME_KEY = "transient_lastDdlTime";
+ public static final String FILE_NUM_KEY = "numFiles";
+
private String dbName;
private String tblName;
private String inputFormat;
private String path;
private List<String> partitionValues;
private boolean isDummyPartition;
+ private Map<String, String> parameters;
public HivePartition(String dbName, String tblName, boolean
isDummyPartition,
- String inputFormat, String path, List<String>
partitionValues) {
+ String inputFormat, String path, List<String> partitionValues,
Map<String, String> parameters) {
this.dbName = dbName;
this.tblName = tblName;
this.isDummyPartition = isDummyPartition;
@@ -44,6 +49,7 @@ public class HivePartition {
this.path = path;
// eg: cn, beijing
this.partitionValues = partitionValues;
+ this.parameters = parameters;
}
// return partition name like: nation=cn/city=beijing
@@ -63,6 +69,31 @@ public class HivePartition {
return this.isDummyPartition;
}
+ public long getLastModifiedTime() {
+ if (parameters == null ||
!parameters.containsKey(LAST_MODIFY_TIME_KEY)) {
+ return 0L;
+ }
+ return Long.parseLong(parameters.get(LAST_MODIFY_TIME_KEY)) * 1000;
+ }
+
+ /**
+ * If there are no files, it proves that there is no data under the
partition, we return 0
+ * @return
+ */
+ public long getLastModifiedTimeIgnoreInit() {
+ if (getFileNum() == 0) {
+ return 0L;
+ }
+ return getLastModifiedTime();
+ }
+
+ public long getFileNum() {
+ if (parameters == null || !parameters.containsKey(FILE_NUM_KEY)) {
+ return 0L;
+ }
+ return Long.parseLong(parameters.get(FILE_NUM_KEY));
+ }
+
@Override
public String toString() {
return "HivePartition{"
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 06adb3d70ed..1a39f729738 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -21,9 +21,10 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
@@ -33,6 +34,7 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
@@ -149,13 +151,18 @@ public class MTMVTask extends AbstractTask {
// Every time a task is run, the relation is regenerated because
baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
+ // Before obtaining information from hmsTable, refresh to ensure
that the data is up-to-date
+ refreshHmsTable();
+ if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
+ MTMVUtil.alignMvPartition(mtmv,
mtmv.getMvPartitionInfo().getRelatedTable());
+ }
List<Long> needRefreshPartitionIds =
calculateNeedRefreshPartitions();
this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv,
needRefreshPartitionIds);
this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
}
- Map<OlapTable, String> tableWithPartKey = getIncrementalTableMap();
+ Map<TableIf, String> tableWithPartKey = getIncrementalTableMap();
this.completedPartitions = Lists.newArrayList();
int refreshPartitionNum = mtmv.getRefreshPartitionNum();
long execNum = (needRefreshPartitionIds.size() /
refreshPartitionNum) + ((needRefreshPartitionIds.size()
@@ -176,7 +183,8 @@ public class MTMVTask extends AbstractTask {
}
}
- private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds,
Map<OlapTable, String> tableWithPartKey)
+ private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds,
+ Map<TableIf, String> tableWithPartKey)
throws Exception {
TUniqueId queryId = generateQueryId();
lastQueryId = DebugUtil.printId(queryId);
@@ -223,16 +231,25 @@ public class MTMVTask extends AbstractTask {
super.before();
try {
mtmv = getMTMV();
- if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
- OlapTable relatedTable = (OlapTable)
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
- MTMVUtil.alignMvPartition(mtmv, relatedTable);
- }
} catch (UserException e) {
LOG.warn("before task failed:", e);
throw new JobException(e);
}
}
+ private void refreshHmsTable() throws AnalysisException, DdlException {
+ for (BaseTableInfo tableInfo : relation.getBaseTables()) {
+ TableIf tableIf = MTMVUtil.getTable(tableInfo);
+ if (tableIf instanceof HMSExternalTable) {
+ HMSExternalTable hmsTable = (HMSExternalTable) tableIf;
+ Env.getCurrentEnv().getCatalogMgr()
+ .refreshExternalTable(hmsTable.getDbName(),
hmsTable.getName(), hmsTable.getCatalog().getName(),
+ true);
+ }
+
+ }
+ }
+
private MTMV getMTMV() throws DdlException, MetaNotFoundException {
Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
return (MTMV) db.getTableOrMetaException(mtmvId,
TableType.MATERIALIZED_VIEW);
@@ -331,16 +348,15 @@ public class MTMVTask extends AbstractTask {
executor = null;
}
- private Map<OlapTable, String> getIncrementalTableMap() throws
AnalysisException {
- Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
+ private Map<TableIf, String> getIncrementalTableMap() throws
AnalysisException {
+ Map<TableIf, String> tableWithPartKey = Maps.newHashMap();
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
- OlapTable relatedTable = (OlapTable)
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
- tableWithPartKey.put(relatedTable,
mtmv.getMvPartitionInfo().getRelatedCol());
+ tableWithPartKey
+ .put(mtmv.getMvPartitionInfo().getRelatedTable(),
mtmv.getMvPartitionInfo().getRelatedCol());
}
return tableWithPartKey;
}
-
private MTMVTaskRefreshMode generateRefreshMode(List<Long>
needRefreshPartitionIds) {
if (CollectionUtils.isEmpty(needRefreshPartitionIds)) {
return MTMVTaskRefreshMode.NOT_REFRESH;
@@ -362,8 +378,9 @@ public class MTMVTask extends AbstractTask {
}
}
// check if data is fresh
- Set<String> excludedTriggerTables = mtmv.getExcludedTriggerTables();
- boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(),
excludedTriggerTables, 0L);
+ // We need to use a newly generated relationship and cannot retrieve
it using mtmv.getRelation()
+ // to avoid rebuilding the baseTable and causing a change in the
tableId
+ boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(),
mtmv.getExcludedTriggerTables(), 0L);
if (fresh) {
return Lists.newArrayList();
}
@@ -375,7 +392,9 @@ public class MTMVTask extends AbstractTask {
if (mtmv.getRefreshInfo().getRefreshMethod() ==
RefreshMethod.COMPLETE) {
return mtmv.getPartitionIds();
}
- return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
+ // We need to use a newly generated relationship and cannot retrieve
it using mtmv.getRelation()
+ // to avoid rebuilding the baseTable and causing a change in the
tableId
+ return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv,
relation.getBaseTables());
}
public MTMVTaskContext getTaskContext() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
index 2b862bfab23..c48594847f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
@@ -17,6 +17,8 @@
package org.apache.doris.mtmv;
+import org.apache.doris.common.AnalysisException;
+
import com.google.gson.annotations.SerializedName;
/**
@@ -59,10 +61,14 @@ public class MTMVPartitionInfo {
this.partitionType = partitionType;
}
- public BaseTableInfo getRelatedTable() {
+ public BaseTableInfo getRelatedTableInfo() {
return relatedTable;
}
+ public MTMVRelatedTableIf getRelatedTable() throws AnalysisException {
+ return (MTMVRelatedTableIf) MTMVUtil.getTable(relatedTable);
+ }
+
public void setRelatedTable(BaseTableInfo relatedTable) {
this.relatedTable = relatedTable;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
index 6cc4eb985e7..334e54f5090 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
@@ -89,7 +89,8 @@ public class MTMVPlanUtil {
private static Set<BaseTableInfo> getBaseTables(Plan plan) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
-
com.google.common.collect.Sets.newHashSet(TableType.MATERIALIZED_VIEW,
TableType.OLAP));
+ com.google.common.collect.Sets
+ .newHashSet(TableType.values()));
plan.accept(TableCollector.INSTANCE, collectorContext);
List<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
new file mode 100644
index 00000000000..d4a9cf3aca7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The table that implements this interface can serve as a partition table
followed by MTMV
+ */
+public interface MTMVRelatedTableIf extends TableIf {
+
+ /**
+ * Get all partitions of the table
+ *
+ * @return partitionId->PartitionItem
+ */
+ Map<Long, PartitionItem> getPartitionItems();
+
+ /**
+ * Obtain the latest update time of partition data
+ *
+ * @param partitionId
+ * @param item
+ * @return millisecond
+ * @throws AnalysisException
+ */
+ long getPartitionLastModifyTime(long partitionId, PartitionItem item)
throws AnalysisException;
+
+ /**
+ * getPartitionType LIST/RANGE/UNPARTITIONED
+ *
+ * @return
+ */
+ PartitionType getPartitionType();
+
+ /**
+ * getPartitionColumnNames
+ *
+ * @return
+ * @throws DdlException
+ */
+ Set<String> getPartitionColumnNames() throws DdlException;
+
+ /**
+ * Obtain the latest update time of table data
+ *
+ * @return
+ * @throws AnalysisException
+ */
+ long getLastModifyTime() throws AnalysisException;
+
+ /**
+ * getPartitionColumns
+ *
+ * @return
+ */
+ List<Column> getPartitionColumns();
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
index 5b7c1888237..6abb22f3e5f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
@@ -18,7 +18,6 @@
package org.apache.doris.mtmv;
import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
@@ -86,8 +85,7 @@ public class MTMVService {
public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException {
Objects.requireNonNull(mtmv);
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
- OlapTable relatedTable = (OlapTable)
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
- MTMVUtil.alignMvPartition(mtmv, relatedTable);
+ MTMVUtil.alignMvPartition(mtmv,
mtmv.getMvPartitionInfo().getRelatedTable());
}
LOG.info("createMTMV: " + mtmv.getName());
for (MTMVHookService mtmvHookService : hooks.values()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
index d0f002d6ada..481670d9448 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
@@ -19,13 +19,11 @@ package org.apache.doris.mtmv;
import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.analysis.DropPartitionClause;
-import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.SinglePartitionDesc;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
@@ -44,7 +42,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -69,20 +66,6 @@ public class MTMVUtil {
return table;
}
- /**
- * Determine whether the mtmv is sync with tables
- *
- * @param mtmv
- * @param tables
- * @param excludedTriggerTables
- * @param gracePeriod
- * @return
- */
- public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
- Set<String> excludedTriggerTables, Long gracePeriod) {
- return isSync(getTableMinVisibleVersionTime(mtmv), tables,
excludedTriggerTables, gracePeriod);
- }
-
/**
* Determine whether the partition is sync with retated partition and
other baseTables
*
@@ -94,23 +77,25 @@ public class MTMVUtil {
* @return
* @throws AnalysisException
*/
- public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId,
Set<BaseTableInfo> tables,
+ private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId,
Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables, Long gracePeriod) throws
AnalysisException {
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
- OlapTable relatedTable = (OlapTable)
getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+ MTMVRelatedTableIf relatedTable =
mtmv.getMvPartitionInfo().getRelatedTable();
// if follow base table, not need compare with related table, only
should compare with related partition
excludedTriggerTables.add(relatedTable.getName());
PartitionItem item =
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
+ Map<Long, PartitionItem> relatedPartitionItems =
relatedTable.getPartitionItems();
long relatedPartitionId = getExistPartitionId(item,
- relatedTable.getPartitionInfo().getIdToItem(false));
+ relatedPartitionItems);
if (relatedPartitionId == -1L) {
LOG.warn("can not found related partition: " + partitionId);
return false;
}
- isSyncWithPartition = isSyncWithPartition(mtmv, partitionId,
relatedTable, relatedPartitionId);
+ isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, item,
relatedTable, relatedPartitionId,
+ relatedPartitionItems.get(relatedPartitionId));
}
- return isSyncWithPartition && isSync(
+ return isSyncWithPartition && isFresherThanTables(
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(),
tables,
excludedTriggerTables, gracePeriod);
@@ -124,10 +109,10 @@ public class MTMVUtil {
* @throws DdlException
* @throws AnalysisException
*/
- public static void alignMvPartition(MTMV mtmv, OlapTable relatedTable)
+ public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf
relatedTable)
throws DdlException, AnalysisException {
- Map<Long, PartitionItem> relatedTableItems =
relatedTable.getPartitionInfo().getIdToItem(false);
- Map<Long, PartitionItem> mtmvItems =
mtmv.getPartitionInfo().getIdToItem(false);
+ Map<Long, PartitionItem> relatedTableItems =
relatedTable.getPartitionItems();
+ Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionItems();
// drop partition of mtmv
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
long partitionId = getExistPartitionId(entry.getValue(),
relatedTableItems);
@@ -139,40 +124,11 @@ public class MTMVUtil {
for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
long partitionId = getExistPartitionId(entry.getValue(),
mtmvItems);
if (partitionId == -1L) {
- addPartition(mtmv, relatedTable, entry.getKey());
+ addPartition(mtmv, entry.getValue());
}
}
}
- /**
- * get mv.partitions which not sync with relatedTable
- * <p>
- * Comparing the time of mtmv and relatedTable partitioning,
- * if the visibleVersionTime of the base table is later,
- * then the partitioning of this mtmv is considered stale
- *
- * @param mtmv
- * @param relatedTable
- * @return partitionIds
- * @throws DdlException when partition can not found
- */
- public static Set<Long> getMTMVStalePartitions(MTMV mtmv, OlapTable
relatedTable)
- throws AnalysisException {
- Set<Long> ids = Sets.newHashSet();
- Map<Long, Set<Long>> mvToBasePartitions = getMvToBasePartitions(mtmv,
relatedTable);
- for (Entry<Long, Set<Long>> entry : mvToBasePartitions.entrySet()) {
- for (Long relatedPartitionId : entry.getValue()) {
- boolean syncWithRelatedPartition = isSyncWithPartition(mtmv,
entry.getKey(), relatedTable,
- relatedPartitionId);
- if (!syncWithRelatedPartition) {
- ids.add(entry.getKey());
- break;
- }
- }
- }
- return ids;
- }
-
public static List<String> getPartitionNamesByIds(MTMV mtmv,
Collection<Long> ids) throws AnalysisException {
List<String> res = Lists.newArrayList();
for (Long partitionId : ids) {
@@ -201,7 +157,34 @@ public class MTMVUtil {
if (mtmvRelation == null) {
return false;
}
- return isMTMVSync(mtmv, mtmv.getRelation().getBaseTables(),
Sets.newHashSet(), 0L);
+ try {
+ return isMTMVSync(mtmv, mtmvRelation.getBaseTables(),
Sets.newHashSet(), 0L);
+ } catch (AnalysisException e) {
+ LOG.warn("isMTMVSync failed: ", e);
+ return false;
+ }
+ }
+
+ /**
+ * Determine whether the mtmv is sync with tables
+ *
+ * @param mtmv
+ * @param tables
+ * @param excludeTables
+ * @param gracePeriod
+ * @return
+ * @throws AnalysisException
+ */
+ public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
Set<String> excludeTables, long gracePeriod)
+ throws AnalysisException {
+ Collection<Partition> partitions = mtmv.getPartitions();
+ for (Partition partition : partitions) {
+ if (!isMTMVPartitionSync(mtmv, partition.getId(), tables,
excludeTables,
+ gracePeriod)) {
+ return false;
+ }
+ }
+ return true;
}
/**
@@ -217,24 +200,26 @@ public class MTMVUtil {
long maxAvailableTime =
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables())
{
TableIf table = getTable(baseTableInfo);
- if (!(table instanceof OlapTable)) {
+ if (!(table instanceof MTMVRelatedTableIf)) {
continue;
}
- OlapTable olapTable = (OlapTable) table;
+ MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table;
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
-
.getMvPartitionInfo().getRelatedTable().equals(baseTableInfo)) {
+
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
PartitionItem item =
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
+ Map<Long, PartitionItem> relatedPartitionItems =
mtmvRelatedTableIf.getPartitionItems();
long relatedPartitionId = getExistPartitionId(item,
- olapTable.getPartitionInfo().getIdToItem(false));
+ relatedPartitionItems);
if (relatedPartitionId == -1L) {
throw new AnalysisException("can not found related
partition");
}
- boolean isSyncWithPartition = isSyncWithPartition(mtmv,
partitionId, olapTable, relatedPartitionId);
+ boolean isSyncWithPartition = isSyncWithPartition(mtmv,
partitionId, item, mtmvRelatedTableIf,
+ relatedPartitionId,
relatedPartitionItems.get(relatedPartitionId));
if (!isSyncWithPartition) {
- res.add(olapTable.getName());
+ res.add(mtmvRelatedTableIf.getName());
}
} else {
- long tableLastVisibleVersionTime =
getTableMaxVisibleVersionTime((OlapTable) table);
+ long tableLastVisibleVersionTime =
mtmvRelatedTableIf.getLastModifyTime();
if (tableLastVisibleVersionTime > maxAvailableTime) {
res.add(table.getName());
}
@@ -261,6 +246,7 @@ public class MTMVUtil {
.isMaterializedViewRewriteEnableContainExternalTable()) {
return res;
}
+
MTMVRelation mtmvRelation = mtmv.getRelation();
if (mtmvRelation == null) {
return res;
@@ -291,12 +277,19 @@ public class MTMVUtil {
return res;
}
- public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
+ /**
+ * Get the partitions that need to be refreshed
+ *
+ * @param mtmv
+ * @param baseTables
+ * @return
+ */
+ public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv,
Set<BaseTableInfo> baseTables) {
Collection<Partition> allPartitions = mtmv.getPartitions();
List<Long> res = Lists.newArrayList();
for (Partition partition : allPartitions) {
try {
- if (!isMTMVPartitionSync(mtmv, partition.getId(),
mtmv.getRelation().getBaseTables(),
+ if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
mtmv.getExcludedTriggerTables(),
0L)) {
res.add(partition.getId());
@@ -315,14 +308,15 @@ public class MTMVUtil {
* @param mtmv
* @param mtmvPartitionId
* @param relatedTable
- * @param relatedTablePartitionId
+ * @param relatedPartitionId
* @return
* @throws AnalysisException
*/
- private static boolean isSyncWithPartition(MTMV mtmv, Long
mtmvPartitionId, OlapTable relatedTable,
- Long relatedTablePartitionId) throws AnalysisException {
- return
mtmv.getPartitionOrAnalysisException(mtmvPartitionId).getVisibleVersionTimeIgnoreInit()
>= relatedTable
-
.getPartitionOrAnalysisException(relatedTablePartitionId).getVisibleVersionTimeIgnoreInit();
+ private static boolean isSyncWithPartition(MTMV mtmv, Long
mtmvPartitionId, PartitionItem mtmvPartitionItem,
+ MTMVRelatedTableIf relatedTable,
+ Long relatedPartitionId, PartitionItem relatedPartitionItem)
throws AnalysisException {
+ return mtmv.getPartitionLastModifyTime(mtmvPartitionId,
mtmvPartitionItem) >= relatedTable
+ .getPartitionLastModifyTime(relatedPartitionId,
relatedPartitionItem);
}
/**
@@ -358,23 +352,18 @@ public class MTMVUtil {
* add partition for mtmv like relatedPartitionId of relatedTable
*
* @param mtmv
- * @param relatedTable
- * @param relatedPartitionId
- * @throws AnalysisException
+ * @param partitionItem
* @throws DdlException
*/
- private static void addPartition(MTMV mtmv, OlapTable relatedTable, Long
relatedPartitionId)
- throws AnalysisException, DdlException {
- PartitionDesc partitionDesc =
relatedTable.getPartitionInfo().toPartitionDesc(relatedTable);
- Partition partition =
relatedTable.getPartitionOrAnalysisException(relatedPartitionId);
- SinglePartitionDesc oldPartitionDesc =
partitionDesc.getSinglePartitionDescByName(partition.getName());
-
+ private static void addPartition(MTMV mtmv, PartitionItem partitionItem)
+ throws DdlException {
+ PartitionKeyDesc oldPartitionKeyDesc =
partitionItem.toPartitionKeyDesc();
Map<String, String> partitionProperties = Maps.newHashMap();
- SinglePartitionDesc singleRangePartitionDesc = new
SinglePartitionDesc(true,
- generatePartitionName(oldPartitionDesc.getPartitionKeyDesc()),
- oldPartitionDesc.getPartitionKeyDesc(), partitionProperties);
+ SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
+ generatePartitionName(oldPartitionKeyDesc),
+ oldPartitionKeyDesc, partitionProperties);
- AddPartitionClause addPartitionClause = new
AddPartitionClause(singleRangePartitionDesc,
+ AddPartitionClause addPartitionClause = new
AddPartitionClause(singlePartitionDesc,
mtmv.getDefaultDistributionInfo().toDistributionDesc(),
partitionProperties, false);
Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(),
mtmv.getName(), addPartitionClause);
}
@@ -396,68 +385,6 @@ public class MTMVUtil {
return -1L;
}
- /**
- * Get the maximum update time among all partitions
- *
- * @param table
- * @return
- */
- private static long getTableMaxVisibleVersionTime(OlapTable table) {
- long result = 0L;
- long visibleVersionTime;
- for (Partition partition : table.getAllPartitions()) {
- visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
- if (visibleVersionTime > result) {
- result = visibleVersionTime;
- }
- }
- return result;
- }
-
- /**
- * Get the minimum update time among all partitions
- *
- * @param table
- * @return
- */
- private static long getTableMinVisibleVersionTime(OlapTable table) {
- long result = Long.MAX_VALUE;
- long visibleVersionTime;
- for (Partition partition : table.getAllPartitions()) {
- visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
- if (visibleVersionTime < result) {
- result = visibleVersionTime;
- }
- }
- return result;
- }
-
- /**
- * Obtain the partition correspondence between materialized views and base
tables
- * Currently, there is a one-to-one correspondence between the partitions
of materialized views and base tables,
- * but for scalability reasons, Set is used
- * <p>
- * before use this method,should call `alignMvPartition`
- *
- * @param mtmv
- * @param relatedTable
- * @return mv.partitionId ==> relatedTable.partitionId
- */
- public static Map<Long, Set<Long>> getMvToBasePartitions(MTMV mtmv,
OlapTable relatedTable)
- throws AnalysisException {
- HashMap<Long, Set<Long>> res = Maps.newHashMap();
- Map<Long, PartitionItem> relatedTableItems =
relatedTable.getPartitionInfo().getIdToItem(false);
- Map<Long, PartitionItem> mtmvItems =
mtmv.getPartitionInfo().getIdToItem(false);
- for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
- long partitionId = getExistPartitionId(entry.getValue(),
relatedTableItems);
- if (partitionId == -1L) {
- throw new AnalysisException("partition not found: " +
entry.getValue().toString());
- }
- res.put(entry.getKey(), Sets.newHashSet(partitionId));
- }
- return res;
- }
-
/**
* Determine is sync, ignoring excludedTriggerTables and non OlapTanle
*
@@ -467,8 +394,8 @@ public class MTMVUtil {
* @param gracePeriod
* @return
*/
- private static boolean isSync(long visibleVersionTime, Set<BaseTableInfo>
tables,
- Set<String> excludedTriggerTables, Long gracePeriod) {
+ private static boolean isFresherThanTables(long visibleVersionTime,
Set<BaseTableInfo> tables,
+ Set<String> excludedTriggerTables, Long gracePeriod) throws
AnalysisException {
long maxAvailableTime = visibleVersionTime + gracePeriod;
for (BaseTableInfo baseTableInfo : tables) {
TableIf table = null;
@@ -481,10 +408,10 @@ public class MTMVUtil {
if (excludedTriggerTables.contains(table.getName())) {
continue;
}
- if (!(table instanceof OlapTable)) {
+ if (!(table instanceof MTMVRelatedTableIf)) {
continue;
}
- long tableLastVisibleVersionTime =
getTableMaxVisibleVersionTime((OlapTable) table);
+ long tableLastVisibleVersionTime = ((MTMVRelatedTableIf)
table).getLastModifyTime();
if (tableLastVisibleVersionTime > maxAvailableTime) {
return false;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index a6b6bdeeeb3..bac4059c162 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -310,7 +310,7 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
}
// check mv related table partition is valid or not
MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo();
- BaseTableInfo relatedPartitionTable =
mvCustomPartitionInfo.getRelatedTable();
+ BaseTableInfo relatedPartitionTable =
mvCustomPartitionInfo.getRelatedTableInfo();
if (relatedPartitionTable == null) {
return ImmutableSet.of();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
index 976eb9a5cfc..2ff504ebf91 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
@@ -18,11 +18,11 @@
package org.apache.doris.nereids.rules.exploration.mv;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
@@ -43,6 +43,7 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -84,14 +85,13 @@ public class MaterializedViewUtils {
// check sql pattern
IncrementCheckerContext context = new
IncrementCheckerContext(columnSlot);
materializedViewPlan.accept(MaterializedViewIncrementChecker.INSTANCE,
context);
- if (context.getRelatedTable() == null
- || context.getRelatedTableColumn() == null
- || !context.isPctPossible()) {
+ if (context.getTableColumnList().isEmpty() ||
!context.isPctPossible()) {
return Optional.empty();
}
- return Optional.of(new RelatedTableInfo(new
BaseTableInfo(context.getRelatedTable()),
+ // TODO support to return only one related table info, support multi
later
+ return Optional.of(new RelatedTableInfo(new
BaseTableInfo(context.getTableColumnList().get(0).key()),
context.isPctPossible(),
- context.getRelatedTableColumn().getName()));
+ context.getTableColumnList().get(0).value().getName()));
}
/**
@@ -222,25 +222,25 @@ public class MaterializedViewUtils {
@Override
public Void visitLogicalRelation(LogicalRelation relation,
IncrementCheckerContext context) {
- if (!(relation instanceof LogicalCatalogRelation) ||
context.getRelatedTable() != null) {
+ if (!(relation instanceof LogicalCatalogRelation) ||
!context.getTableColumnList().isEmpty()) {
return visit(relation, context);
}
LogicalCatalogRelation logicalCatalogRelation =
(LogicalCatalogRelation) relation;
TableIf table = logicalCatalogRelation.getTable();
- if (!(table instanceof OlapTable)) {
+ if (!(table instanceof MTMVRelatedTableIf)) {
return visit(relation, context);
}
- OlapTable olapTable = (OlapTable) table;
- PartitionInfo partitionInfo = olapTable.getPartitionInfo();
- Set<Column> partitionColumnSet = new
HashSet<>(partitionInfo.getPartitionColumns());
- if (PartitionType.UNPARTITIONED.equals(partitionInfo.getType())) {
+ MTMVRelatedTableIf relatedTable = (MTMVRelatedTableIf) table;
+ PartitionType type = relatedTable.getPartitionType();
+
+ if (PartitionType.UNPARTITIONED.equals(type)) {
return visit(relation, context);
}
+ Set<Column> partitionColumnSet = new
HashSet<>(relatedTable.getPartitionColumns());
Column mvReferenceColumn =
context.getMvPartitionColumn().getColumn().get();
if (partitionColumnSet.contains(mvReferenceColumn)) {
- context.setRelatedTable(table);
- context.setRelatedTableColumn(mvReferenceColumn);
- context.setPctPossible(!mvReferenceColumn.isAllowNull());
+ context.addTableColumn(table, mvReferenceColumn);
+ context.setPctPossible(true);
}
return visit(relation, context);
}
@@ -313,8 +313,7 @@ public class MaterializedViewUtils {
private static final class IncrementCheckerContext {
private final SlotReference mvPartitionColumn;
private boolean pctPossible = true;
- private TableIf relatedTable;
- private Column relatedTableColumn;
+ private final List<Pair<TableIf, Column>> tableColumnList = new
ArrayList<>();
private boolean joinNullGenerateSide;
public IncrementCheckerContext(SlotReference mvPartitionColumn) {
@@ -333,20 +332,12 @@ public class MaterializedViewUtils {
this.pctPossible = pctPossible;
}
- public TableIf getRelatedTable() {
- return relatedTable;
- }
-
- public void setRelatedTable(TableIf relatedTable) {
- this.relatedTable = relatedTable;
- }
-
- public Column getRelatedTableColumn() {
- return relatedTableColumn;
+ public void addTableColumn(TableIf relatedTable, Column
partitionColumn) {
+ tableColumnList.add(Pair.of(relatedTable, partitionColumn));
}
- public void setRelatedTableColumn(Column relatedTableColumn) {
- this.relatedTableColumn = relatedTableColumn;
+ public List<Pair<TableIf, Column>> getTableColumnList() {
+ return tableColumnList;
}
public boolean isJoinNullGenerateSide() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index ae4e048f1c6..e9a67d83281 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -20,7 +20,6 @@ package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.TableIf;
@@ -72,9 +71,9 @@ public class UpdateMvByPartitionCommand extends
InsertOverwriteTableCommand {
* @return command
*/
public static UpdateMvByPartitionCommand from(MTMV mv, Set<Long>
partitionIds,
- Map<OlapTable, String> tableWithPartKey) {
+ Map<TableIf, String> tableWithPartKey) {
NereidsParser parser = new NereidsParser();
- Map<OlapTable, Set<Expression>> predicates =
+ Map<TableIf, Set<Expression>> predicates =
constructTableWithPredicates(mv, partitionIds,
tableWithPartKey);
List<String> parts = constructPartsForMv(mv, partitionIds);
Plan plan = parser.parseSingle(mv.getQuerySql());
@@ -94,12 +93,12 @@ public class UpdateMvByPartitionCommand extends
InsertOverwriteTableCommand {
.collect(ImmutableList.toImmutableList());
}
- private static Map<OlapTable, Set<Expression>>
constructTableWithPredicates(MTMV mv,
- Set<Long> partitionIds, Map<OlapTable, String> tableWithPartKey) {
+ private static Map<TableIf, Set<Expression>>
constructTableWithPredicates(MTMV mv,
+ Set<Long> partitionIds, Map<TableIf, String> tableWithPartKey) {
Set<PartitionItem> items = partitionIds.stream()
.map(id -> mv.getPartitionInfo().getItem(id))
.collect(ImmutableSet.toImmutableSet());
- ImmutableMap.Builder<OlapTable, Set<Expression>> builder = new
ImmutableMap.Builder<>();
+ ImmutableMap.Builder<TableIf, Set<Expression>> builder = new
ImmutableMap.Builder<>();
tableWithPartKey.forEach((table, colName) ->
builder.put(table, constructPredicates(items, colName))
);
@@ -137,13 +136,13 @@ public class UpdateMvByPartitionCommand extends
InsertOverwriteTableCommand {
}
}
- static class PredicateAdder extends DefaultPlanRewriter<Map<OlapTable,
Set<Expression>>> {
+ static class PredicateAdder extends DefaultPlanRewriter<Map<TableIf,
Set<Expression>>> {
@Override
- public Plan visitUnboundRelation(UnboundRelation unboundRelation,
Map<OlapTable, Set<Expression>> predicates) {
+ public Plan visitUnboundRelation(UnboundRelation unboundRelation,
Map<TableIf, Set<Expression>> predicates) {
List<String> tableQualifier =
RelationUtil.getQualifierName(ConnectContext.get(),
unboundRelation.getNameParts());
TableIf table = RelationUtil.getTable(tableQualifier,
Env.getCurrentEnv());
- if (table instanceof OlapTable && predicates.containsKey(table)) {
+ if (predicates.containsKey(table)) {
return new
LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))),
unboundRelation);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
index 40fc06a4434..3070bb19636 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
@@ -26,7 +26,6 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
@@ -40,6 +39,7 @@ import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -269,18 +269,19 @@ public class CreateMTMVInfo {
if (!relatedTableInfo.isPresent() ||
!relatedTableInfo.get().isPctPossible()) {
throw new AnalysisException("Unable to find a suitable
base table for partitioning");
}
- TableIf followTable = null;
+ TableIf relatedTable = null;
try {
- followTable =
MTMVUtil.getTable(relatedTableInfo.get().getTableInfo());
+ relatedTable =
MTMVUtil.getTable(relatedTableInfo.get().getTableInfo());
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage(), e);
}
- if (!(followTable instanceof OlapTable)) {
- throw new AnalysisException("base table for partitioning
only can be OlapTable.");
+ if (!(relatedTable instanceof MTMVRelatedTableIf)) {
+ throw new AnalysisException("base table for partitioning
only can be OlapTable or HMSTable");
}
+ MTMVRelatedTableIf mtmvBaseRealtedTable = (MTMVRelatedTableIf)
relatedTable;
Set<String> partitionColumnNames =
Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
- partitionColumnNames.addAll(((OlapTable)
followTable).getPartitionColumnNames());
+
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames());
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
@@ -293,7 +294,7 @@ public class CreateMTMVInfo {
}
mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo());
mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn());
- partitionDesc = generatePartitionDesc((OlapTable) followTable);
+ partitionDesc = generatePartitionDesc(mtmvBaseRealtedTable);
} finally {
// after operate, roll back the disable rules
sessionVariable.setDisableNereidsRules(String.join(",",
tempDisableRules));
@@ -302,9 +303,9 @@ public class CreateMTMVInfo {
}
}
- private PartitionDesc generatePartitionDesc(OlapTable relatedTable) {
- PartitionType type = relatedTable.getPartitionInfo().getType();
+ private PartitionDesc generatePartitionDesc(MTMVRelatedTableIf
relatedTable) {
try {
+ PartitionType type = relatedTable.getPartitionType();
if (type == PartitionType.RANGE) {
return new
RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
Lists.newArrayList());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index b540cd67c56..49970dbf556 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -53,6 +53,7 @@ import org.apache.doris.thrift.TFileType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import lombok.Setter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -178,7 +179,7 @@ public class HiveScanNode extends FileQueryScanNode {
// so that we can unify the interface.
HivePartition dummyPartition = new
HivePartition(hmsTable.getDbName(), hmsTable.getName(), true,
hmsTable.getRemoteTable().getSd().getInputFormat(),
- hmsTable.getRemoteTable().getSd().getLocation(), null);
+ hmsTable.getRemoteTable().getSd().getLocation(), null,
Maps.newHashMap());
this.totalPartitionNum = 1;
this.readPartitionNum = 1;
resPartitions.add(dummyPartition);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
index 9d601e71daa..f3c8bdf594b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
@@ -43,6 +43,7 @@ import org.apache.doris.thrift.THudiFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -194,7 +195,8 @@ public class HudiScanNode extends HiveScanNode {
return filteredPartitionIds.stream().map(id -> {
String path = basePath + "/" +
partitionIdToNameMap.get(id);
return new HivePartition(
- dbName, tblName, false, inputFormat, path,
partitionValuesMap.get(id));
+ dbName, tblName, false, inputFormat, path,
partitionValuesMap.get(id),
+ Maps.newHashMap());
}).collect(Collectors.toList());
} finally {
partitionValues.readLock().unlock();
@@ -205,7 +207,7 @@ public class HudiScanNode extends HiveScanNode {
// so that we can unify the interface.
HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(),
hmsTable.getName(), true,
hmsTable.getRemoteTable().getSd().getInputFormat(),
- hmsTable.getRemoteTable().getSd().getLocation(), null);
+ hmsTable.getRemoteTable().getSd().getLocation(), null,
Maps.newHashMap());
this.totalPartitionNum = 1;
this.readPartitionNum = 1;
return Lists.newArrayList(dummyPartition);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 6176ec13bd6..40ea594819c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -76,6 +76,7 @@ import org.apache.doris.system.Frontend;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
@@ -714,7 +715,7 @@ public class StatisticsUtil {
} else {
hivePartitions.add(new HivePartition(table.getDbName(),
table.getName(), true,
table.getRemoteTable().getSd().getInputFormat(),
- table.getRemoteTable().getSd().getLocation(), null));
+ table.getRemoteTable().getSd().getLocation(), null,
Maps.newHashMap()));
}
// Get files for all partitions.
String bindBrokerName = table.getCatalog().bindBrokerName();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
index 511eace4e1d..8bf8ea14ea5 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.utframe.TestWithFeService;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Optional;
@@ -196,6 +197,7 @@ public class MaterializedViewUtilsTest extends
TestWithFeService {
}
@Test
+ @Disabled
public void getRelatedTableInfoTestWithoutGroupNullTest() {
PlanChecker.from(connectContext)
.checkExplain("SELECT (o.c1_abs + ps.c2_abs) as add_alias,
l.L_SHIPDATE, l.L_ORDERKEY, o.O_ORDERDATE, "
diff --git a/regression-test/data/mtmv_p0/test_hive_mtmv.out
b/regression-test/data/mtmv_p0/test_hive_mtmv.out
new file mode 100644
index 00000000000..9ee89dd033d
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_hive_mtmv.out
@@ -0,0 +1,14 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !refresh_one_partition --
+1 A 20230101
+2 B 20230101
+3 C 20230101
+
+-- !refresh_other_partition --
+1 A 20230101
+2 B 20230101
+3 C 20230101
+4 D 20230102
+5 E 20230102
+6 F 20230102
+
diff --git a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
new file mode 100644
index 00000000000..573f1f84d5d
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_mtmv",
"p0,external,hive,external_docker,external_docker_hive") {
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ try {
+ String hms_port = context.config.otherConfigs.get("hms_port")
+ String catalog_name = "hive_test_mtmv"
+ String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog if not exists ${catalog_name} properties (
+ "type"="hms",
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+ );"""
+ // sql """use `${catalog_name}`.`default`"""
+ def mvName = "test_hive_mtmv"
+ def dbName = "regression_test_mtmv_p0"
+ sql """drop materialized view if exists ${mvName};"""
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`part_col`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM ${catalog_name}.`default`.mtmv_base1;
+ """
+ def showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " +
showPartitionsResult.toString())
+ assertTrue(showPartitionsResult.toString().contains("p_20230101"))
+ assertTrue(showPartitionsResult.toString().contains("p_20230102"))
+
+ // refresh one partitions
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} partitions(p_20230101);
+ """
+ jobName = getJobName(dbName, mvName);
+ log.info(jobName)
+ waitingMTMVTaskFinished(jobName)
+ order_qt_refresh_one_partition "SELECT * FROM ${mvName} order by
id"
+
+ //refresh other partitions
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName}
+ """
+ waitingMTMVTaskFinished(jobName)
+ order_qt_refresh_other_partition "SELECT * FROM ${mvName} order by
id"
+
+ sql """drop materialized view if exists ${mvName};"""
+
+ sql """drop catalog if exists ${catalog_name}"""
+ } finally {
+ }
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]