This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 379b5414943c7c380c784c04929cad9cbcbf441d Author: zhangdong <[email protected]> AuthorDate: Thu Feb 1 14:42:47 2024 +0800 [enhance](mtmv)use version instead of timestamp (#30599) MTMV records snapshot information for each refresh of data, used to compare whether partitions need to be updated --- .../main/java/org/apache/doris/alter/Alter.java | 2 +- .../main/java/org/apache/doris/catalog/Env.java | 5 +- .../main/java/org/apache/doris/catalog/MTMV.java | 16 +- .../java/org/apache/doris/catalog/OlapTable.java | 30 ++-- .../doris/catalog/external/HMSExternalTable.java | 91 +++++++++--- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 23 ++- .../doris/mtmv/MTMVMaxTimestampSnapshot.java | 59 ++++++++ .../doris/mtmv/MTMVRefreshPartitionSnapshot.java | 43 ++++++ .../org/apache/doris/mtmv/MTMVRefreshSnapshot.java | 75 ++++++++++ .../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 38 +++-- .../java/org/apache/doris/mtmv/MTMVSnapshotIf.java | 24 +++ .../apache/doris/mtmv/MTMVTimestampSnapshot.java | 51 +++++++ .../main/java/org/apache/doris/mtmv/MTMVUtil.java | 164 +++++++++++++++------ .../org/apache/doris/mtmv/MTMVVersionSnapshot.java | 47 ++++++ .../java/org/apache/doris/persist/AlterMTMV.java | 12 ++ .../org/apache/doris/persist/gson/GsonUtils.java | 11 ++ 16 files changed, 591 insertions(+), 100 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 11ef9d90156..d406eb795ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -897,7 +897,7 @@ public class Alter { mtmv.alterMvProperties(alterMTMV.getMvProperties()); break; case ADD_TASK: - mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation()); + mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots()); Env.getCurrentEnv().getMtmvService() .refreshComplete(mtmv, alterMTMV.getRelation(), alterMTMV.getTask()); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 8c9c6ad0523..5c9c3167357 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -178,6 +178,7 @@ import org.apache.doris.master.PartitionInMemoryInfoCollector; import org.apache.doris.meta.MetaContext; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mtmv.MTMVAlterOpType; +import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVService; import org.apache.doris.mtmv.MTMVStatus; @@ -5988,10 +5989,12 @@ public class Env { this.alter.processAlterMTMV(alter, false); } - public void addMTMVTaskResult(TableNameInfo mvName, MTMVTask task, MTMVRelation relation) { + public void addMTMVTaskResult(TableNameInfo mvName, MTMVTask task, MTMVRelation relation, + Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) { AlterMTMV alter = new AlterMTMV(mvName, MTMVAlterOpType.ADD_TASK); alter.setTask(task); alter.setRelation(relation); + alter.setPartitionSnapshots(partitionSnapshots); this.alter.processAlterMTMV(alter, false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index 9cd5bdac78a..f0730f0d371 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -32,6 +32,8 @@ import org.apache.doris.mtmv.MTMVPlanUtil; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; import org.apache.doris.mtmv.MTMVRefreshInfo; +import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; +import org.apache.doris.mtmv.MTMVRefreshSnapshot; import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVStatus; import org.apache.doris.persist.gson.GsonUtils; @@ -69,6 +71,8 @@ public class MTMV extends OlapTable { private MTMVRelation relation; @SerializedName("mpi") private MTMVPartitionInfo mvPartitionInfo; + @SerializedName("rs") + private MTMVRefreshSnapshot refreshSnapshot; // Should update after every fresh, not persist private MTMVCache cache; @@ -96,6 +100,7 @@ public class MTMV extends OlapTable { this.mvProperties = params.mvProperties; this.mvPartitionInfo = params.mvPartitionInfo; this.relation = params.relation; + this.refreshSnapshot = new MTMVRefreshSnapshot(); mvRwLock = new ReentrantReadWriteLock(true); } @@ -145,7 +150,8 @@ public class MTMV extends OlapTable { } } - public void addTaskResult(MTMVTask task, MTMVRelation relation) { + public void addTaskResult(MTMVTask task, MTMVRelation relation, + Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) { try { writeMvLock(); if (task.getStatus() == TaskStatus.SUCCESS) { @@ -165,6 +171,7 @@ public class MTMV extends OlapTable { this.status.setRefreshState(MTMVRefreshState.FAIL); } this.jobInfo.addHistoryTask(task); + this.refreshSnapshot.updateSnapshots(partitionSnapshots, getPartitionNames()); } finally { writeMvUnlock(); } @@ -177,7 +184,7 @@ public class MTMV extends OlapTable { public long getGracePeriod() { if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) { - return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)); + return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 1000; } else { return 0L; } @@ -222,6 +229,10 @@ public class MTMV extends OlapTable { return mvPartitionInfo; } + public MTMVRefreshSnapshot getRefreshSnapshot() { + return refreshSnapshot; + } + public void readMvLock() { this.mvRwLock.readLock().lock(); } @@ -256,6 +267,7 @@ public class MTMV extends OlapTable { mvProperties = materializedView.mvProperties; relation = materializedView.relation; mvPartitionInfo = materializedView.mvPartitionInfo; + refreshSnapshot = materializedView.refreshSnapshot; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index e9e4bbeab7f..ee47d7616e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -47,6 +47,8 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.mtmv.MTMVSnapshotIf; +import org.apache.doris.mtmv.MTMVVersionSnapshot; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.resource.Tag; @@ -2535,25 +2537,25 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } @Override - public long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException { - return getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(); + public List<Column> getPartitionColumns() { + return getPartitionInfo().getPartitionColumns(); } @Override - public long getLastModifyTime() { - long result = 0L; - long visibleVersionTime; - for (Partition partition : getAllPartitions()) { - visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit(); - if (visibleVersionTime > result) { - result = visibleVersionTime; - } - } - return result; + public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException { + long visibleVersion = getPartitionOrAnalysisException(partitionId).getVisibleVersion(); + return new MTMVVersionSnapshot(visibleVersion); } @Override - public List<Column> getPartitionColumns() { - return getPartitionInfo().getPartitionColumns(); + public MTMVSnapshotIf getTableSnapshot() { + long visibleVersion = getVisibleVersion(); + return new MTMVVersionSnapshot(visibleVersion); + } + + @Override + public String getPartitionName(long partitionId) throws AnalysisException { + return getPartitionOrAnalysisException(partitionId).getName(); } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 96efdb8de13..aa4258baadb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -33,7 +33,10 @@ import org.apache.doris.datasource.hive.HMSCachedClient; import org.apache.doris.datasource.hive.HiveMetaStoreCache; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.external.iceberg.util.IcebergUtils; +import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.mtmv.MTMVSnapshotIf; +import org.apache.doris.mtmv.MTMVTimestampSnapshot; import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; @@ -452,6 +455,15 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI return initSchema(); } + public long getLastDdlTime() { + makeSureInitialized(); + Map<String, String> parameters = remoteTable.getParameters(); + if (parameters == null || !parameters.containsKey(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) { + return 0L; + } + return Long.parseLong(parameters.get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) * 1000; + } + @Override public List<Column> initSchema() { makeSureInitialized(); @@ -561,7 +573,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI public boolean hasColumnStatistics(String colName) { Map<String, String> parameters = remoteTable.getParameters(); return parameters.keySet().stream() - .filter(k -> k.startsWith(SPARK_COL_STATS + colName + ".")).findAny().isPresent(); + .filter(k -> k.startsWith(SPARK_COL_STATS + colName + ".")).findAny().isPresent(); } public boolean fillColumnStatistics(String colName, Map<StatsType, String> statsTypes, Map<String, String> stats) { @@ -772,13 +784,13 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI @Override public boolean isDistributionColumn(String columnName) { return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()).contains(columnName.toLowerCase()); + .collect(Collectors.toSet()).contains(columnName.toLowerCase()); } @Override public Set<String> getDistributionColumnNames() { return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()); + .collect(Collectors.toSet()); } @Override @@ -805,32 +817,73 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - public long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException { - List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(1); - partitionValuesList.add( - ((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringListForHive()); + public String getPartitionName(long partitionId) throws AnalysisException { + Map<String, Long> partitionNameToIdMap = getHivePartitionValues().getPartitionNameToIdMap(); + for (Entry<String, Long> entry : partitionNameToIdMap.entrySet()) { + if (entry.getValue().equals(partitionId)) { + return entry.getKey(); + } + } + throw new AnalysisException("can not find partition, partitionId: " + partitionId); + } + + private HiveMetaStoreCache.HivePartitionValues getHivePartitionValues() { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) getCatalog()); - List<HivePartition> resPartitions = cache.getAllPartitionsWithCache(getDbName(), getName(), - partitionValuesList); - if (resPartitions.size() != 1) { - throw new AnalysisException("partition not normal, size: " + resPartitions.size()); - } - return resPartitions.get(0).getLastModifiedTimeIgnoreInit(); + return cache.getPartitionValues( + getDbName(), getName(), getPartitionColumnTypes()); } @Override - public long getLastModifyTime() throws AnalysisException { + public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException { + long partitionLastModifyTime = getPartitionLastModifyTime(partitionId); + return new MTMVTimestampSnapshot(partitionLastModifyTime); + } - long result = 0L; + @Override + public MTMVSnapshotIf getTableSnapshot() throws AnalysisException { + if (getPartitionType() == PartitionType.UNPARTITIONED) { + return new MTMVMaxTimestampSnapshot(-1L, getLastDdlTime()); + } + long partitionId = 0L; + long maxVersionTime = 0L; long visibleVersionTime; for (Entry<Long, PartitionItem> entry : getPartitionItems().entrySet()) { - visibleVersionTime = getPartitionLastModifyTime(entry.getKey(), entry.getValue()); - if (visibleVersionTime > result) { - result = visibleVersionTime; + visibleVersionTime = getPartitionLastModifyTime(entry.getKey()); + if (visibleVersionTime > maxVersionTime) { + maxVersionTime = visibleVersionTime; + partitionId = entry.getKey(); } } - return result; + return new MTMVMaxTimestampSnapshot(partitionId, maxVersionTime); + } + + private long getPartitionLastModifyTime(long partitionId) throws AnalysisException { + return getPartitionById(partitionId).getLastModifiedTime(); + } + + private HivePartition getPartitionById(long partitionId) throws AnalysisException { + PartitionItem item = getPartitionItems().get(partitionId); + List<List<String>> partitionValuesList = transferPartitionItemToPartitionValues(item); + List<HivePartition> partitions = getPartitionsByPartitionValues(partitionValuesList); + if (partitions.size() != 1) { + throw new AnalysisException("partition not normal, size: " + partitions.size()); + } + return partitions.get(0); + } + + private List<HivePartition> getPartitionsByPartitionValues(List<List<String>> partitionValuesList) { + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) getCatalog()); + return cache.getAllPartitionsWithCache(getDbName(), getName(), + partitionValuesList); + } + + private List<List<String>> transferPartitionItemToPartitionValues(PartitionItem item) { + List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(1); + partitionValuesList.add( + ((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringListForHive()); + return partitionValuesList; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 9b5added1b9..194172a6732 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -38,6 +38,7 @@ import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import org.apache.doris.mtmv.MTMVPlanUtil; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; +import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.nereids.glue.LogicalPlanAdapter; @@ -133,6 +134,7 @@ public class MTMVTask extends AbstractTask { private MTMV mtmv; private MTMVRelation relation; private StmtExecutor executor; + private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots; public MTMVTask() { } @@ -155,8 +157,9 @@ public class MTMVTask extends AbstractTask { // Every time a task is run, the relation is regenerated because baseTables and baseViews may change, // such as deleting a table and creating a view with the same name this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx); - // Before obtaining information from hmsTable, refresh to ensure that the data is up-to-date - refreshHmsTable(); + // Now, the MTMV first ensures consistency with the data in the cache. + // To be completely consistent with hive, you need to manually refresh the cache + // refreshHmsTable(); if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { MTMVUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable()); } @@ -171,6 +174,7 @@ public class MTMVTask extends AbstractTask { int refreshPartitionNum = mtmv.getRefreshPartitionNum(); long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size() % refreshPartitionNum) > 0 ? 1 : 0); + this.partitionSnapshots = Maps.newHashMap(); for (int i = 0; i < execNum; i++) { int start = i * refreshPartitionNum; int end = start + refreshPartitionNum; @@ -178,8 +182,11 @@ public class MTMVTask extends AbstractTask { .subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end)); // need get names before exec List<String> execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds); + Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVUtil + .generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds); exec(ctx, execPartitionIds, tableWithPartKey); completedPartitions.addAll(execPartitionNames); + partitionSnapshots.putAll(execPartitionSnapshots); } } catch (Throwable e) { LOG.warn("run task failed: ", e); @@ -241,6 +248,12 @@ public class MTMVTask extends AbstractTask { } } + /** + * // Before obtaining information from hmsTable, refresh to ensure that the data is up-to-date + * + * @throws AnalysisException + * @throws DdlException + */ private void refreshHmsTable() throws AnalysisException, DdlException { for (BaseTableInfo tableInfo : relation.getBaseTables()) { TableIf tableIf = MTMVUtil.getTable(tableInfo); @@ -345,11 +358,13 @@ public class MTMVTask extends AbstractTask { private void after() { if (mtmv != null) { Env.getCurrentEnv() - .addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation); + .addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation, + partitionSnapshots); } mtmv = null; relation = null; executor = null; + partitionSnapshots = null; } private Map<TableIf, String> getIncrementalTableMap() throws AnalysisException { @@ -384,7 +399,7 @@ public class MTMVTask extends AbstractTask { // check if data is fresh // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables(), 0L); + boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables()); if (fresh) { return Lists.newArrayList(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java new file mode 100644 index 00000000000..5b551cebef6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import com.google.common.base.Objects; +import com.google.gson.annotations.SerializedName; + +/** + * The version cannot be obtained from the hive table, + * so the update time is used instead of the version + */ +public class MTMVMaxTimestampSnapshot implements MTMVSnapshotIf { + // partitionId corresponding to timestamp + // The reason why both timestamp and partitionId are stored is to avoid + // deleting the partition corresponding to timestamp + @SerializedName("p") + private long partitionId; + // The maximum modify time in all partitions + @SerializedName("t") + private long timestamp; + + public MTMVMaxTimestampSnapshot(long partitionId, long timestamp) { + this.partitionId = partitionId; + this.timestamp = timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MTMVMaxTimestampSnapshot that = (MTMVMaxTimestampSnapshot) o; + return partitionId == that.partitionId + && timestamp == that.timestamp; + } + + @Override + public int hashCode() { + return Objects.hashCode(partitionId, timestamp); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java new file mode 100644 index 00000000000..8de2b4cdfed --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; + +import java.util.Map; + +public class MTMVRefreshPartitionSnapshot { + @SerializedName("p") + private Map<String, MTMVSnapshotIf> partitions; + @SerializedName("t") + private Map<Long, MTMVSnapshotIf> tables; + + public MTMVRefreshPartitionSnapshot() { + this.partitions = Maps.newConcurrentMap(); + this.tables = Maps.newConcurrentMap(); + } + + public Map<String, MTMVSnapshotIf> getPartitions() { + return partitions; + } + + public Map<Long, MTMVSnapshotIf> getTables() { + return tables; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java new file mode 100644 index 00000000000..5132f06a12e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; +import org.apache.commons.collections.MapUtils; + +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +public class MTMVRefreshSnapshot { + @SerializedName("ps") + private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots; + + public MTMVRefreshSnapshot() { + this.partitionSnapshots = Maps.newConcurrentMap(); + } + + public boolean equalsWithRelatedPartition(String mtmvPartitionName, String relatedPartitionName, + MTMVSnapshotIf relatedPartitionCurrentSnapshot) { + MTMVRefreshPartitionSnapshot partitionSnapshot = partitionSnapshots.get(mtmvPartitionName); + if (partitionSnapshot == null) { + return false; + } + MTMVSnapshotIf relatedPartitionSnapshot = partitionSnapshot.getPartitions().get(relatedPartitionName); + if (relatedPartitionSnapshot == null) { + return false; + } + return relatedPartitionSnapshot.equals(relatedPartitionCurrentSnapshot); + } + + public boolean equalsWithBaseTable(String mtmvPartitionName, long baseTableId, + MTMVSnapshotIf baseTableCurrentSnapshot) { + MTMVRefreshPartitionSnapshot partitionSnapshot = partitionSnapshots.get(mtmvPartitionName); + if (partitionSnapshot == null) { + return false; + } + MTMVSnapshotIf relatedPartitionSnapshot = partitionSnapshot.getTables().get(baseTableId); + if (relatedPartitionSnapshot == null) { + return false; + } + return relatedPartitionSnapshot.equals(baseTableCurrentSnapshot); + } + + public void updateSnapshots(Map<String, MTMVRefreshPartitionSnapshot> addPartitionSnapshots, + Set<String> mvPartitionNames) { + if (!MapUtils.isEmpty(addPartitionSnapshots)) { + this.partitionSnapshots.putAll(addPartitionSnapshots); + } + Iterator<String> iterator = partitionSnapshots.keySet().iterator(); + while (iterator.hasNext()) { + String partitionName = iterator.next(); + if (!mvPartitionNames.contains(partitionName)) { + iterator.remove(); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index d4a9cf3aca7..51773db0df1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -40,16 +40,6 @@ public interface MTMVRelatedTableIf extends TableIf { */ Map<Long, PartitionItem> getPartitionItems(); - /** - * Obtain the latest update time of partition data - * - * @param partitionId - * @param item - * @return millisecond - * @throws AnalysisException - */ - long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException; - /** * getPartitionType LIST/RANGE/UNPARTITIONED * @@ -66,17 +56,35 @@ public interface MTMVRelatedTableIf extends TableIf { Set<String> getPartitionColumnNames() throws DdlException; /** - * Obtain the latest update time of table data + * getPartitionColumns * * @return + */ + List<Column> getPartitionColumns(); + + /** + * getPartitionSnapshot + * + * @param partitionId + * @return partition snapshot at current time * @throws AnalysisException */ - long getLastModifyTime() throws AnalysisException; + MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException; /** - * getPartitionColumns + * getTableSnapshot * - * @return + * @return table snapshot at current time + * @throws AnalysisException */ - List<Column> getPartitionColumns(); + MTMVSnapshotIf getTableSnapshot() throws AnalysisException; + + /** + * getPartitionName + * + * @param partitionId + * @return partitionName + * @throws AnalysisException + */ + String getPartitionName(long partitionId) throws AnalysisException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIf.java new file mode 100644 index 00000000000..9a15ab7ef90 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIf.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +/** + * MTMV refresh snapshot + */ +public interface MTMVSnapshotIf { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTimestampSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTimestampSnapshot.java new file mode 100644 index 00000000000..b3fd88a94de --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTimestampSnapshot.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import com.google.common.base.Objects; +import com.google.gson.annotations.SerializedName; + +/** + * The version cannot be obtained from the hive table, + * so the update time is used instead of the version + */ +public class MTMVTimestampSnapshot implements MTMVSnapshotIf { + @SerializedName("t") + private long timestamp; + + public MTMVTimestampSnapshot(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MTMVTimestampSnapshot that = (MTMVTimestampSnapshot) o; + return timestamp == that.timestamp; + } + + @Override + public int hashCode() { + return Objects.hashCode(timestamp); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 481670d9448..74593e5def5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -73,12 +73,11 @@ public class MTMVUtil { * @param partitionId * @param tables * @param excludedTriggerTables - * @param gracePeriod * @return * @throws AnalysisException */ private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables, - Set<String> excludedTriggerTables, Long gracePeriod) throws AnalysisException { + Set<String> excludedTriggerTables) throws AnalysisException { boolean isSyncWithPartition = true; if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); @@ -92,12 +91,9 @@ public class MTMVUtil { LOG.warn("can not found related partition: " + partitionId); return false; } - isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, item, relatedTable, relatedPartitionId, - relatedPartitionItems.get(relatedPartitionId)); + isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId); } - return isSyncWithPartition && isFresherThanTables( - mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(), tables, - excludedTriggerTables, gracePeriod); + return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables); } @@ -158,7 +154,7 @@ public class MTMVUtil { return false; } try { - return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet(), 0L); + return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet()); } catch (AnalysisException e) { LOG.warn("isMTMVSync failed: ", e); return false; @@ -171,16 +167,14 @@ public class MTMVUtil { * @param mtmv * @param tables * @param excludeTables - * @param gracePeriod * @return * @throws AnalysisException */ - public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables, long gracePeriod) + public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables) throws AnalysisException { Collection<Partition> partitions = mtmv.getPartitions(); for (Partition partition : partitions) { - if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables, - gracePeriod)) { + if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables)) { return false; } } @@ -197,7 +191,6 @@ public class MTMVUtil { */ public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException { List<String> res = Lists.newArrayList(); - long maxAvailableTime = mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(); for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) { TableIf table = getTable(baseTableInfo); if (!(table instanceof MTMVRelatedTableIf)) { @@ -213,14 +206,13 @@ public class MTMVUtil { if (relatedPartitionId == -1L) { throw new AnalysisException("can not found related partition"); } - boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, item, mtmvRelatedTableIf, - relatedPartitionId, relatedPartitionItems.get(relatedPartitionId)); + boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, mtmvRelatedTableIf, + relatedPartitionId); if (!isSyncWithPartition) { res.add(mtmvRelatedTableIf.getName()); } } else { - long tableLastVisibleVersionTime = mtmvRelatedTableIf.getLastModifyTime(); - if (tableLastVisibleVersionTime > maxAvailableTime) { + if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) { res.add(table.getName()); } } @@ -257,16 +249,16 @@ public class MTMVUtil { return res; } // check gracePeriod - Long gracePeriod = mtmv.getGracePeriod(); - // do not care data is delayed - if (gracePeriod < 0) { - return allPartitions; - } - + long gracePeriodMills = mtmv.getGracePeriod(); + long currentTimeMills = System.currentTimeMillis(); for (Partition partition : allPartitions) { + if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime() + + gracePeriodMills)) { + res.add(partition); + continue; + } try { - if (isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), Sets.newHashSet(), - gracePeriod)) { + if (isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), Sets.newHashSet())) { res.add(partition); } } catch (AnalysisException e) { @@ -290,8 +282,7 @@ public class MTMVUtil { for (Partition partition : allPartitions) { try { if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables, - mtmv.getExcludedTriggerTables(), - 0L)) { + mtmv.getExcludedTriggerTables())) { res.add(partition.getId()); } } catch (AnalysisException e) { @@ -312,11 +303,15 @@ public class MTMVUtil { * @return * @throws AnalysisException */ - private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, PartitionItem mtmvPartitionItem, + private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, MTMVRelatedTableIf relatedTable, - Long relatedPartitionId, PartitionItem relatedPartitionItem) throws AnalysisException { - return mtmv.getPartitionLastModifyTime(mtmvPartitionId, mtmvPartitionItem) >= relatedTable - .getPartitionLastModifyTime(relatedPartitionId, relatedPartitionItem); + Long relatedPartitionId) throws AnalysisException { + MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable + .getPartitionSnapshot(relatedPartitionId); + String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId); + String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); + return mtmv.getRefreshSnapshot() + .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot); } /** @@ -343,9 +338,17 @@ public class MTMVUtil { * @param partitionId */ private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException { - Partition partition = mtmv.getPartitionOrAnalysisException(partitionId); - DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false); - Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause); + if (!mtmv.writeLockIfExist()) { + return; + } + try { + Partition partition = mtmv.getPartitionOrAnalysisException(partitionId); + DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false); + Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause); + } finally { + mtmv.writeUnlock(); + } + } /** @@ -388,15 +391,13 @@ public class MTMVUtil { /** * Determine is sync, ignoring excludedTriggerTables and non OlapTanle * - * @param visibleVersionTime + * @param mtmvPartitionId * @param tables * @param excludedTriggerTables - * @param gracePeriod * @return */ - private static boolean isFresherThanTables(long visibleVersionTime, Set<BaseTableInfo> tables, - Set<String> excludedTriggerTables, Long gracePeriod) throws AnalysisException { - long maxAvailableTime = visibleVersionTime + gracePeriod; + private static boolean isSyncWithAllBaseTables(MTMV mtmv, long mtmvPartitionId, Set<BaseTableInfo> tables, + Set<String> excludedTriggerTables) throws AnalysisException { for (BaseTableInfo baseTableInfo : tables) { TableIf table = null; try { @@ -408,17 +409,36 @@ public class MTMVUtil { if (excludedTriggerTables.contains(table.getName())) { continue; } - if (!(table instanceof MTMVRelatedTableIf)) { - continue; - } - long tableLastVisibleVersionTime = ((MTMVRelatedTableIf) table).getLastModifyTime(); - if (tableLastVisibleVersionTime > maxAvailableTime) { + boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionId, baseTableInfo); + if (!syncWithBaseTable) { return false; } } return true; } + private static boolean isSyncWithBaseTable(MTMV mtmv, long mtmvPartitionId, BaseTableInfo baseTableInfo) + throws AnalysisException { + TableIf table = null; + try { + table = getTable(baseTableInfo); + } catch (AnalysisException e) { + LOG.warn("get table failed, {}", baseTableInfo, e); + return false; + } + + if (!(table instanceof MTMVRelatedTableIf)) { + // if not MTMVRelatedTableIf, we can not get snapshot from it, + // Currently, it is believed to be synchronous + return true; + } + MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table; + MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(); + String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); + return mtmv.getRefreshSnapshot() + .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); + } + private static boolean mtmvContainsExternalTable(MTMV mtmv) { Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables(); for (BaseTableInfo baseTableInfo : baseTables) { @@ -428,4 +448,60 @@ public class MTMVUtil { } return false; } + + public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv, + Set<BaseTableInfo> baseTables, Set<Long> partitionIds) + throws AnalysisException { + Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap(); + for (Long partitionId : partitionIds) { + res.put(mtmv.getPartition(partitionId).getName(), generatePartitionSnapshot(mtmv, baseTables, partitionId)); + } + return res; + } + + + private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, + Set<BaseTableInfo> baseTables, Long partitionId) + throws AnalysisException { + MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot(); + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); + List<Long> relatedPartitionIds = getMTMVPartitionRelatedPartitions( + mtmv.getPartitionItems().get(partitionId), + relatedTable); + + for (Long relatedPartitionId : relatedPartitionIds) { + MTMVSnapshotIf partitionSnapshot = relatedTable + .getPartitionSnapshot(relatedPartitionId); + refreshPartitionSnapshot.getPartitions() + .put(relatedTable.getPartitionName(relatedPartitionId), partitionSnapshot); + } + } + for (BaseTableInfo baseTableInfo : baseTables) { + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv + .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { + continue; + } + TableIf table = getTable(baseTableInfo); + if (!(table instanceof MTMVRelatedTableIf)) { + continue; + } + refreshPartitionSnapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot()); + } + return refreshPartitionSnapshot; + } + + private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem, + MTMVRelatedTableIf relatedTable) { + List<Long> res = Lists.newArrayList(); + Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems(); + for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) { + if (mtmvPartitionItem.equals(entry.getValue())) { + res.add(entry.getKey()); + // current, the partitioning of MTMV corresponds one-to-one with the partitioning of related table + return res; + } + } + return res; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java new file mode 100644 index 00000000000..14304e60183 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import com.google.common.base.Objects; +import com.google.gson.annotations.SerializedName; + +public class MTMVVersionSnapshot implements MTMVSnapshotIf { + @SerializedName("v") + private long version; + + public MTMVVersionSnapshot(long version) { + this.version = version; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MTMVVersionSnapshot that = (MTMVVersionSnapshot) o; + return version == that.version; + } + + @Override + public int hashCode() { + return Objects.hashCode(version); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java index d2084ff6fa3..6c974c57f18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java @@ -22,6 +22,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.mtmv.MTMVAlterOpType; import org.apache.doris.mtmv.MTMVRefreshInfo; +import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVStatus; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; @@ -52,6 +53,8 @@ public class AlterMTMV implements Writable { private MTMVTask task; @SerializedName("r") private MTMVRelation relation; + @SerializedName("ps") + private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots; public AlterMTMV(TableNameInfo mvName, MTMVRefreshInfo refreshInfo, MTMVAlterOpType opType) { this.mvName = Objects.requireNonNull(mvName, "require mvName object"); @@ -125,6 +128,15 @@ public class AlterMTMV implements Writable { this.relation = relation; } + public Map<String, MTMVRefreshPartitionSnapshot> getPartitionSnapshots() { + return partitionSnapshots; + } + + public void setPartitionSnapshots( + Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) { + this.partitionSnapshots = partitionSnapshots; + } + @Override public String toString() { return "AlterMTMV{" diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index fb1b9d8b5ff..cb7d82b22ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -92,6 +92,10 @@ import org.apache.doris.load.routineload.AbstractDataSourceProperties; import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.load.sync.SyncJob; import org.apache.doris.load.sync.canal.CanalSyncJob; +import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; +import org.apache.doris.mtmv.MTMVSnapshotIf; +import org.apache.doris.mtmv.MTMVTimestampSnapshot; +import org.apache.doris.mtmv.MTMVVersionSnapshot; import org.apache.doris.policy.Policy; import org.apache.doris.policy.RowPolicy; import org.apache.doris.policy.StoragePolicy; @@ -241,6 +245,12 @@ public class GsonUtils { .registerSubtype(InsertJob.class, InsertJob.class.getSimpleName()) .registerSubtype(MTMVJob.class, MTMVJob.class.getSimpleName()); + private static RuntimeTypeAdapterFactory<MTMVSnapshotIf> mtmvSnapshotTypeAdapterFactory = + RuntimeTypeAdapterFactory.of(MTMVSnapshotIf.class, "clazz") + .registerSubtype(MTMVMaxTimestampSnapshot.class, MTMVMaxTimestampSnapshot.class.getSimpleName()) + .registerSubtype(MTMVTimestampSnapshot.class, MTMVTimestampSnapshot.class.getSimpleName()) + .registerSubtype(MTMVVersionSnapshot.class, MTMVVersionSnapshot.class.getSimpleName()); + private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of( DatabaseIf.class, "clazz") .registerSubtype(ExternalDatabase.class, ExternalDatabase.class.getSimpleName()) @@ -299,6 +309,7 @@ public class GsonUtils { .registerTypeAdapterFactory(hbResponseTypeAdapterFactory) .registerTypeAdapterFactory(rdsTypeAdapterFactory) .registerTypeAdapterFactory(jobExecutorRuntimeTypeAdapterFactory) + .registerTypeAdapterFactory(mtmvSnapshotTypeAdapterFactory) .registerTypeAdapterFactory(constraintTypeAdapterFactory) .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()) .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
