This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 33dd8221ae3 [enhance](mtmv) MTMV Use partial partition of base table
(#31632)
33dd8221ae3 is described below
commit 33dd8221ae3eab6cb21e70a6814b56d81ee0cd06
Author: zhangdong <[email protected]>
AuthorDate: Thu Mar 7 12:28:24 2024 +0800
[enhance](mtmv) MTMV Use partial partition of base table (#31632)
MTMV adds 3 properties:
partition_sync_limit: an integer
partition_sync_time_unit: DAY/MONTH/YEAR
partition_date_format: like "%Y-%m-%d"/"%Y%m%d"
For example, if the current time is 2020-02-03 20:10:10:
- If partition_sync_limit is set to 1 and partition_sync_time_unit is set
to DAY, only partitions with a time greater than or equal to 2020-02-03
00:00:00 will be synchronized to the MTMV
- If partition_sync_limit is set to 1 and partition_sync_time_unit is set
to MONTH, only partitions with a time greater than or equal to 2020-02-01
00:00:00 will be synchronized to the MTMV
- If partition_sync_limit is set to 1 and partition_sync_time_unit is set
to YEAR, only partitions with a time greater than or equal to 2020-01-01
00:00:00 will be synchronized to the MTMV
- If partition_sync_limit is set to 3 and partition_sync_time_unit is set
to MONTH, only partitions with a time greater than or equal to 2019-12-01
00:00:00 will be synchronized to the MTMV
- If partition_sync_limit is set to 4 and partition_sync_time_unit is set
to DAY, only partitions with a time greater than or equal to 2020-01-31
00:00:00 will be synchronized to the MTMV
---
.../apache/doris/catalog/ListPartitionItem.java | 21 ++
.../main/java/org/apache/doris/catalog/MTMV.java | 11 +-
.../org/apache/doris/catalog/PartitionItem.java | 14 ++
.../apache/doris/catalog/RangePartitionItem.java | 18 ++
.../apache/doris/common/util/PropertyAnalyzer.java | 3 +
.../apache/doris/mtmv/MTMVPartitionSyncConfig.java | 57 +++++
.../doris/mtmv/MTMVPartitionSyncTimeUnit.java | 36 ++++
.../org/apache/doris/mtmv/MTMVPartitionUtil.java | 11 +-
.../org/apache/doris/mtmv/MTMVPropertyUtil.java | 138 ++++++++++++
.../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 31 ++-
.../main/java/org/apache/doris/mtmv/MTMVUtil.java | 127 ++++++++++-
.../plans/commands/info/AlterMTMVPropertyInfo.java | 39 +---
.../trees/plans/commands/info/CreateMTMVInfo.java | 49 +----
.../java/org/apache/doris/mtmv/MTMVUtilTest.java | 113 ++++++++++
.../mtmv_p0/test_hive_limit_partition_mtmv.out | 11 +
.../data/mtmv_p0/test_limit_partition_mtmv.out | 17 ++
.../mtmv_p0/test_hive_limit_partition_mtmv.groovy | 162 ++++++++++++++
.../mtmv_p0/test_limit_partition_mtmv.groovy | 240 +++++++++++++++++++++
18 files changed, 1010 insertions(+), 88 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
index d0f06dd7e55..1a4d188a0ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
@@ -20,6 +20,7 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.mtmv.MTMVUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -29,6 +30,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -99,6 +101,25 @@ public class ListPartitionItem extends PartitionItem {
return PartitionKeyDesc.createIn(Lists.newArrayList(res));
}
+ @Override // List partition: kept if any key's value at pos is >= nowTruncSubSec; a default partition is never kept
+ public boolean isGreaterThanSpecifiedTime(int pos, Optional<String>
dateFormatOptional, long nowTruncSubSec)
+ throws AnalysisException {
+ for (PartitionKey partitionKey : partitionKeys) {
+ if (partitionKey.getKeys().size() <= pos) { // pos is beyond the columns of this key
+ throw new AnalysisException( // NOTE(review): message below names toPartitionKeyDesc, not this method - confirm
+ String.format("toPartitionKeyDesc IndexOutOfBounds,
partitionKey: %s, pos: %d",
+ partitionKey.toString(),
+ pos));
+ }
+ if (!isDefaultPartition() &&
MTMVUtil.getExprTimeSec(partitionKey.getKeys().get(pos), dateFormatOptional)
+ >= nowTruncSubSec) {
+ // As long as one of the partitionKeys meets the requirements,
this partition needs to be retained
+ return true;
+ }
+ }
+ return false; // no key reached the cutoff, or this is the default partition
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(partitionKeys.size());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
index 05622bfc0e7..80feb8f12ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
@@ -38,6 +38,7 @@ import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRefreshSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
+import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
@@ -217,7 +218,7 @@ public class MTMV extends OlapTable {
public long getGracePeriod() {
readMvLock();
try {
- if
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
+ if
(!StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)))
{
return
Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) *
1000;
} else {
return 0L;
@@ -243,7 +244,7 @@ public class MTMV extends OlapTable {
public int getRefreshPartitionNum() {
readMvLock();
try {
- if
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
+ if
(!StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)))
{
int value =
Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM :
value;
} else {
@@ -257,7 +258,7 @@ public class MTMV extends OlapTable {
public Set<String> getExcludedTriggerTables() {
readMvLock();
try {
- if
(!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES))
{
+ if
(StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)))
{
return Sets.newHashSet();
}
String[] split =
mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(",");
@@ -326,8 +327,10 @@ public class MTMV extends OlapTable {
return Maps.newHashMap();
}
Map<PartitionKeyDesc, Set<Long>> res = new HashMap<>();
- Map<Long, PartitionItem> relatedPartitionItems =
mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems();
int relatedColPos = mvPartitionInfo.getRelatedColPos();
+ Map<Long, PartitionItem> relatedPartitionItems =
mvPartitionInfo.getRelatedTable()
+ .getPartitionItemsByTimeFilter(relatedColPos,
+
MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties));
for (Entry<Long, PartitionItem> entry :
relatedPartitionItems.entrySet()) {
PartitionKeyDesc partitionKeyDesc =
entry.getValue().toPartitionKeyDesc(relatedColPos);
if (res.containsKey(partitionKeyDesc)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
index 3c29aa2f48b..af1bbc9d0e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.io.Writable;
import java.util.Comparator;
import java.util.Map;
+import java.util.Optional;
public abstract class PartitionItem implements Comparable<PartitionItem>,
Writable {
public static final Comparator<Map.Entry<Long, PartitionItem>>
ITEM_MAP_ENTRY_COMPARATOR =
@@ -46,4 +47,17 @@ public abstract class PartitionItem implements
Comparable<PartitionItem>, Writab
* @throws AnalysisException
*/
public abstract PartitionKeyDesc toPartitionKeyDesc(int pos) throws
AnalysisException;
+
+ /**
+ * Check if the partition meets the time requirements
+ * (implementations in this change keep a partition whose value at pos is greater than or equal to nowTruncSubSec)
+ * @param pos The position of the partition column to be checked in all
partition columns
+ * @param dateFormatOptional Date format used to convert values that are not date literals; may be empty
+ * @param nowTruncSubSec The cutoff time to compare against (see MTMVUtil.getNowTruncSubSec)
+ * @return true if the partition meets the time requirements and should be kept
+ * @throws AnalysisException if pos is out of range for the partition key or a value cannot be converted
+ */
+ public abstract boolean isGreaterThanSpecifiedTime(int pos,
Optional<String> dateFormatOptional,
+ long nowTruncSubSec)
+ throws AnalysisException;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index 7a9162c874c..75870de63f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -18,12 +18,15 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.PartitionKeyDesc;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.RangeUtils;
+import org.apache.doris.mtmv.MTMVUtil;
import com.google.common.collect.Range;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Optional;
public class RangePartitionItem extends PartitionItem {
private Range<PartitionKey> partitionKeyRange;
@@ -60,6 +63,21 @@ public class RangePartitionItem extends PartitionItem {
return toPartitionKeyDesc();
}
+ @Override // Range partition: decided solely by the range's upper endpoint; a default partition is never kept
+ public boolean isGreaterThanSpecifiedTime(int pos, Optional<String>
dateFormatOptional, long nowTruncSubSec)
+ throws AnalysisException {
+ PartitionKey partitionKey = partitionKeyRange.upperEndpoint(); // only the upper bound is inspected
+ if (partitionKey.getKeys().size() <= pos) { // pos is beyond the columns of this key
+ throw new AnalysisException( // NOTE(review): message below names toPartitionKeyDesc, not this method - confirm
+ String.format("toPartitionKeyDesc IndexOutOfBounds,
partitionKey: %s, pos: %d",
+ partitionKey.toString(),
+ pos));
+ }
+ // If the upper limit of the partition range meets the requirements,
this partition needs to be retained
+ return !isDefaultPartition() &&
MTMVUtil.getExprTimeSec(partitionKey.getKeys().get(pos), dateFormatOptional)
+ >= nowTruncSubSec; // NOTE(review): if the upper endpoint is exclusive, a range ending exactly at the cutoff is still kept - confirm
+ }
+
@Override
public void write(DataOutput out) throws IOException {
RangeUtils.writeRange(out, partitionKeyRange);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 7a47d8ca072..b2a47a45e7e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -172,6 +172,9 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES =
"excluded_trigger_tables";
public static final String PROPERTIES_REFRESH_PARTITION_NUM =
"refresh_partition_num";
public static final String PROPERTIES_WORKLOAD_GROUP = "workload_group";
+ public static final String PROPERTIES_PARTITION_SYNC_LIMIT =
"partition_sync_limit";
+ public static final String PROPERTIES_PARTITION_TIME_UNIT =
"partition_sync_time_unit";
+ public static final String PROPERTIES_PARTITION_DATE_FORMAT =
"partition_date_format";
// For unique key data model, the feature Merge-on-Write will leverage a
primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load
stage,
// which can avoid the merging cost in read stage, and accelerate the
aggregation
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncConfig.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncConfig.java
new file mode 100644
index 00000000000..b85978e4f46
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncConfig.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import java.util.Optional;
+
+public class MTMVPartitionSyncConfig { // Holder for the three partition-sync settings of an MTMV
+ private int syncLimit; // how many time units to keep; getPartitionItemsByTimeFilter treats <= 0 as "no filtering"
+ private MTMVPartitionSyncTimeUnit timeUnit; // unit (YEAR/MONTH/DAY) that syncLimit is counted in
+ private Optional<String> dateFormat; // format for partition values that are not date literals; may be empty
+
+ public MTMVPartitionSyncConfig(int syncLimit, MTMVPartitionSyncTimeUnit
timeUnit,
+ Optional<String> dateFormat) {
+ this.syncLimit = syncLimit;
+ this.timeUnit = timeUnit;
+ this.dateFormat = dateFormat;
+ }
+
+ public int getSyncLimit() {
+ return syncLimit;
+ }
+
+ public void setSyncLimit(int syncLimit) {
+ this.syncLimit = syncLimit;
+ }
+
+ public MTMVPartitionSyncTimeUnit getTimeUnit() {
+ return timeUnit;
+ }
+
+ public void setTimeUnit(MTMVPartitionSyncTimeUnit timeUnit) {
+ this.timeUnit = timeUnit;
+ }
+
+ public Optional<String> getDateFormat() {
+ return dateFormat;
+ }
+
+ public void setDateFormat(Optional<String> dateFormat) {
+ this.dateFormat = dateFormat;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncTimeUnit.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncTimeUnit.java
new file mode 100644
index 00000000000..0d508b9bb22
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionSyncTimeUnit.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import java.util.Optional;
+
+public enum MTMVPartitionSyncTimeUnit { // Time units in which the MTMV partition sync limit can be expressed
+ YEAR,
+ MONTH,
+ DAY;
+
+ public static Optional<MTMVPartitionSyncTimeUnit> fromString(String unit) { // case-insensitive lookup by constant name
+ for (MTMVPartitionSyncTimeUnit u : MTMVPartitionSyncTimeUnit.values())
{
+ if (u.name().equalsIgnoreCase(unit)) { // equalsIgnoreCase(null) is false, so a null unit matches nothing
+ return Optional.of(u);
+ }
+ }
+ return Optional.empty(); // no constant has this name
+ }
+}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index 9c85725b5bb..b80c5fc283f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -122,10 +122,12 @@ public class MTMVPartitionUtil {
* @throws AnalysisException
*/
public static List<AllPartitionDesc>
getPartitionDescsByRelatedTable(MTMVRelatedTableIf relatedTable,
- Map<String, String> tableProperties, String relatedCol) throws
AnalysisException {
+ Map<String, String> tableProperties, String relatedCol,
Map<String, String> mvProperties)
+ throws AnalysisException {
HashMap<String, String> partitionProperties = Maps.newHashMap();
List<AllPartitionDesc> res = Lists.newArrayList();
- Set<PartitionKeyDesc> relatedPartitionDescs =
getRelatedPartitionDescs(relatedTable, relatedCol);
+ Set<PartitionKeyDesc> relatedPartitionDescs =
getRelatedPartitionDescs(relatedTable, relatedCol,
+
MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties));
for (PartitionKeyDesc partitionKeyDesc : relatedPartitionDescs) {
SinglePartitionDesc singlePartitionDesc = new
SinglePartitionDesc(true,
generatePartitionName(partitionKeyDesc),
@@ -137,11 +139,12 @@ public class MTMVPartitionUtil {
return res;
}
- private static Set<PartitionKeyDesc>
getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol)
+ private static Set<PartitionKeyDesc>
getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol,
+ MTMVPartitionSyncConfig config)
throws AnalysisException {
int pos = getPos(relatedTable, relatedCol);
Set<PartitionKeyDesc> res = Sets.newHashSet();
- for (Entry<Long, PartitionItem> entry :
relatedTable.getAndCopyPartitionItems().entrySet()) {
+ for (Entry<Long, PartitionItem> entry :
relatedTable.getPartitionItemsByTimeFilter(pos, config).entrySet()) {
PartitionKeyDesc partitionKeyDesc =
entry.getValue().toPartitionKeyDesc(pos);
res.add(partitionKeyDesc);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPropertyUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPropertyUtil.java
new file mode 100644
index 00000000000..a9df9b87d72
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPropertyUtil.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Optional;
+import java.util.Set;
+
+public class MTMVPropertyUtil { // Validation helpers for MTMV-specific properties
+ public static final Set<String> mvPropertyKeys = Sets.newHashSet( // the same seven keys that analyzeProperty accepts
+ PropertyAnalyzer.PROPERTIES_GRACE_PERIOD,
+ PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES,
+ PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM,
+ PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP,
+ PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT,
+ PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT,
+ PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT
+ );
+
+ public static void analyzeProperty(String key, String value) { // dispatches to the validator for key; throws AnalysisException on a bad value
+ switch (key) {
+ case PropertyAnalyzer.PROPERTIES_GRACE_PERIOD:
+ analyzeGracePeriod(value);
+ break;
+ case PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM:
+ analyzeRefreshPartitionNum(value);
+ break;
+ case PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES:
+ analyzeExcludedTriggerTables(value);
+ break;
+ case PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP:
+ analyzeWorkloadGroup(value);
+ break;
+ case PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT:
+ analyzePartitionTimeUnit(value);
+ break;
+ case PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT:
+ analyzePartitionDateFormat(value);
+ break;
+ case PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT:
+ analyzePartitionSyncLimit(value);
+ break;
+ default: // any key outside the seven above is rejected
+ throw new AnalysisException("illegal key:" + key);
+
+ }
+ }
+
+ private static void analyzePartitionSyncLimit(String value) { // must parse as an int unless empty
+ if (StringUtils.isEmpty(value)) {
+ return; // an empty value is accepted without further checks
+ }
+ try {
+ Integer.parseInt(value); // only parseability is checked; zero and negative numbers pass
+ } catch (NumberFormatException e) { // NOTE(review): message below reads "valid ..." although it reports an invalid value - confirm wording
+ throw new AnalysisException("valid partition_sync_limit: " +
value);
+ }
+ }
+
+ private static void analyzePartitionDateFormat(String value) {
+ // do nothing: no validation is performed on the format string here
+ }
+
+ private static void analyzePartitionTimeUnit(String value) { // must name a MTMVPartitionSyncTimeUnit constant unless empty
+ if (StringUtils.isEmpty(value)) {
+ return; // an empty value is accepted without further checks
+ }
+ Optional<MTMVPartitionSyncTimeUnit> mtmvPartitionSyncTimeUnit =
MTMVPartitionSyncTimeUnit
+ .fromString(value);
+ if (!mtmvPartitionSyncTimeUnit.isPresent()) { // NOTE(review): message below reads "valid ..." for an invalid value - confirm wording
+ throw new AnalysisException("valid partition_sync_time_unit: " +
value);
+ }
+ }
+
+ private static void analyzeWorkloadGroup(String value) { // checks the workload-group privilege of the current ConnectContext
+ if (StringUtils.isEmpty(value)) {
+ return; // an empty value is accepted without a privilege check
+ } // NOTE(review): the isEmpty re-check in the next condition is redundant after this guard
+ if (!StringUtils.isEmpty(value) &&
!Env.getCurrentEnv().getAccessManager()
+ .checkWorkloadGroupPriv(ConnectContext.get(), value,
PrivPredicate.USAGE)) {
+ String message = String
+ .format("Access denied; you need (at least one of) "
+ + "the %s privilege(s) to use workload
group '%s'.",
+ "USAGE/ADMIN", value);
+ throw new AnalysisException(message);
+ }
+ }
+
+ private static void analyzeExcludedTriggerTables(String value) {
+ // do nothing: no validation is performed on the table list here
+ }
+
+ private static void analyzeGracePeriod(String value) { // must parse as a long unless empty
+ if (StringUtils.isEmpty(value)) {
+ return; // an empty value is accepted without further checks
+ }
+ try {
+ Long.parseLong(value);
+ } catch (NumberFormatException e) { // NOTE(review): message below reads "valid ..." for an invalid value - confirm wording
+ throw new AnalysisException("valid grace_period: " + value);
+ }
+ }
+
+ private static void analyzeRefreshPartitionNum(String value) { // must parse as an int unless empty
+ if (StringUtils.isEmpty(value)) {
+ return; // an empty value is accepted without further checks
+ }
+ try {
+ Integer.parseInt(value);
+ } catch (NumberFormatException e) { // NOTE(review): message below reads "valid ..." for an invalid value - confirm wording
+ throw new AnalysisException("valid refresh_partition_num: " +
value);
+ }
+ }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
index 5ec7e98a407..1adfe315a8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
@@ -24,8 +24,12 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
+import com.google.common.collect.Maps;
+
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
/**
@@ -40,6 +44,31 @@ public interface MTMVRelatedTableIf extends TableIf {
*/
Map<Long, PartitionItem> getAndCopyPartitionItems();
+ /**
+ * Obtain a list of partitions filtered by time
+ * (the result is a copy-based map; partitions failing isGreaterThanSpecifiedTime are left out)
+ * @param pos The position of the partition column to be checked in all
partition columns
+ * @param config sync limit, time unit and optional date format; a sync limit of 0 or less disables filtering
+ * @return all partition items when filtering is disabled, otherwise only the items that pass the time check
+ * @throws AnalysisException if the cutoff cannot be computed or a partition value cannot be checked
+ */
+ default Map<Long, PartitionItem> getPartitionItemsByTimeFilter(int pos,
MTMVPartitionSyncConfig config)
+ throws AnalysisException {
+ Map<Long, PartitionItem> partitionItems = getAndCopyPartitionItems();
+ if (config.getSyncLimit() <= 0) { // no limit configured: return every partition unfiltered
+ return partitionItems;
+ }
+ long nowTruncSubSec = MTMVUtil.getNowTruncSubSec(config.getTimeUnit(),
config.getSyncLimit());
+ Optional<String> dateFormat = config.getDateFormat();
+ Map<Long, PartitionItem> res = Maps.newHashMap();
+ for (Entry<Long, PartitionItem> entry : partitionItems.entrySet()) {
+ if (entry.getValue().isGreaterThanSpecifiedTime(pos, dateFormat,
nowTruncSubSec)) {
+ res.put(entry.getKey(), entry.getValue()); // keep only partitions at or after the cutoff
+ }
+ }
+ return res;
+ }
+
/**
* getPartitionType LIST/RANGE/UNPARTITIONED
*
@@ -92,7 +121,7 @@ public interface MTMVRelatedTableIf extends TableIf {
* Does the current type of table allow timed triggering
*
* @return If return false,The method of comparing whether to synchronize
will directly return true,
- * otherwise the snapshot information will be compared
+ * otherwise the snapshot information will be compared
*/
boolean needAutoRefresh();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
index 3b97e35141d..16459fa1303 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
@@ -25,8 +25,21 @@ import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import
org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire;
+import
org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeArithmetic;
+import
org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform;
+import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
import java.util.Set;
public class MTMVUtil {
@@ -52,7 +65,7 @@ public class MTMVUtil {
}
/**
- * if base tables of mtmv contains external table
+ * if base tables of mtmv contains external table
*
* @param mtmv
* @return
@@ -66,4 +79,116 @@ public class MTMVUtil {
}
return false;
}
+
+ /**
+ * Obtain the minimum second from `syncLimit` `timeUnit` ago
+ * (now() is truncated to timeUnit, then moved back by syncLimit - 1 units)
+ * @param timeUnit unit used both for truncating now() and for the subtraction
+ * @param syncLimit number of units to keep, counting the current one; must be at least 1
+ * @return unix timestamp of the resulting cutoff
+ * @throws AnalysisException if syncLimit is less than 1 or a date helper returns an unexpected literal type
+ */
+ public static long getNowTruncSubSec(MTMVPartitionSyncTimeUnit timeUnit,
int syncLimit)
+ throws AnalysisException {
+ if (syncLimit < 1) {
+ throw new AnalysisException("Unexpected syncLimit, syncLimit: " +
syncLimit);
+ }
+ // get current time
+ Expression now = DateTimeAcquire.now();
+ if (!(now instanceof DateTimeLiteral)) {
+ throw new AnalysisException("now() should return DateTimeLiteral,
now: " + now);
+ }
+ DateTimeLiteral nowLiteral = (DateTimeLiteral) now;
+ // date trunc
+ now = DateTimeExtractAndTransform
+ .dateTrunc(nowLiteral, new VarcharLiteral(timeUnit.name()));
+ if (!(now instanceof DateTimeLiteral)) {
+ throw new AnalysisException("dateTrunc() should return
DateTimeLiteral, now: " + now);
+ }
+ nowLiteral = (DateTimeLiteral) now;
+ // date sub
+ if (syncLimit > 1) { // the truncated current unit already counts as one, so only syncLimit - 1 are subtracted
+ nowLiteral = dateSub(nowLiteral, timeUnit, syncLimit - 1);
+ }
+ return ((IntegerLiteral)
DateTimeExtractAndTransform.unixTimestamp(nowLiteral)).getValue();
+ }
+
+ private static DateTimeLiteral dateSub( // subtracts num units of timeUnit from date
+ org.apache.doris.nereids.trees.expressions.literal.DateLiteral
date, MTMVPartitionSyncTimeUnit timeUnit,
+ int num)
+ throws AnalysisException {
+ IntegerLiteral integerLiteral = new IntegerLiteral(num);
+ Expression result;
+ switch (timeUnit) { // pick the arithmetic helper that matches the unit
+ case DAY:
+ result = DateTimeArithmetic.dateSub(date, integerLiteral);
+ break;
+ case YEAR:
+ result = DateTimeArithmetic.yearsSub(date, integerLiteral);
+ break;
+ case MONTH:
+ result = DateTimeArithmetic.monthsSub(date, integerLiteral);
+ break;
+ default: // reached only if a unit other than DAY/YEAR/MONTH is passed
+ throw new AnalysisException("MTMV partition limit not support
timeUnit: " + timeUnit.name());
+ }
+ if (!(result instanceof DateTimeLiteral)) { // the declared return type is DateTimeLiteral; anything else is reported as an error
+ throw new AnalysisException("sub() should return DateTimeLiteral,
result: " + result);
+ }
+ return (DateTimeLiteral) result;
+ }
+
+ /**
+ * Convert LiteralExpr to second
+ * (MaxLiteral maps to Long.MAX_VALUE, NullLiteral to Long.MIN_VALUE; other non-date values are parsed with the format)
+ * @param expr partition value to convert
+ * @param dateFormatOptional format for parsing values that are not DateLiteral; required for such values
+ * @return the time of expr; presumably epoch seconds, since the DateLiteral branch divides by 1000 - TODO confirm
+ * @throws AnalysisException if a non-date value has no format or cannot be parsed with it
+ */
+ public static long getExprTimeSec(org.apache.doris.analysis.LiteralExpr
expr, Optional<String> dateFormatOptional)
+ throws AnalysisException {
+ if (expr instanceof org.apache.doris.analysis.MaxLiteral) {
+ return Long.MAX_VALUE; // compares as later than any cutoff
+ }
+ if (expr instanceof org.apache.doris.analysis.NullLiteral) {
+ return Long.MIN_VALUE; // compares as earlier than any cutoff
+ }
+ if (expr instanceof org.apache.doris.analysis.DateLiteral) {
+ return ((org.apache.doris.analysis.DateLiteral)
expr).unixTimestamp(TimeUtils.getTimeZone()) / 1000;
+ }
+ if (!dateFormatOptional.isPresent()) { // a non-date literal can only be converted with an explicit format
+ throw new AnalysisException("expr is not DateLiteral and
DateFormat is not present.");
+ }
+ String dateFormat = dateFormatOptional.get();
+ Expression strToDate = DateTimeExtractAndTransform
+ .strToDate(new VarcharLiteral(expr.getStringValue()), new
VarcharLiteral(dateFormat));
+ if (!(strToDate instanceof DateTimeLiteral)) { // any other result type is treated as a failed parse
+ throw new AnalysisException(
+ String.format("strToDate failed, stringValue: %s,
dateFormat: %s", expr.getStringValue(),
+ dateFormat));
+ }
+ return ((IntegerLiteral)
DateTimeExtractAndTransform.unixTimestamp((DateTimeLiteral)
strToDate)).getValue();
+ }
+
+ /**
+ * Generate MTMVPartitionSyncConfig based on mvProperties
+ * (missing or empty values fall back to syncLimit -1, time unit DAY and no date format)
+ * @param mvProperties MTMV properties; read for the sync limit, time unit and date format keys
+ * @return config built from the three properties. NOTE(review): the unit is parsed with valueOf, which throws on unknown names - confirm values are validated first
+ */
+ public static MTMVPartitionSyncConfig
generateMTMVPartitionSyncConfigByProperties(
+ Map<String, String> mvProperties) {
+ int syncLimit =
StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT))
? -1
+ :
Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT));
+ MTMVPartitionSyncTimeUnit timeUnit =
+
StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT))
+ ? MTMVPartitionSyncTimeUnit.DAY :
MTMVPartitionSyncTimeUnit
+
.valueOf(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT).toUpperCase());
+ Optional<String> dateFormat =
+
StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT))
+ ? Optional.empty()
+ :
Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT));
+ return new MTMVPartitionSyncConfig(syncLimit, timeUnit, dateFormat);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java
index b152aadc98d..1e0174bc1f0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java
@@ -19,13 +19,10 @@ package org.apache.doris.nereids.trees.plans.commands.info;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.PropertyAnalyzer;
-import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.mtmv.MTMVPropertyUtil;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
-import org.apache.commons.lang3.StringUtils;
-
import java.util.Map;
import java.util.Objects;
@@ -55,40 +52,8 @@ public class AlterMTMVPropertyInfo extends AlterMTMVInfo {
private void analyzeProperties() {
for (String key : properties.keySet()) {
- if (PropertyAnalyzer.PROPERTIES_GRACE_PERIOD.equals(key)) {
- String gracePeriod =
properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
- try {
- Long.parseLong(gracePeriod);
- } catch (NumberFormatException e) {
- throw new
org.apache.doris.nereids.exceptions.AnalysisException(
- "valid grace_period: " +
properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD));
- }
- } else if
(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM.equals(key)) {
- String refreshPartitionNum =
properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
- try {
- Integer.parseInt(refreshPartitionNum);
- } catch (NumberFormatException e) {
- throw new AnalysisException(
- "valid refresh_partition_num: " + properties
-
.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
- }
- } else if
(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES.equals(key)) {
- // nothing
- } else if (PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP.equals(key))
{
- String workloadGroup =
properties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP);
- if (!StringUtils.isEmpty(workloadGroup) &&
!Env.getCurrentEnv().getAccessManager()
- .checkWorkloadGroupPriv(ConnectContext.get(),
workloadGroup, PrivPredicate.USAGE)) {
- String message = String
- .format("Access denied; you need (at least one of)
"
- + "the %s privilege(s) to use
workload group '%s'.",
- "USAGE/ADMIN", workloadGroup);
- throw new AnalysisException(message);
- }
- } else {
- throw new
org.apache.doris.nereids.exceptions.AnalysisException("illegal key:" + key);
- }
+ MTMVPropertyUtil.analyzeProperty(key, properties.get(key));
}
-
}
public Map<String, String> getProperties() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
index 90c98a44294..d7c502a9eb8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
@@ -34,13 +34,13 @@ import org.apache.doris.catalog.View;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeNameFormat;
-import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
+import org.apache.doris.mtmv.MTMVPropertyUtil;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVRelation;
@@ -177,46 +177,12 @@ public class CreateMTMVInfo {
}
private void analyzeProperties() {
- if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
- String gracePeriod =
properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
- try {
- Long.parseLong(gracePeriod);
- } catch (NumberFormatException e) {
- throw new AnalysisException(
- "valid grace_period: " +
properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD));
- }
- mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD,
gracePeriod);
- properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
- }
- if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
- String refreshPartitionNum =
properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
- try {
- Integer.parseInt(refreshPartitionNum);
- } catch (NumberFormatException e) {
- throw new AnalysisException(
- "valid refresh_partition_num: " + properties
-
.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
- }
-
mvProperties.put(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM,
refreshPartitionNum);
-
properties.remove(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
- }
- if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
- String excludedTriggerTables =
properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
-
mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES,
excludedTriggerTables);
-
properties.remove(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
- }
- if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP)) {
- String workloadGroup =
properties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP);
- if (!Env.getCurrentEnv().getAccessManager()
- .checkWorkloadGroupPriv(ConnectContext.get(),
workloadGroup, PrivPredicate.USAGE)) {
- String message = String
- .format("Access denied;"
- + " you need (at least one of) the %s
privilege(s) to use workload group '%s'.",
- "USAGE/ADMIN", workloadGroup);
- throw new AnalysisException(message);
+ for (String key : MTMVPropertyUtil.mvPropertyKeys) {
+ if (properties.containsKey(key)) {
+ MTMVPropertyUtil.analyzeProperty(key, properties.get(key));
+ mvProperties.put(key, properties.get(key));
+ properties.remove(key);
}
- mvProperties.put(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP,
workloadGroup);
- properties.remove(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP);
}
}
@@ -324,7 +290,8 @@ public class CreateMTMVInfo {
List<AllPartitionDesc> allPartitionDescs = null;
try {
allPartitionDescs = MTMVPartitionUtil
- .getPartitionDescsByRelatedTable(relatedTable, properties,
mvPartitionInfo.getRelatedCol());
+ .getPartitionDescsByRelatedTable(relatedTable, properties,
mvPartitionInfo.getRelatedCol(),
+ mvProperties);
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException("getPartitionDescsByRelatedTable
failed", e);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java
new file mode 100644
index 00000000000..2e6df56cd1a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import
org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire;
+import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
+
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Optional;
+
+public class MTMVUtilTest { // Unit tests for MTMVUtil: sync-config parsing from MV properties, literal-to-epoch conversion, truncated "now" computation.
+ @Mocked
+ private DateTimeAcquire dateTimeAcquire; // mocked so the Expectations block in testGetNowTruncSubSec can fix the value returned by now()
+
+ @Test
+ public void testGenerateMTMVPartitionSyncConfigByProperties() throws
AnalysisException { // properties are added to the same map one at a time and the config is re-derived after each addition
+ Map<String, String> mvProperties = Maps.newHashMap(); // step 1: empty properties -> expects sync limit -1, absent date format, DAY unit
+ MTMVPartitionSyncConfig config = MTMVUtil
+ .generateMTMVPartitionSyncConfigByProperties(mvProperties);
+ Assert.assertEquals(-1, config.getSyncLimit());
+ Assert.assertFalse(config.getDateFormat().isPresent());
+ Assert.assertEquals(MTMVPartitionSyncTimeUnit.DAY,
config.getTimeUnit());
+
+ mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT,
"1"); // step 2: only the limit is set -> limit becomes 1, the other two values stay as in step 1
+ config =
MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties);
+ Assert.assertEquals(1, config.getSyncLimit());
+ Assert.assertFalse(config.getDateFormat().isPresent());
+ Assert.assertEquals(MTMVPartitionSyncTimeUnit.DAY,
config.getTimeUnit());
+
+ mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT,
"month"); // step 3: lower-case "month" is expected to resolve to MTMVPartitionSyncTimeUnit.MONTH
+ config =
MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties);
+ Assert.assertEquals(1, config.getSyncLimit());
+ Assert.assertFalse(config.getDateFormat().isPresent());
+ Assert.assertEquals(MTMVPartitionSyncTimeUnit.MONTH,
config.getTimeUnit());
+
+ mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT,
"%Y%m%d"); // step 4: the date format is expected to be returned unchanged inside the Optional
+ config =
MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties);
+ Assert.assertEquals(1, config.getSyncLimit());
+ Assert.assertEquals("%Y%m%d", config.getDateFormat().get());
+ Assert.assertEquals(MTMVPartitionSyncTimeUnit.MONTH,
config.getTimeUnit());
+ }
+
+ @Test
+ public void testGetExprTimeSec() throws AnalysisException { // 1577808000 is 2020-01-01 00:00:00 at UTC+8; NOTE(review): the assertions presumably rely on the time zone the test runs with - confirm
+ LiteralExpr expr = new DateLiteral("2020-01-01"); // date literal, no format supplied
+ long exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.empty());
+ Assert.assertEquals(1577808000L, exprTimeSec);
+ expr = new StringLiteral("2020-01-01"); // string literal, interpreted with the supplied "%Y-%m-%d" format
+ exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.of("%Y-%m-%d"));
+ Assert.assertEquals(1577808000L, exprTimeSec);
+ expr = new IntLiteral(20200101); // int literal, interpreted with the supplied "%Y%m%d" format
+ exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.of("%Y%m%d"));
+ Assert.assertEquals(1577808000L, exprTimeSec);
+ expr = new DateLiteral(Type.DATE, true); // presumably the maximum DATE literal - TODO confirm; 253402185600 is 9999-12-31 00:00:00 at UTC+8
+ exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.empty());
+ Assert.assertEquals(253402185600L, exprTimeSec);
+ }
+
+ @Test
+ public void testGetNowTruncSubSec() throws AnalysisException { // "now" is pinned to 2020-02-03 20:10:10; each expected value is the commented date at 00:00:00 UTC+8, in epoch seconds
+ DateTimeLiteral dateTimeLiteral = new DateTimeLiteral("2020-02-03
20:10:10");
+ new Expectations() {
+ {
+ dateTimeAcquire.now();
+ minTimes = 0;
+ result = dateTimeLiteral;
+ }
+ };
+ long nowTruncSubSec =
MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.DAY, 1);
+ // DAY, limit 1 -> 2020-02-03 (start of the current day)
+ Assert.assertEquals(1580659200L, nowTruncSubSec);
+ nowTruncSubSec =
MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.MONTH, 1);
+ // MONTH, limit 1 -> 2020-02-01 (start of the current month)
+ Assert.assertEquals(1580486400L, nowTruncSubSec);
+ nowTruncSubSec =
MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.YEAR, 1);
+ // YEAR, limit 1 -> 2020-01-01 (start of the current year)
+ Assert.assertEquals(1577808000L, nowTruncSubSec);
+ nowTruncSubSec =
MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.MONTH, 3);
+ // MONTH, limit 3 -> 2019-12-01 (start of the month two months before the current one)
+ Assert.assertEquals(1575129600L, nowTruncSubSec);
+ nowTruncSubSec =
MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.DAY, 4);
+ // DAY, limit 4 -> 2020-01-31 (start of the day three days before the current one)
+ Assert.assertEquals(1580400000L, nowTruncSubSec);
+ }
+}
diff --git a/regression-test/data/mtmv_p0/test_hive_limit_partition_mtmv.out
b/regression-test/data/mtmv_p0/test_hive_limit_partition_mtmv.out
new file mode 100644
index 00000000000..3f781e54612
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_hive_limit_partition_mtmv.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_base_table --
+1 bj 20380101
+2 sh 20380101
+3 bj 20200101
+4 sh 20200101
+
+-- !mtmv_complete --
+1 20380101 bj
+2 20380101 sh
+
diff --git a/regression-test/data/mtmv_p0/test_limit_partition_mtmv.out
b/regression-test/data/mtmv_p0/test_limit_partition_mtmv.out
new file mode 100644
index 00000000000..a655b5fdeff
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_limit_partition_mtmv.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !date_list --
+1 2038-01-01
+
+-- !varchar_list --
+1 20380101
+
+-- !varchar_list --
+1 20380101
+
+-- !date_range --
+1 2038-01-02
+
+-- !date_range_all --
+1 2038-01-02
+2 2020-01-02
+
diff --git
a/regression-test/suites/mtmv_p0/test_hive_limit_partition_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_hive_limit_partition_mtmv.groovy
new file mode 100644
index 00000000000..f55df2bd924
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_hive_limit_partition_mtmv.groovy
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_limit_partition_mtmv",
"p0,external,hive,external_docker,external_docker_hive") {
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) { // skip the whole suite unless enableHiveTest is "true"
+ logger.info("diable Hive test.") // NOTE(review): "diable" looks like a typo for "disable"
+ return;
+ }
+ // prepare data in hive: a table partitioned by (region, day) with day stored as a string such as "20380101"; one row is inserted per partition
+ def hive_database = "test_hive_limit_partition_mtmv_db"
+ def hive_table = "partition2"
+
+ def drop_table_str = """ drop table if exists
${hive_database}.${hive_table} """
+ def drop_database_str = """ drop database if exists ${hive_database}"""
+ def create_database_str = """ create database ${hive_database}"""
+ def create_table_str = """ CREATE TABLE ${hive_database}.${hive_table} (
+ `k1` int)
+ PARTITIONED BY (
+ `region` string,
+ `day` string
+ )
+ STORED AS ORC;
+ """
+ def add_partition_str = """
+ alter table ${hive_database}.${hive_table} add
if not exists
+ partition(region="bj",day="20380101")
+ partition(region="sh",day="20380101")
+ partition(region="bj",day="20200101")
+ partition(region="sh",day="20200101")
+ """
+ def insert_str1 = """insert into ${hive_database}.${hive_table}
PARTITION(region="bj",day="20380101") values(1)"""
+ def insert_str2 = """insert into ${hive_database}.${hive_table}
PARTITION(region="sh",day="20380101") values(2)"""
+ def insert_str3 = """insert into ${hive_database}.${hive_table}
PARTITION(region="bj",day="20200101") values(3)"""
+ def insert_str4 = """insert into ${hive_database}.${hive_table}
PARTITION(region="sh",day="20200101") values(4)"""
+
+ logger.info("hive sql: " + drop_table_str)
+ hive_docker """ ${drop_table_str} """
+ logger.info("hive sql: " + drop_database_str)
+ hive_docker """ ${drop_database_str} """
+ logger.info("hive sql: " + create_database_str)
+ hive_docker """ ${create_database_str}"""
+ logger.info("hive sql: " + create_table_str)
+ hive_docker """ ${create_table_str} """
+ logger.info("hive sql: " + add_partition_str)
+ hive_docker """ ${add_partition_str} """
+ logger.info("hive sql: " + insert_str1)
+ hive_docker """ ${insert_str1} """
+ logger.info("hive sql: " + insert_str2)
+ hive_docker """ ${insert_str2} """
+ logger.info("hive sql: " + insert_str3)
+ hive_docker """ ${insert_str3} """
+ logger.info("hive sql: " + insert_str4)
+ hive_docker """ ${insert_str4} """
+
+ // prepare catalog: drop and re-create an hms catalog that points at the external hive metastore
+ String hms_port = context.config.otherConfigs.get("hms_port")
+ String catalog_name = "test_hive_limit_partition_mtmv_catalog"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog if not exists ${catalog_name} properties (
+ "type"="hms",
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+ );"""
+
+ order_qt_select_base_table "SELECT * FROM
${catalog_name}.${hive_database}.${hive_table}"
+
+
+ // string type: partition_date_format '%Y%m%d' is set together with limit 2 MONTH; only the 20380101 partition is expected in the MTMV
+ def mvName = "test_hive_limit_partition_mtmv"
+ def dbName = "regression_test_mtmv_p0"
+ sql """drop materialized view if exists ${mvName};"""
+ sql """REFRESH catalog ${catalog_name}"""
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`day`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES (
+ 'replication_num' = '1',
+ 'partition_sync_limit'='2',
+ 'partition_sync_time_unit'='MONTH',
+ 'partition_date_format'='%Y%m%d'
+ )
+ AS
+ SELECT k1,day,region FROM
${catalog_name}.${hive_database}.${hive_table};
+ """
+ def showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertEquals(1, showPartitionsResult.size())
+ assertTrue(showPartitionsResult.toString().contains("p_20380101"))
+
+ // refresh complete: run a complete refresh, wait for the task to finish, then compare the MTMV contents with the recorded result
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} complete
+ """
+ def jobName = getJobName(dbName, mvName);
+ waitingMTMVTaskFinished(jobName)
+ order_qt_mtmv_complete "SELECT * FROM ${mvName} order by k1,day,region"
+
+
+ // date type: re-create the hive table with `day` as a date column; limit 2 YEAR is used and no date format property is set
+ sql """drop materialized view if exists ${mvName};"""
+ create_table_str = """ CREATE TABLE ${hive_database}.${hive_table} (
+ `k1` int)
+ PARTITIONED BY (
+ `region` string,
+ `day` date
+ )
+ STORED AS ORC;
+ """
+ add_partition_str = """
+ alter table ${hive_database}.${hive_table} add if
not exists
+ partition(region="bj",day="2038-01-01")
+ partition(region="sh",day="2038-01-01")
+ partition(region="bj",day="2020-01-01")
+ partition(region="sh",day="2020-01-01")
+ """
+ logger.info("hive sql: " + drop_table_str)
+ hive_docker """ ${drop_table_str} """
+ logger.info("hive sql: " + create_table_str)
+ hive_docker """ ${create_table_str} """
+ logger.info("hive sql: " + add_partition_str)
+ hive_docker """ ${add_partition_str} """
+
+ sql """REFRESH catalog ${catalog_name}"""
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`day`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES (
+ 'replication_num' = '1',
+ 'partition_sync_limit'='2',
+ 'partition_sync_time_unit'='YEAR'
+ )
+ AS
+ SELECT k1,day,region FROM
${catalog_name}.${hive_database}.${hive_table};
+ """
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertEquals(1, showPartitionsResult.size())
+ assertTrue(showPartitionsResult.toString().contains("p_20380101"))
+ sql """drop materialized view if exists ${mvName};"""
+ sql """drop catalog if exists ${catalog_name}"""
+}
+
diff --git a/regression-test/suites/mtmv_p0/test_limit_partition_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_limit_partition_mtmv.groovy
new file mode 100644
index 00000000000..fd026a6c02e
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_limit_partition_mtmv.groovy
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert;
+
+suite("test_limit_partition_mtmv") {
+ def tableName = "t_test_limit_partition_mtmv_user"
+ def mvName = "multi_mv_test_limit_partition_mtmv"
+ def dbName = "regression_test_mtmv_p0"
+
+ // list partition, DATE partition column: with limit 2 YEAR and no date format property, only p_20380101 is expected in the MTMV
+ sql """drop table if exists `${tableName}`"""
+ sql """drop materialized view if exists ${mvName};"""
+ sql """
+ CREATE TABLE `${tableName}` (
+ `k1` LARGEINT NOT NULL COMMENT '\"用户id\"',
+ `k2` DATE NOT NULL COMMENT '\"数据灌入日期时间\"'
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ PARTITION BY list(`k2`)
+ (
+ PARTITION p_20380101 VALUES IN ("2038-01-01"),
+ PARTITION p_20200101 VALUES IN ("2020-01-01")
+ )
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1') ;
+ """
+ sql """
+ insert into ${tableName} values(1,"2038-01-01"),(2,"2020-01-01");
+ """
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`k2`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES (
+ 'replication_num' = '1',
+ 'partition_sync_limit'='2',
+ 'partition_sync_time_unit'='YEAR'
+ )
+ AS
+ SELECT * FROM ${tableName};
+ """
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertEquals(1, showPartitionsResult.size())
+ assertTrue(showPartitionsResult.toString().contains("p_20380101"))
+
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName}
+ """
+ def jobName = getJobName(dbName, mvName);
+ log.info(jobName)
+ waitingMTMVTaskFinished(jobName)
+ order_qt_date_list "SELECT * FROM ${mvName} order by k1,k2"
+
+
+
+ // list partition, VARCHAR partition column: partition_date_format '%Y%m%d' is set together with limit 2 MONTH; only p_20380101 is expected
+ sql """drop table if exists `${tableName}`"""
+ sql """drop materialized view if exists ${mvName};"""
+ sql """
+ CREATE TABLE `${tableName}` (
+ `k1` LARGEINT NOT NULL COMMENT '\"用户id\"',
+ `k2` VARCHAR(100) NOT NULL COMMENT '\"数据灌入日期时间\"'
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ PARTITION BY list(`k2`)
+ (
+ PARTITION p_20380101 VALUES IN ("20380101"),
+ PARTITION p_20200101 VALUES IN ("20200101")
+ )
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1') ;
+ """
+ sql """
+ insert into ${tableName} values(1,"20380101"),(2,"20200101");
+ """
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`k2`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES (
+ 'replication_num' = '1',
+ 'partition_sync_limit'='2',
+ 'partition_sync_time_unit'='MONTH',
+ 'partition_date_format'='%Y%m%d'
+ )
+ AS
+ SELECT * FROM ${tableName};
+ """
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertEquals(1, showPartitionsResult.size())
+ assertTrue(showPartitionsResult.toString().contains("p_20380101"))
+
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName}
+ """
+ jobName = getJobName(dbName, mvName);
+ log.info(jobName)
+ waitingMTMVTaskFinished(jobName)
+ order_qt_varchar_list "SELECT * FROM ${mvName} order by k1,k2"
+
+
+ // list partition, INT partition column: same '%Y%m%d' format with limit 2 DAY. NOTE(review): the result below is recorded under the reused tag "varchar_list", not an int-specific tag
+ sql """drop table if exists `${tableName}`"""
+ sql """drop materialized view if exists ${mvName};"""
+ sql """
+ CREATE TABLE `${tableName}` (
+ `k1` LARGEINT NOT NULL COMMENT '\"用户id\"',
+ `k2` int NOT NULL COMMENT '\"数据灌入日期时间\"'
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ PARTITION BY list(`k2`)
+ (
+ PARTITION p_20380101 VALUES IN ("20380101"),
+ PARTITION p_20200101 VALUES IN ("20200101")
+ )
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1') ;
+ """
+ sql """
+ insert into ${tableName} values(1,20380101),(2,20200101);
+ """
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`k2`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES (
+ 'replication_num' = '1',
+ 'partition_sync_limit'='2',
+ 'partition_sync_time_unit'='DAY',
+ 'partition_date_format'='%Y%m%d'
+ )
+ AS
+ SELECT * FROM ${tableName};
+ """
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertEquals(1, showPartitionsResult.size())
+ assertTrue(showPartitionsResult.toString().contains("p_20380101"))
+
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName}
+ """
+ jobName = getJobName(dbName, mvName);
+ log.info(jobName)
+ waitingMTMVTaskFinished(jobName)
+ order_qt_varchar_list "SELECT * FROM ${mvName} order by k1,k2"
+
+
+ // range partition, DATE partition column: with limit 2 YEAR only the 2038 range partition (p_20380101_20380103) is expected
+ sql """drop table if exists `${tableName}`"""
+ sql """drop materialized view if exists ${mvName};"""
+ sql """
+ CREATE TABLE `${tableName}` (
+ `k1` LARGEINT NOT NULL COMMENT '\"用户id\"',
+ `k2` DATE NOT NULL COMMENT '\"数据灌入日期时间\"'
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ PARTITION BY range(`k2`)
+ (
+ PARTITION p2038 VALUES [("2038-01-01"),("2038-01-03")),
+ PARTITION p2020 VALUES [("2020-01-01"),("2020-01-03"))
+ )
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1') ;
+ """
+ sql """
+ insert into ${tableName} values(1,"2038-01-02"),(2,"2020-01-02");
+ """
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`k2`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES (
+ 'replication_num' = '1',
+ 'partition_sync_limit'='2',
+ 'partition_sync_time_unit'='YEAR'
+ )
+ AS
+ SELECT * FROM ${tableName};
+ """
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertEquals(1, showPartitionsResult.size())
+ assertTrue(showPartitionsResult.toString().contains("p_20380101_20380103"))
+
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName}
+ """
+ jobName = getJobName(dbName, mvName);
+ log.info(jobName)
+ waitingMTMVTaskFinished(jobName)
+ order_qt_date_range "SELECT * FROM ${mvName} order by k1,k2"
+
+
+ // alter: set partition_sync_limit to an empty string, refresh, and expect both range partitions (2038 and 2020) to be present afterwards
+ sql """
+ alter Materialized View ${mvName} set("partition_sync_limit"="");
+ """
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName}
+ """
+ jobName = getJobName(dbName, mvName);
+ log.info(jobName)
+ waitingMTMVTaskFinished(jobName)
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertEquals(2, showPartitionsResult.size())
+ assertTrue(showPartitionsResult.toString().contains("p_20380101_20380103"))
+ assertTrue(showPartitionsResult.toString().contains("p_20200101_20200103"))
+ order_qt_date_range_all "SELECT * FROM ${mvName} order by k1,k2"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org