This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 586f155d84ede48f165c592a11d44dde391edbf0 Author: zhangdong <[email protected]> AuthorDate: Thu Mar 7 12:28:24 2024 +0800 [enhance](mtmv) MTMV Use partial partition of base table (#31632) MTMV adds 3 properties: partition_sync_limit: digit partition_sync_time_unit: DAY/MONTH/YEAR partition_date_format: like "%Y-%m-%d"/"%Y%m%d" For example, the current time is 2020-02-03 20:10:10 - If partition_sync_limit is set to 1 and partition_sync_time_unit is set to DAY, only partitions with a time greater than or equal to 2020-02-03 00:00:00 will be synchronized to the MTMV - If partition_sync_limit is set to 1 and partition_sync_time_unit is set to MONTH, only partitions with a time greater than or equal to 2020-02-01 00:00:00 will be synchronized to the MTMV - If partition_sync_limit is set to 1 and partition_sync_time_unit is set to YEAR, only partitions with a time greater than or equal to 2020-01-01 00:00:00 will be synchronized to the MTMV - If partition_sync_limit is set to 3 and partition_sync_time_unit is set to MONTH, only partitions with a time greater than or equal to 2019-12-01 00:00:00 will be synchronized to the MTMV - If partition_sync_limit is set to 4 and partition_sync_time_unit is set to DAY, only partitions with a time greater than or equal to 2020-01-31 00:00:00 will be synchronized to the MTMV --- .../apache/doris/catalog/ListPartitionItem.java | 21 ++ .../main/java/org/apache/doris/catalog/MTMV.java | 11 +- .../org/apache/doris/catalog/PartitionItem.java | 14 ++ .../apache/doris/catalog/RangePartitionItem.java | 18 ++ .../apache/doris/common/util/PropertyAnalyzer.java | 3 + .../apache/doris/mtmv/MTMVPartitionSyncConfig.java | 57 +++++ .../doris/mtmv/MTMVPartitionSyncTimeUnit.java | 36 ++++ .../org/apache/doris/mtmv/MTMVPartitionUtil.java | 11 +- .../org/apache/doris/mtmv/MTMVPropertyUtil.java | 138 ++++++++++++ .../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 31 ++- .../main/java/org/apache/doris/mtmv/MTMVUtil.java | 127 ++++++++++- 
.../plans/commands/info/AlterMTMVPropertyInfo.java | 39 +--- .../trees/plans/commands/info/CreateMTMVInfo.java | 49 +---- .../java/org/apache/doris/mtmv/MTMVUtilTest.java | 113 ++++++++++ .../mtmv_p0/test_hive_limit_partition_mtmv.out | 11 + .../data/mtmv_p0/test_limit_partition_mtmv.out | 17 ++ .../mtmv_p0/test_hive_limit_partition_mtmv.groovy | 162 ++++++++++++++ .../mtmv_p0/test_limit_partition_mtmv.groovy | 240 +++++++++++++++++++++ 18 files changed, 1010 insertions(+), 88 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java index d0f06dd7e55..1a4d188a0ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java @@ -20,6 +20,7 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.PartitionKeyDesc; import org.apache.doris.analysis.PartitionValue; import org.apache.doris.common.AnalysisException; +import org.apache.doris.mtmv.MTMVUtil; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -29,6 +30,7 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -99,6 +101,25 @@ public class ListPartitionItem extends PartitionItem { return PartitionKeyDesc.createIn(Lists.newArrayList(res)); } + @Override + public boolean isGreaterThanSpecifiedTime(int pos, Optional<String> dateFormatOptional, long nowTruncSubSec) + throws AnalysisException { + for (PartitionKey partitionKey : partitionKeys) { + if (partitionKey.getKeys().size() <= pos) { + throw new AnalysisException( + String.format("toPartitionKeyDesc IndexOutOfBounds, partitionKey: %s, pos: %d", + partitionKey.toString(), + pos)); + } + if (!isDefaultPartition() && 
MTMVUtil.getExprTimeSec(partitionKey.getKeys().get(pos), dateFormatOptional) + >= nowTruncSubSec) { + // As long as one of the partitionKeys meets the requirements, this partition needs to be retained + return true; + } + } + return false; + } + @Override public void write(DataOutput out) throws IOException { out.writeInt(partitionKeys.size()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index 05622bfc0e7..80feb8f12ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -38,6 +38,7 @@ import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; import org.apache.doris.mtmv.MTMVRefreshSnapshot; import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVStatus; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; @@ -217,7 +218,7 @@ public class MTMV extends OlapTable { public long getGracePeriod() { readMvLock(); try { - if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) { + if (!StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD))) { return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 1000; } else { return 0L; @@ -243,7 +244,7 @@ public class MTMV extends OlapTable { public int getRefreshPartitionNum() { readMvLock(); try { - if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) { + if (!StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM))) { int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); return value < 1 ? 
MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value; } else { @@ -257,7 +258,7 @@ public class MTMV extends OlapTable { public Set<String> getExcludedTriggerTables() { readMvLock(); try { - if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) { + if (StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES))) { return Sets.newHashSet(); } String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(","); @@ -326,8 +327,10 @@ public class MTMV extends OlapTable { return Maps.newHashMap(); } Map<PartitionKeyDesc, Set<Long>> res = new HashMap<>(); - Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(); int relatedColPos = mvPartitionInfo.getRelatedColPos(); + Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable() + .getPartitionItemsByTimeFilter(relatedColPos, + MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties)); for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) { PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos); if (res.containsKey(partitionKeyDesc)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java index 3c29aa2f48b..af1bbc9d0e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java @@ -23,6 +23,7 @@ import org.apache.doris.common.io.Writable; import java.util.Comparator; import java.util.Map; +import java.util.Optional; public abstract class PartitionItem implements Comparable<PartitionItem>, Writable { public static final Comparator<Map.Entry<Long, PartitionItem>> ITEM_MAP_ENTRY_COMPARATOR = @@ -46,4 +47,17 @@ public abstract class PartitionItem implements Comparable<PartitionItem>, Writab * @throws 
AnalysisException */ public abstract PartitionKeyDesc toPartitionKeyDesc(int pos) throws AnalysisException; + + /** + * Check if the partition meets the time requirements + * + * @param pos The position of the partition column to be checked in all partition columns + * @param dateFormatOptional Convert other types to date format + * @param nowTruncSubSec The time to compare + * @return + * @throws AnalysisException + */ + public abstract boolean isGreaterThanSpecifiedTime(int pos, Optional<String> dateFormatOptional, + long nowTruncSubSec) + throws AnalysisException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java index 7a9162c874c..75870de63f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java @@ -18,12 +18,15 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.RangeUtils; +import org.apache.doris.mtmv.MTMVUtil; import com.google.common.collect.Range; import java.io.DataOutput; import java.io.IOException; +import java.util.Optional; public class RangePartitionItem extends PartitionItem { private Range<PartitionKey> partitionKeyRange; @@ -60,6 +63,21 @@ public class RangePartitionItem extends PartitionItem { return toPartitionKeyDesc(); } + @Override + public boolean isGreaterThanSpecifiedTime(int pos, Optional<String> dateFormatOptional, long nowTruncSubSec) + throws AnalysisException { + PartitionKey partitionKey = partitionKeyRange.upperEndpoint(); + if (partitionKey.getKeys().size() <= pos) { + throw new AnalysisException( + String.format("toPartitionKeyDesc IndexOutOfBounds, partitionKey: %s, pos: %d", + partitionKey.toString(), + pos)); + } + // If the upper limit of the partition range meets the requirements, 
this partition needs to be retained + return !isDefaultPartition() && MTMVUtil.getExprTimeSec(partitionKey.getKeys().get(pos), dateFormatOptional) + >= nowTruncSubSec; + } + @Override public void write(DataOutput out) throws IOException { RangeUtils.writeRange(out, partitionKeyRange); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 42d62046948..a09694ada18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -165,6 +165,9 @@ public class PropertyAnalyzer { public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = "excluded_trigger_tables"; public static final String PROPERTIES_REFRESH_PARTITION_NUM = "refresh_partition_num"; public static final String PROPERTIES_WORKLOAD_GROUP = "workload_group"; + public static final String PROPERTIES_PARTITION_SYNC_LIMIT = "partition_sync_limit"; + public static final String PROPERTIES_PARTITION_TIME_UNIT = "partition_sync_time_unit"; + public static final String PROPERTIES_PARTITION_DATE_FORMAT = "partition_date_format"; // For unique key data model, the feature Merge-on-Write will leverage a primary // key index and a delete-bitmap to mark duplicate keys as deleted in load stage, // which can avoid the merging cost in read stage, and accelerate the aggregation diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncConfig.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncConfig.java new file mode 100644 index 00000000000..b85978e4f46 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncConfig.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import java.util.Optional; + +public class MTMVPartitionSyncConfig { + private int syncLimit; + private MTMVPartitionSyncTimeUnit timeUnit; + private Optional<String> dateFormat; + + public MTMVPartitionSyncConfig(int syncLimit, MTMVPartitionSyncTimeUnit timeUnit, + Optional<String> dateFormat) { + this.syncLimit = syncLimit; + this.timeUnit = timeUnit; + this.dateFormat = dateFormat; + } + + public int getSyncLimit() { + return syncLimit; + } + + public void setSyncLimit(int syncLimit) { + this.syncLimit = syncLimit; + } + + public MTMVPartitionSyncTimeUnit getTimeUnit() { + return timeUnit; + } + + public void setTimeUnit(MTMVPartitionSyncTimeUnit timeUnit) { + this.timeUnit = timeUnit; + } + + public Optional<String> getDateFormat() { + return dateFormat; + } + + public void setDateFormat(Optional<String> dateFormat) { + this.dateFormat = dateFormat; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncTimeUnit.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncTimeUnit.java new file mode 100644 index 00000000000..0d508b9bb22 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncTimeUnit.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import java.util.Optional; + +public enum MTMVPartitionSyncTimeUnit { + YEAR, + MONTH, + DAY; + + public static Optional<MTMVPartitionSyncTimeUnit> fromString(String unit) { + for (MTMVPartitionSyncTimeUnit u : MTMVPartitionSyncTimeUnit.values()) { + if (u.name().equalsIgnoreCase(unit)) { + return Optional.of(u); + } + } + return Optional.empty(); + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 9c85725b5bb..b80c5fc283f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -122,10 +122,12 @@ public class MTMVPartitionUtil { * @throws AnalysisException */ public static List<AllPartitionDesc> getPartitionDescsByRelatedTable(MTMVRelatedTableIf relatedTable, - Map<String, String> tableProperties, String relatedCol) throws AnalysisException { + Map<String, String> tableProperties, String relatedCol, Map<String, String> mvProperties) + throws AnalysisException { HashMap<String, String> partitionProperties = Maps.newHashMap(); List<AllPartitionDesc> res = 
Lists.newArrayList(); - Set<PartitionKeyDesc> relatedPartitionDescs = getRelatedPartitionDescs(relatedTable, relatedCol); + Set<PartitionKeyDesc> relatedPartitionDescs = getRelatedPartitionDescs(relatedTable, relatedCol, + MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties)); for (PartitionKeyDesc partitionKeyDesc : relatedPartitionDescs) { SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true, generatePartitionName(partitionKeyDesc), @@ -137,11 +139,12 @@ public class MTMVPartitionUtil { return res; } - private static Set<PartitionKeyDesc> getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol) + private static Set<PartitionKeyDesc> getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol, + MTMVPartitionSyncConfig config) throws AnalysisException { int pos = getPos(relatedTable, relatedCol); Set<PartitionKeyDesc> res = Sets.newHashSet(); - for (Entry<Long, PartitionItem> entry : relatedTable.getAndCopyPartitionItems().entrySet()) { + for (Entry<Long, PartitionItem> entry : relatedTable.getPartitionItemsByTimeFilter(pos, config).entrySet()) { PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(pos); res.add(partitionKeyDesc); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPropertyUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPropertyUtil.java new file mode 100644 index 00000000000..a9df9b87d72 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPropertyUtil.java @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Sets; +import org.apache.commons.lang3.StringUtils; + +import java.util.Optional; +import java.util.Set; + +public class MTMVPropertyUtil { + public static final Set<String> mvPropertyKeys = Sets.newHashSet( + PropertyAnalyzer.PROPERTIES_GRACE_PERIOD, + PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, + PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM, + PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP, + PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT, + PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT, + PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT + ); + + public static void analyzeProperty(String key, String value) { + switch (key) { + case PropertyAnalyzer.PROPERTIES_GRACE_PERIOD: + analyzeGracePeriod(value); + break; + case PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM: + analyzeRefreshPartitionNum(value); + break; + case PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES: + analyzeExcludedTriggerTables(value); + break; + case PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP: + analyzeWorkloadGroup(value); + break; + case PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT: + analyzePartitionTimeUnit(value); + break; + case PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT: + analyzePartitionDateFormat(value); + break; + 
case PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT: + analyzePartitionSyncLimit(value); + break; + default: + throw new AnalysisException("illegal key:" + key); + + } + } + + private static void analyzePartitionSyncLimit(String value) { + if (StringUtils.isEmpty(value)) { + return; + } + try { + Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new AnalysisException("valid partition_sync_limit: " + value); + } + } + + private static void analyzePartitionDateFormat(String value) { + // do nothing + } + + private static void analyzePartitionTimeUnit(String value) { + if (StringUtils.isEmpty(value)) { + return; + } + Optional<MTMVPartitionSyncTimeUnit> mtmvPartitionSyncTimeUnit = MTMVPartitionSyncTimeUnit + .fromString(value); + if (!mtmvPartitionSyncTimeUnit.isPresent()) { + throw new AnalysisException("valid partition_sync_time_unit: " + value); + } + } + + private static void analyzeWorkloadGroup(String value) { + if (StringUtils.isEmpty(value)) { + return; + } + if (!StringUtils.isEmpty(value) && !Env.getCurrentEnv().getAccessManager() + .checkWorkloadGroupPriv(ConnectContext.get(), value, PrivPredicate.USAGE)) { + String message = String + .format("Access denied; you need (at least one of) " + + "the %s privilege(s) to use workload group '%s'.", + "USAGE/ADMIN", value); + throw new AnalysisException(message); + } + } + + private static void analyzeExcludedTriggerTables(String value) { + // do nothing + } + + private static void analyzeGracePeriod(String value) { + if (StringUtils.isEmpty(value)) { + return; + } + try { + Long.parseLong(value); + } catch (NumberFormatException e) { + throw new AnalysisException("valid grace_period: " + value); + } + } + + private static void analyzeRefreshPartitionNum(String value) { + if (StringUtils.isEmpty(value)) { + return; + } + try { + Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new AnalysisException("valid refresh_partition_num: " + value); + } + } + +} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 5ec7e98a407..1adfe315a8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -24,8 +24,12 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import com.google.common.collect.Maps; + import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; /** @@ -40,6 +44,31 @@ public interface MTMVRelatedTableIf extends TableIf { */ Map<Long, PartitionItem> getAndCopyPartitionItems(); + /** + * Obtain a list of partitions filtered by time + * + * @param pos The position of the partition column to be checked in all partition columns + * @param config + * @return + * @throws AnalysisException + */ + default Map<Long, PartitionItem> getPartitionItemsByTimeFilter(int pos, MTMVPartitionSyncConfig config) + throws AnalysisException { + Map<Long, PartitionItem> partitionItems = getAndCopyPartitionItems(); + if (config.getSyncLimit() <= 0) { + return partitionItems; + } + long nowTruncSubSec = MTMVUtil.getNowTruncSubSec(config.getTimeUnit(), config.getSyncLimit()); + Optional<String> dateFormat = config.getDateFormat(); + Map<Long, PartitionItem> res = Maps.newHashMap(); + for (Entry<Long, PartitionItem> entry : partitionItems.entrySet()) { + if (entry.getValue().isGreaterThanSpecifiedTime(pos, dateFormat, nowTruncSubSec)) { + res.put(entry.getKey(), entry.getValue()); + } + } + return res; + } + /** * getPartitionType LIST/RANGE/UNPARTITIONED * @@ -92,7 +121,7 @@ public interface MTMVRelatedTableIf extends TableIf { * Does the current type of table allow timed triggering * * @return If return false,The method of comparing whether to synchronize will directly return true, - * 
otherwise the snapshot information will be compared + * otherwise the snapshot information will be compared */ boolean needAutoRefresh(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 3b97e35141d..16459fa1303 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -25,8 +25,21 @@ import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire; +import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeArithmetic; +import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform; +import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral; +import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; +import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral; +import org.apache.commons.lang3.StringUtils; + +import java.util.Map; +import java.util.Optional; import java.util.Set; public class MTMVUtil { @@ -52,7 +65,7 @@ public class MTMVUtil { } /** - * if base tables of mtmv contains external table + * if base tables of mtmv contains external table * * @param mtmv * @return @@ -66,4 +79,116 @@ public class MTMVUtil { } return false; } + + /** + * Obtain the minimum second from `syncLimit` `timeUnit` ago + * + * @param timeUnit + * @param syncLimit + * @return + * @throws AnalysisException + */ + public static long 
getNowTruncSubSec(MTMVPartitionSyncTimeUnit timeUnit, int syncLimit) + throws AnalysisException { + if (syncLimit < 1) { + throw new AnalysisException("Unexpected syncLimit, syncLimit: " + syncLimit); + } + // get current time + Expression now = DateTimeAcquire.now(); + if (!(now instanceof DateTimeLiteral)) { + throw new AnalysisException("now() should return DateTimeLiteral, now: " + now); + } + DateTimeLiteral nowLiteral = (DateTimeLiteral) now; + // date trunc + now = DateTimeExtractAndTransform + .dateTrunc(nowLiteral, new VarcharLiteral(timeUnit.name())); + if (!(now instanceof DateTimeLiteral)) { + throw new AnalysisException("dateTrunc() should return DateTimeLiteral, now: " + now); + } + nowLiteral = (DateTimeLiteral) now; + // date sub + if (syncLimit > 1) { + nowLiteral = dateSub(nowLiteral, timeUnit, syncLimit - 1); + } + return ((IntegerLiteral) DateTimeExtractAndTransform.unixTimestamp(nowLiteral)).getValue(); + } + + private static DateTimeLiteral dateSub( + org.apache.doris.nereids.trees.expressions.literal.DateLiteral date, MTMVPartitionSyncTimeUnit timeUnit, + int num) + throws AnalysisException { + IntegerLiteral integerLiteral = new IntegerLiteral(num); + Expression result; + switch (timeUnit) { + case DAY: + result = DateTimeArithmetic.dateSub(date, integerLiteral); + break; + case YEAR: + result = DateTimeArithmetic.yearsSub(date, integerLiteral); + break; + case MONTH: + result = DateTimeArithmetic.monthsSub(date, integerLiteral); + break; + default: + throw new AnalysisException("MTMV partition limit not support timeUnit: " + timeUnit.name()); + } + if (!(result instanceof DateTimeLiteral)) { + throw new AnalysisException("sub() should return DateTimeLiteral, result: " + result); + } + return (DateTimeLiteral) result; + } + + /** + * Convert LiteralExpr to second + * + * @param expr + * @param dateFormatOptional + * @return + * @throws AnalysisException + */ + public static long getExprTimeSec(org.apache.doris.analysis.LiteralExpr expr, 
Optional<String> dateFormatOptional) + throws AnalysisException { + if (expr instanceof org.apache.doris.analysis.MaxLiteral) { + return Long.MAX_VALUE; + } + if (expr instanceof org.apache.doris.analysis.NullLiteral) { + return Long.MIN_VALUE; + } + if (expr instanceof org.apache.doris.analysis.DateLiteral) { + return ((org.apache.doris.analysis.DateLiteral) expr).unixTimestamp(TimeUtils.getTimeZone()) / 1000; + } + if (!dateFormatOptional.isPresent()) { + throw new AnalysisException("expr is not DateLiteral and DateFormat is not present."); + } + String dateFormat = dateFormatOptional.get(); + Expression strToDate = DateTimeExtractAndTransform + .strToDate(new VarcharLiteral(expr.getStringValue()), new VarcharLiteral(dateFormat)); + if (!(strToDate instanceof DateTimeLiteral)) { + throw new AnalysisException( + String.format("strToDate failed, stringValue: %s, dateFormat: %s", expr.getStringValue(), + dateFormat)); + } + return ((IntegerLiteral) DateTimeExtractAndTransform.unixTimestamp((DateTimeLiteral) strToDate)).getValue(); + } + + /** + * Generate MTMVPartitionSyncConfig based on mvProperties + * + * @param mvProperties + * @return + */ + public static MTMVPartitionSyncConfig generateMTMVPartitionSyncConfigByProperties( + Map<String, String> mvProperties) { + int syncLimit = StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT)) ? -1 + : Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT)); + MTMVPartitionSyncTimeUnit timeUnit = + StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT)) + ? MTMVPartitionSyncTimeUnit.DAY : MTMVPartitionSyncTimeUnit + .valueOf(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT).toUpperCase()); + Optional<String> dateFormat = + StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT)) + ? 
Optional.empty() + : Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT)); + return new MTMVPartitionSyncConfig(syncLimit, timeUnit, dateFormat); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java index b152aadc98d..1e0174bc1f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java @@ -19,13 +19,10 @@ package org.apache.doris.nereids.trees.plans.commands.info; import org.apache.doris.catalog.Env; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.mtmv.MTMVPropertyUtil; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.qe.ConnectContext; -import org.apache.commons.lang3.StringUtils; - import java.util.Map; import java.util.Objects; @@ -55,40 +52,8 @@ public class AlterMTMVPropertyInfo extends AlterMTMVInfo { private void analyzeProperties() { for (String key : properties.keySet()) { - if (PropertyAnalyzer.PROPERTIES_GRACE_PERIOD.equals(key)) { - String gracePeriod = properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD); - try { - Long.parseLong(gracePeriod); - } catch (NumberFormatException e) { - throw new org.apache.doris.nereids.exceptions.AnalysisException( - "valid grace_period: " + properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)); - } - } else if (PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM.equals(key)) { - String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM); - try { - Integer.parseInt(refreshPartitionNum); - } catch (NumberFormatException e) { - throw new 
AnalysisException( - "valid refresh_partition_num: " + properties - .get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); - } - } else if (PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES.equals(key)) { - // nothing - } else if (PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP.equals(key)) { - String workloadGroup = properties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP); - if (!StringUtils.isEmpty(workloadGroup) && !Env.getCurrentEnv().getAccessManager() - .checkWorkloadGroupPriv(ConnectContext.get(), workloadGroup, PrivPredicate.USAGE)) { - String message = String - .format("Access denied; you need (at least one of) " - + "the %s privilege(s) to use workload group '%s'.", - "USAGE/ADMIN", workloadGroup); - throw new AnalysisException(message); - } - } else { - throw new org.apache.doris.nereids.exceptions.AnalysisException("illegal key:" + key); - } + MTMVPropertyUtil.analyzeProperty(key, properties.get(key)); } - } public Map<String, String> getProperties() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index 90c98a44294..d7c502a9eb8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -34,13 +34,13 @@ import org.apache.doris.catalog.View; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.FeNameFormat; -import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.mtmv.EnvInfo; import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mtmv.MTMVPlanUtil; +import 
org.apache.doris.mtmv.MTMVPropertyUtil; import org.apache.doris.mtmv.MTMVRefreshInfo; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVRelation; @@ -177,46 +177,12 @@ public class CreateMTMVInfo { } private void analyzeProperties() { - if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) { - String gracePeriod = properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD); - try { - Long.parseLong(gracePeriod); - } catch (NumberFormatException e) { - throw new AnalysisException( - "valid grace_period: " + properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)); - } - mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD, gracePeriod); - properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD); - } - if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) { - String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM); - try { - Integer.parseInt(refreshPartitionNum); - } catch (NumberFormatException e) { - throw new AnalysisException( - "valid refresh_partition_num: " + properties - .get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); - } - mvProperties.put(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM, refreshPartitionNum); - properties.remove(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM); - } - if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) { - String excludedTriggerTables = properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES); - mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, excludedTriggerTables); - properties.remove(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES); - } - if (properties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP)) { - String workloadGroup = properties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP); - if (!Env.getCurrentEnv().getAccessManager() - .checkWorkloadGroupPriv(ConnectContext.get(), workloadGroup, PrivPredicate.USAGE)) { - 
String message = String - .format("Access denied;" - + " you need (at least one of) the %s privilege(s) to use workload group '%s'.", - "USAGE/ADMIN", workloadGroup); - throw new AnalysisException(message); + for (String key : MTMVPropertyUtil.mvPropertyKeys) { + if (properties.containsKey(key)) { + MTMVPropertyUtil.analyzeProperty(key, properties.get(key)); + mvProperties.put(key, properties.get(key)); + properties.remove(key); } - mvProperties.put(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP, workloadGroup); - properties.remove(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP); } } @@ -324,7 +290,8 @@ public class CreateMTMVInfo { List<AllPartitionDesc> allPartitionDescs = null; try { allPartitionDescs = MTMVPartitionUtil - .getPartitionDescsByRelatedTable(relatedTable, properties, mvPartitionInfo.getRelatedCol()); + .getPartitionDescsByRelatedTable(relatedTable, properties, mvPartitionInfo.getRelatedCol(), + mvProperties); } catch (org.apache.doris.common.AnalysisException e) { throw new AnalysisException("getPartitionDescsByRelatedTable failed", e); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java new file mode 100644 index 00000000000..2e6df56cd1a --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.DateLiteral; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire; +import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral; + +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; +import java.util.Optional; + +public class MTMVUtilTest { + @Mocked + private DateTimeAcquire dateTimeAcquire; + + @Test + public void testGenerateMTMVPartitionSyncConfigByProperties() throws AnalysisException { + Map<String, String> mvProperties = Maps.newHashMap(); + MTMVPartitionSyncConfig config = MTMVUtil + .generateMTMVPartitionSyncConfigByProperties(mvProperties); + Assert.assertEquals(-1, config.getSyncLimit()); + Assert.assertFalse(config.getDateFormat().isPresent()); + Assert.assertEquals(MTMVPartitionSyncTimeUnit.DAY, config.getTimeUnit()); + + mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT, "1"); + config = MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties); + Assert.assertEquals(1, config.getSyncLimit()); + Assert.assertFalse(config.getDateFormat().isPresent()); + 
Assert.assertEquals(MTMVPartitionSyncTimeUnit.DAY, config.getTimeUnit()); + + mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT, "month"); + config = MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties); + Assert.assertEquals(1, config.getSyncLimit()); + Assert.assertFalse(config.getDateFormat().isPresent()); + Assert.assertEquals(MTMVPartitionSyncTimeUnit.MONTH, config.getTimeUnit()); + + mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT, "%Y%m%d"); + config = MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties); + Assert.assertEquals(1, config.getSyncLimit()); + Assert.assertEquals("%Y%m%d", config.getDateFormat().get()); + Assert.assertEquals(MTMVPartitionSyncTimeUnit.MONTH, config.getTimeUnit()); + } + + @Test + public void testGetExprTimeSec() throws AnalysisException { + LiteralExpr expr = new DateLiteral("2020-01-01"); + long exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.empty()); + Assert.assertEquals(1577808000L, exprTimeSec); + expr = new StringLiteral("2020-01-01"); + exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.of("%Y-%m-%d")); + Assert.assertEquals(1577808000L, exprTimeSec); + expr = new IntLiteral(20200101); + exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.of("%Y%m%d")); + Assert.assertEquals(1577808000L, exprTimeSec); + expr = new DateLiteral(Type.DATE, true); + exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.empty()); + Assert.assertEquals(253402185600L, exprTimeSec); + } + + @Test + public void testGetNowTruncSubSec() throws AnalysisException { + DateTimeLiteral dateTimeLiteral = new DateTimeLiteral("2020-02-03 20:10:10"); + new Expectations() { + { + dateTimeAcquire.now(); + minTimes = 0; + result = dateTimeLiteral; + } + }; + long nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.DAY, 1); + // 2020-02-03 + Assert.assertEquals(1580659200L, nowTruncSubSec); + nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.MONTH, 1); + // 
2020-02-01 + Assert.assertEquals(1580486400L, nowTruncSubSec); + nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.YEAR, 1); + // 2020-01-01 + Assert.assertEquals(1577808000L, nowTruncSubSec); + nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.MONTH, 3); + // 2019-12-01 + Assert.assertEquals(1575129600L, nowTruncSubSec); + nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.DAY, 4); + // 2020-01-31 + Assert.assertEquals(1580400000L, nowTruncSubSec); + } +} diff --git a/regression-test/data/mtmv_p0/test_hive_limit_partition_mtmv.out b/regression-test/data/mtmv_p0/test_hive_limit_partition_mtmv.out new file mode 100644 index 00000000000..3f781e54612 --- /dev/null +++ b/regression-test/data/mtmv_p0/test_hive_limit_partition_mtmv.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_base_table -- +1 bj 20380101 +2 sh 20380101 +3 bj 20200101 +4 sh 20200101 + +-- !mtmv_complete -- +1 20380101 bj +2 20380101 sh + diff --git a/regression-test/data/mtmv_p0/test_limit_partition_mtmv.out b/regression-test/data/mtmv_p0/test_limit_partition_mtmv.out new file mode 100644 index 00000000000..a655b5fdeff --- /dev/null +++ b/regression-test/data/mtmv_p0/test_limit_partition_mtmv.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !date_list -- +1 2038-01-01 + +-- !varchar_list -- +1 20380101 + +-- !varchar_list -- +1 20380101 + +-- !date_range -- +1 2038-01-02 + +-- !date_range_all -- +1 2038-01-02 +2 2020-01-02 + diff --git a/regression-test/suites/mtmv_p0/test_hive_limit_partition_mtmv.groovy b/regression-test/suites/mtmv_p0/test_hive_limit_partition_mtmv.groovy new file mode 100644 index 00000000000..f55df2bd924 --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_hive_limit_partition_mtmv.groovy @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_hive_limit_partition_mtmv", "p0,external,hive,external_docker,external_docker_hive") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("diable Hive test.") + return; + } + // prepare data in hive + def hive_database = "test_hive_limit_partition_mtmv_db" + def hive_table = "partition2" + + def drop_table_str = """ drop table if exists ${hive_database}.${hive_table} """ + def drop_database_str = """ drop database if exists ${hive_database}""" + def create_database_str = """ create database ${hive_database}""" + def create_table_str = """ CREATE TABLE ${hive_database}.${hive_table} ( + `k1` int) + PARTITIONED BY ( + `region` string, + `day` string + ) + STORED AS ORC; + """ + def add_partition_str = """ + alter table ${hive_database}.${hive_table} add if not exists + partition(region="bj",day="20380101") + partition(region="sh",day="20380101") + partition(region="bj",day="20200101") + partition(region="sh",day="20200101") + """ + def insert_str1 = """insert into ${hive_database}.${hive_table} PARTITION(region="bj",day="20380101") values(1)""" + def insert_str2 = """insert into ${hive_database}.${hive_table} PARTITION(region="sh",day="20380101") values(2)""" + def insert_str3 = """insert into ${hive_database}.${hive_table} PARTITION(region="bj",day="20200101") values(3)""" + def insert_str4 = """insert into ${hive_database}.${hive_table} PARTITION(region="sh",day="20200101") values(4)""" + + logger.info("hive sql: " + drop_table_str) + hive_docker """ ${drop_table_str} """ + logger.info("hive sql: " + drop_database_str) + hive_docker """ ${drop_database_str} """ + logger.info("hive sql: " + create_database_str) + hive_docker """ ${create_database_str}""" + logger.info("hive sql: " + create_table_str) + hive_docker """ ${create_table_str} """ + logger.info("hive sql: " + add_partition_str) + hive_docker """ ${add_partition_str} """ + logger.info("hive sql: " + 
insert_str1) + hive_docker """ ${insert_str1} """ + logger.info("hive sql: " + insert_str2) + hive_docker """ ${insert_str2} """ + logger.info("hive sql: " + insert_str3) + hive_docker """ ${insert_str3} """ + logger.info("hive sql: " + insert_str4) + hive_docker """ ${insert_str4} """ + + // prepare catalog + String hms_port = context.config.otherConfigs.get("hms_port") + String catalog_name = "test_hive_limit_partition_mtmv_catalog" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + );""" + + order_qt_select_base_table "SELECT * FROM ${catalog_name}.${hive_database}.${hive_table}" + + + // string type + def mvName = "test_hive_limit_partition_mtmv" + def dbName = "regression_test_mtmv_p0" + sql """drop materialized view if exists ${mvName};""" + sql """REFRESH catalog ${catalog_name}""" + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`day`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1', + 'partition_sync_limit'='2', + 'partition_sync_time_unit'='MONTH', + 'partition_date_format'='%Y%m%d' + ) + AS + SELECT k1,day,region FROM ${catalog_name}.${hive_database}.${hive_table}; + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(1, showPartitionsResult.size()) + assertTrue(showPartitionsResult.toString().contains("p_20380101")) + + // refresh complete + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + def jobName = getJobName(dbName, mvName); + waitingMTMVTaskFinished(jobName) + order_qt_mtmv_complete "SELECT * FROM ${mvName} order by k1,day,region" + + + // date type + sql """drop materialized view if exists ${mvName};""" + create_table_str 
= """ CREATE TABLE ${hive_database}.${hive_table} ( + `k1` int) + PARTITIONED BY ( + `region` string, + `day` date + ) + STORED AS ORC; + """ + add_partition_str = """ + alter table ${hive_database}.${hive_table} add if not exists + partition(region="bj",day="2038-01-01") + partition(region="sh",day="2038-01-01") + partition(region="bj",day="2020-01-01") + partition(region="sh",day="2020-01-01") + """ + logger.info("hive sql: " + drop_table_str) + hive_docker """ ${drop_table_str} """ + logger.info("hive sql: " + create_table_str) + hive_docker """ ${create_table_str} """ + logger.info("hive sql: " + add_partition_str) + hive_docker """ ${add_partition_str} """ + + sql """REFRESH catalog ${catalog_name}""" + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`day`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1', + 'partition_sync_limit'='2', + 'partition_sync_time_unit'='YEAR' + ) + AS + SELECT k1,day,region FROM ${catalog_name}.${hive_database}.${hive_table}; + """ + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(1, showPartitionsResult.size()) + assertTrue(showPartitionsResult.toString().contains("p_20380101")) + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalog_name}""" +} + diff --git a/regression-test/suites/mtmv_p0/test_limit_partition_mtmv.groovy b/regression-test/suites/mtmv_p0/test_limit_partition_mtmv.groovy new file mode 100644 index 00000000000..fd026a6c02e --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_limit_partition_mtmv.groovy @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert; + +suite("test_limit_partition_mtmv") { + def tableName = "t_test_limit_partition_mtmv_user" + def mvName = "multi_mv_test_limit_partition_mtmv" + def dbName = "regression_test_mtmv_p0" + + // list partition date type + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE TABLE `${tableName}` ( + `k1` LARGEINT NOT NULL COMMENT '\"用户id\"', + `k2` DATE NOT NULL COMMENT '\"数据灌入日期时间\"' + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY list(`k2`) + ( + PARTITION p_20380101 VALUES IN ("2038-01-01"), + PARTITION p_20200101 VALUES IN ("2020-01-01") + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,"2038-01-01"),(2,"2020-01-01"); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`k2`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1', + 'partition_sync_limit'='2', + 'partition_sync_time_unit'='YEAR' + ) + AS + SELECT * FROM ${tableName}; + """ + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(1, showPartitionsResult.size()) + assertTrue(showPartitionsResult.toString().contains("p_20380101")) + + sql """ + 
REFRESH MATERIALIZED VIEW ${mvName} + """ + def jobName = getJobName(dbName, mvName); + log.info(jobName) + waitingMTMVTaskFinished(jobName) + order_qt_date_list "SELECT * FROM ${mvName} order by k1,k2" + + + + // list partition string type + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE TABLE `${tableName}` ( + `k1` LARGEINT NOT NULL COMMENT '\"用户id\"', + `k2` VARCHAR(100) NOT NULL COMMENT '\"数据灌入日期时间\"' + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY list(`k2`) + ( + PARTITION p_20380101 VALUES IN ("20380101"), + PARTITION p_20200101 VALUES IN ("20200101") + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,"20380101"),(2,"20200101"); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`k2`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1', + 'partition_sync_limit'='2', + 'partition_sync_time_unit'='MONTH', + 'partition_date_format'='%Y%m%d' + ) + AS + SELECT * FROM ${tableName}; + """ + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(1, showPartitionsResult.size()) + assertTrue(showPartitionsResult.toString().contains("p_20380101")) + + sql """ + REFRESH MATERIALIZED VIEW ${mvName} + """ + jobName = getJobName(dbName, mvName); + log.info(jobName) + waitingMTMVTaskFinished(jobName) + order_qt_varchar_list "SELECT * FROM ${mvName} order by k1,k2" + + + // list partition int type + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE TABLE `${tableName}` ( + `k1` LARGEINT NOT NULL COMMENT '\"用户id\"', + `k2` int NOT NULL COMMENT '\"数据灌入日期时间\"' + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY list(`k2`) + ( + 
PARTITION p_20380101 VALUES IN ("20380101"), + PARTITION p_20200101 VALUES IN ("20200101") + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,20380101),(2,20200101); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`k2`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1', + 'partition_sync_limit'='2', + 'partition_sync_time_unit'='DAY', + 'partition_date_format'='%Y%m%d' + ) + AS + SELECT * FROM ${tableName}; + """ + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(1, showPartitionsResult.size()) + assertTrue(showPartitionsResult.toString().contains("p_20380101")) + + sql """ + REFRESH MATERIALIZED VIEW ${mvName} + """ + jobName = getJobName(dbName, mvName); + log.info(jobName) + waitingMTMVTaskFinished(jobName) + order_qt_varchar_list "SELECT * FROM ${mvName} order by k1,k2" + + + // range partition date type + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE TABLE `${tableName}` ( + `k1` LARGEINT NOT NULL COMMENT '\"用户id\"', + `k2` DATE NOT NULL COMMENT '\"数据灌入日期时间\"' + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY range(`k2`) + ( + PARTITION p2038 VALUES [("2038-01-01"),("2038-01-03")), + PARTITION p2020 VALUES [("2020-01-01"),("2020-01-03")) + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,"2038-01-02"),(2,"2020-01-02"); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`k2`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1', + 'partition_sync_limit'='2', + 'partition_sync_time_unit'='YEAR' + ) + AS + SELECT * FROM ${tableName}; + 
""" + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(1, showPartitionsResult.size()) + assertTrue(showPartitionsResult.toString().contains("p_20380101_20380103")) + + sql """ + REFRESH MATERIALIZED VIEW ${mvName} + """ + jobName = getJobName(dbName, mvName); + log.info(jobName) + waitingMTMVTaskFinished(jobName) + order_qt_date_range "SELECT * FROM ${mvName} order by k1,k2" + + + // alter + sql """ + alter Materialized View ${mvName} set("partition_sync_limit"=""); + """ + sql """ + REFRESH MATERIALIZED VIEW ${mvName} + """ + jobName = getJobName(dbName, mvName); + log.info(jobName) + waitingMTMVTaskFinished(jobName) + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(2, showPartitionsResult.size()) + assertTrue(showPartitionsResult.toString().contains("p_20380101_20380103")) + assertTrue(showPartitionsResult.toString().contains("p_20200101_20200103")) + order_qt_date_range_all "SELECT * FROM ${mvName} order by k1,k2" +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
