This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 379b5414943c7c380c784c04929cad9cbcbf441d
Author: zhangdong <[email protected]>
AuthorDate: Thu Feb 1 14:42:47 2024 +0800

    [enhance](mtmv)use version instead of timestamp (#30599)
    
    MTMV records snapshot information for each data refresh; the snapshots are
    compared to determine whether partitions need to be updated.
---
 .../main/java/org/apache/doris/alter/Alter.java    |   2 +-
 .../main/java/org/apache/doris/catalog/Env.java    |   5 +-
 .../main/java/org/apache/doris/catalog/MTMV.java   |  16 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |  30 ++--
 .../doris/catalog/external/HMSExternalTable.java   |  91 +++++++++---
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |  23 ++-
 .../doris/mtmv/MTMVMaxTimestampSnapshot.java       |  59 ++++++++
 .../doris/mtmv/MTMVRefreshPartitionSnapshot.java   |  43 ++++++
 .../org/apache/doris/mtmv/MTMVRefreshSnapshot.java |  75 ++++++++++
 .../org/apache/doris/mtmv/MTMVRelatedTableIf.java  |  38 +++--
 .../java/org/apache/doris/mtmv/MTMVSnapshotIf.java |  24 +++
 .../apache/doris/mtmv/MTMVTimestampSnapshot.java   |  51 +++++++
 .../main/java/org/apache/doris/mtmv/MTMVUtil.java  | 164 +++++++++++++++------
 .../org/apache/doris/mtmv/MTMVVersionSnapshot.java |  47 ++++++
 .../java/org/apache/doris/persist/AlterMTMV.java   |  12 ++
 .../org/apache/doris/persist/gson/GsonUtils.java   |  11 ++
 16 files changed, 591 insertions(+), 100 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 11ef9d90156..d406eb795ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -897,7 +897,7 @@ public class Alter {
                     mtmv.alterMvProperties(alterMTMV.getMvProperties());
                     break;
                 case ADD_TASK:
-                    mtmv.addTaskResult(alterMTMV.getTask(), 
alterMTMV.getRelation());
+                    mtmv.addTaskResult(alterMTMV.getTask(), 
alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots());
                     Env.getCurrentEnv().getMtmvService()
                             .refreshComplete(mtmv, alterMTMV.getRelation(), 
alterMTMV.getTask());
                     break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 8c9c6ad0523..5c9c3167357 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -178,6 +178,7 @@ import 
org.apache.doris.master.PartitionInMemoryInfoCollector;
 import org.apache.doris.meta.MetaContext;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mtmv.MTMVAlterOpType;
+import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
 import org.apache.doris.mtmv.MTMVRelation;
 import org.apache.doris.mtmv.MTMVService;
 import org.apache.doris.mtmv.MTMVStatus;
@@ -5988,10 +5989,12 @@ public class Env {
         this.alter.processAlterMTMV(alter, false);
     }
 
-    public void addMTMVTaskResult(TableNameInfo mvName, MTMVTask task, 
MTMVRelation relation) {
+    public void addMTMVTaskResult(TableNameInfo mvName, MTMVTask task, 
MTMVRelation relation,
+            Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
         AlterMTMV alter = new AlterMTMV(mvName, MTMVAlterOpType.ADD_TASK);
         alter.setTask(task);
         alter.setRelation(relation);
+        alter.setPartitionSnapshots(partitionSnapshots);
         this.alter.processAlterMTMV(alter, false);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
index 9cd5bdac78a..f0730f0d371 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
@@ -32,6 +32,8 @@ import org.apache.doris.mtmv.MTMVPlanUtil;
 import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
 import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
 import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshSnapshot;
 import org.apache.doris.mtmv.MTMVRelation;
 import org.apache.doris.mtmv.MTMVStatus;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -69,6 +71,8 @@ public class MTMV extends OlapTable {
     private MTMVRelation relation;
     @SerializedName("mpi")
     private MTMVPartitionInfo mvPartitionInfo;
+    @SerializedName("rs")
+    private MTMVRefreshSnapshot refreshSnapshot;
     // Should update after every fresh, not persist
     private MTMVCache cache;
 
@@ -96,6 +100,7 @@ public class MTMV extends OlapTable {
         this.mvProperties = params.mvProperties;
         this.mvPartitionInfo = params.mvPartitionInfo;
         this.relation = params.relation;
+        this.refreshSnapshot = new MTMVRefreshSnapshot();
         mvRwLock = new ReentrantReadWriteLock(true);
     }
 
@@ -145,7 +150,8 @@ public class MTMV extends OlapTable {
         }
     }
 
-    public void addTaskResult(MTMVTask task, MTMVRelation relation) {
+    public void addTaskResult(MTMVTask task, MTMVRelation relation,
+            Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
         try {
             writeMvLock();
             if (task.getStatus() == TaskStatus.SUCCESS) {
@@ -165,6 +171,7 @@ public class MTMV extends OlapTable {
                 this.status.setRefreshState(MTMVRefreshState.FAIL);
             }
             this.jobInfo.addHistoryTask(task);
+            this.refreshSnapshot.updateSnapshots(partitionSnapshots, 
getPartitionNames());
         } finally {
             writeMvUnlock();
         }
@@ -177,7 +184,7 @@ public class MTMV extends OlapTable {
 
     public long getGracePeriod() {
         if 
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
-            return 
Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD));
+            return 
Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 
1000;
         } else {
             return 0L;
         }
@@ -222,6 +229,10 @@ public class MTMV extends OlapTable {
         return mvPartitionInfo;
     }
 
+    public MTMVRefreshSnapshot getRefreshSnapshot() {
+        return refreshSnapshot;
+    }
+
     public void readMvLock() {
         this.mvRwLock.readLock().lock();
     }
@@ -256,6 +267,7 @@ public class MTMV extends OlapTable {
         mvProperties = materializedView.mvProperties;
         relation = materializedView.relation;
         mvPartitionInfo = materializedView.mvPartitionInfo;
+        refreshSnapshot = materializedView.refreshSnapshot;
     }
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index e9e4bbeab7f..ee47d7616e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -47,6 +47,8 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+import org.apache.doris.mtmv.MTMVVersionSnapshot;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.resource.Tag;
@@ -2535,25 +2537,25 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
     }
 
     @Override
-    public long getPartitionLastModifyTime(long partitionId, PartitionItem 
item) throws AnalysisException {
-        return 
getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
+    public List<Column> getPartitionColumns() {
+        return getPartitionInfo().getPartitionColumns();
     }
 
     @Override
-    public long getLastModifyTime() {
-        long result = 0L;
-        long visibleVersionTime;
-        for (Partition partition : getAllPartitions()) {
-            visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
-            if (visibleVersionTime > result) {
-                result = visibleVersionTime;
-            }
-        }
-        return result;
+    public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws 
AnalysisException {
+        long visibleVersion = 
getPartitionOrAnalysisException(partitionId).getVisibleVersion();
+        return new MTMVVersionSnapshot(visibleVersion);
     }
 
     @Override
-    public List<Column> getPartitionColumns() {
-        return getPartitionInfo().getPartitionColumns();
+    public MTMVSnapshotIf getTableSnapshot() {
+        long visibleVersion = getVisibleVersion();
+        return new MTMVVersionSnapshot(visibleVersion);
+    }
+
+    @Override
+    public String getPartitionName(long partitionId) throws AnalysisException {
+        return getPartitionOrAnalysisException(partitionId).getName();
     }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 96efdb8de13..aa4258baadb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -33,7 +33,10 @@ import org.apache.doris.datasource.hive.HMSCachedClient;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+import org.apache.doris.mtmv.MTMVTimestampSnapshot;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
@@ -452,6 +455,15 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         return initSchema();
     }
 
+    public long getLastDdlTime() {
+        makeSureInitialized();
+        Map<String, String> parameters = remoteTable.getParameters();
+        if (parameters == null || 
!parameters.containsKey(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) {
+            return 0L;
+        }
+        return 
Long.parseLong(parameters.get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) * 1000;
+    }
+
     @Override
     public List<Column> initSchema() {
         makeSureInitialized();
@@ -561,7 +573,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     public boolean hasColumnStatistics(String colName) {
         Map<String, String> parameters = remoteTable.getParameters();
         return parameters.keySet().stream()
-            .filter(k -> k.startsWith(SPARK_COL_STATS + colName + 
".")).findAny().isPresent();
+                .filter(k -> k.startsWith(SPARK_COL_STATS + colName + 
".")).findAny().isPresent();
     }
 
     public boolean fillColumnStatistics(String colName, Map<StatsType, String> 
statsTypes, Map<String, String> stats) {
@@ -772,13 +784,13 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     @Override
     public boolean isDistributionColumn(String columnName) {
         return 
getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
-            .collect(Collectors.toSet()).contains(columnName.toLowerCase());
+                
.collect(Collectors.toSet()).contains(columnName.toLowerCase());
     }
 
     @Override
     public Set<String> getDistributionColumnNames() {
         return 
getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
-            .collect(Collectors.toSet());
+                .collect(Collectors.toSet());
     }
 
     @Override
@@ -805,32 +817,73 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     }
 
     @Override
-    public long getPartitionLastModifyTime(long partitionId, PartitionItem 
item) throws AnalysisException {
-        List<List<String>> partitionValuesList = 
Lists.newArrayListWithCapacity(1);
-        partitionValuesList.add(
-                ((ListPartitionItem) 
item).getItems().get(0).getPartitionValuesAsStringListForHive());
+    public String getPartitionName(long partitionId) throws AnalysisException {
+        Map<String, Long> partitionNameToIdMap = 
getHivePartitionValues().getPartitionNameToIdMap();
+        for (Entry<String, Long> entry : partitionNameToIdMap.entrySet()) {
+            if (entry.getValue().equals(partitionId)) {
+                return entry.getKey();
+            }
+        }
+        throw new AnalysisException("can not find partition,  partitionId: " + 
partitionId);
+    }
+
+    private HiveMetaStoreCache.HivePartitionValues getHivePartitionValues() {
         HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                 .getMetaStoreCache((HMSExternalCatalog) getCatalog());
-        List<HivePartition> resPartitions = 
cache.getAllPartitionsWithCache(getDbName(), getName(),
-                partitionValuesList);
-        if (resPartitions.size() != 1) {
-            throw new AnalysisException("partition not normal, size: " + 
resPartitions.size());
-        }
-        return resPartitions.get(0).getLastModifiedTimeIgnoreInit();
+        return cache.getPartitionValues(
+                getDbName(), getName(), getPartitionColumnTypes());
     }
 
     @Override
-    public long getLastModifyTime() throws AnalysisException {
+    public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws 
AnalysisException {
+        long partitionLastModifyTime = getPartitionLastModifyTime(partitionId);
+        return new MTMVTimestampSnapshot(partitionLastModifyTime);
+    }
 
-        long result = 0L;
+    @Override
+    public MTMVSnapshotIf getTableSnapshot() throws AnalysisException {
+        if (getPartitionType() == PartitionType.UNPARTITIONED) {
+            return new MTMVMaxTimestampSnapshot(-1L, getLastDdlTime());
+        }
+        long partitionId = 0L;
+        long maxVersionTime = 0L;
         long visibleVersionTime;
         for (Entry<Long, PartitionItem> entry : 
getPartitionItems().entrySet()) {
-            visibleVersionTime = getPartitionLastModifyTime(entry.getKey(), 
entry.getValue());
-            if (visibleVersionTime > result) {
-                result = visibleVersionTime;
+            visibleVersionTime = getPartitionLastModifyTime(entry.getKey());
+            if (visibleVersionTime > maxVersionTime) {
+                maxVersionTime = visibleVersionTime;
+                partitionId = entry.getKey();
             }
         }
-        return result;
+        return new MTMVMaxTimestampSnapshot(partitionId, maxVersionTime);
+    }
+
+    private long getPartitionLastModifyTime(long partitionId) throws 
AnalysisException {
+        return getPartitionById(partitionId).getLastModifiedTime();
+    }
+
+    private HivePartition getPartitionById(long partitionId) throws 
AnalysisException {
+        PartitionItem item = getPartitionItems().get(partitionId);
+        List<List<String>> partitionValuesList = 
transferPartitionItemToPartitionValues(item);
+        List<HivePartition> partitions = 
getPartitionsByPartitionValues(partitionValuesList);
+        if (partitions.size() != 1) {
+            throw new AnalysisException("partition not normal, size: " + 
partitions.size());
+        }
+        return partitions.get(0);
+    }
+
+    private List<HivePartition> 
getPartitionsByPartitionValues(List<List<String>> partitionValuesList) {
+        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                .getMetaStoreCache((HMSExternalCatalog) getCatalog());
+        return cache.getAllPartitionsWithCache(getDbName(), getName(),
+                partitionValuesList);
+    }
+
+    private List<List<String>> 
transferPartitionItemToPartitionValues(PartitionItem item) {
+        List<List<String>> partitionValuesList = 
Lists.newArrayListWithCapacity(1);
+        partitionValuesList.add(
+                ((ListPartitionItem) 
item).getItems().get(0).getPartitionValuesAsStringListForHive());
+        return partitionValuesList;
     }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 9b5added1b9..194172a6732 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -38,6 +38,7 @@ import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
 import org.apache.doris.mtmv.MTMVPlanUtil;
 import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
+import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
 import org.apache.doris.mtmv.MTMVRelation;
 import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -133,6 +134,7 @@ public class MTMVTask extends AbstractTask {
     private MTMV mtmv;
     private MTMVRelation relation;
     private StmtExecutor executor;
+    private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
 
     public MTMVTask() {
     }
@@ -155,8 +157,9 @@ public class MTMVTask extends AbstractTask {
             // Every time a task is run, the relation is regenerated because 
baseTables and baseViews may change,
             // such as deleting a table and creating a view with the same name
             this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
-            // Before obtaining information from hmsTable, refresh to ensure 
that the data is up-to-date
-            refreshHmsTable();
+            // Now, the MTMV first ensures consistency with the data in the 
cache.
+            // To be completely consistent with hive, you need to manually 
refresh the cache
+            // refreshHmsTable();
             if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE) {
                 MTMVUtil.alignMvPartition(mtmv, 
mtmv.getMvPartitionInfo().getRelatedTable());
             }
@@ -171,6 +174,7 @@ public class MTMVTask extends AbstractTask {
             int refreshPartitionNum = mtmv.getRefreshPartitionNum();
             long execNum = (needRefreshPartitionIds.size() / 
refreshPartitionNum) + ((needRefreshPartitionIds.size()
                     % refreshPartitionNum) > 0 ? 1 : 0);
+            this.partitionSnapshots = Maps.newHashMap();
             for (int i = 0; i < execNum; i++) {
                 int start = i * refreshPartitionNum;
                 int end = start + refreshPartitionNum;
@@ -178,8 +182,11 @@ public class MTMVTask extends AbstractTask {
                         .subList(start, end > needRefreshPartitionIds.size() ? 
needRefreshPartitionIds.size() : end));
                 // need get names before exec
                 List<String> execPartitionNames = 
MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
+                Map<String, MTMVRefreshPartitionSnapshot> 
execPartitionSnapshots = MTMVUtil
+                        .generatePartitionSnapshots(mtmv, 
relation.getBaseTables(), execPartitionIds);
                 exec(ctx, execPartitionIds, tableWithPartKey);
                 completedPartitions.addAll(execPartitionNames);
+                partitionSnapshots.putAll(execPartitionSnapshots);
             }
         } catch (Throwable e) {
             LOG.warn("run task failed: ", e);
@@ -241,6 +248,12 @@ public class MTMVTask extends AbstractTask {
         }
     }
 
+    /**
+     * // Before obtaining information from hmsTable, refresh to ensure that 
the data is up-to-date
+     *
+     * @throws AnalysisException
+     * @throws DdlException
+     */
     private void refreshHmsTable() throws AnalysisException, DdlException {
         for (BaseTableInfo tableInfo : relation.getBaseTables()) {
             TableIf tableIf = MTMVUtil.getTable(tableInfo);
@@ -345,11 +358,13 @@ public class MTMVTask extends AbstractTask {
     private void after() {
         if (mtmv != null) {
             Env.getCurrentEnv()
-                    .addMTMVTaskResult(new 
TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation);
+                    .addMTMVTaskResult(new 
TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation,
+                            partitionSnapshots);
         }
         mtmv = null;
         relation = null;
         executor = null;
+        partitionSnapshots = null;
     }
 
     private Map<TableIf, String> getIncrementalTableMap() throws 
AnalysisException {
@@ -384,7 +399,7 @@ public class MTMVTask extends AbstractTask {
         // check if data is fresh
         // We need to use a newly generated relationship and cannot retrieve 
it using mtmv.getRelation()
         // to avoid rebuilding the baseTable and causing a change in the 
tableId
-        boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), 
mtmv.getExcludedTriggerTables(), 0L);
+        boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), 
mtmv.getExcludedTriggerTables());
         if (fresh) {
             return Lists.newArrayList();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java
new file mode 100644
index 00000000000..5b551cebef6
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * The version cannot be obtained from the hive table,
+ * so the update time is used instead of the version
+ */
+public class MTMVMaxTimestampSnapshot implements MTMVSnapshotIf {
+    // partitionId corresponding to timestamp
+    // The reason why both timestamp and partitionId are stored is to avoid
+    // deleting the partition corresponding to timestamp
+    @SerializedName("p")
+    private long partitionId;
+    // The maximum modify time in all partitions
+    @SerializedName("t")
+    private long timestamp;
+
+    public MTMVMaxTimestampSnapshot(long partitionId, long timestamp) {
+        this.partitionId = partitionId;
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MTMVMaxTimestampSnapshot that = (MTMVMaxTimestampSnapshot) o;
+        return partitionId == that.partitionId
+                && timestamp == that.timestamp;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(partitionId, timestamp);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java
 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java
new file mode 100644
index 00000000000..8de2b4cdfed
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Map;
+
+public class MTMVRefreshPartitionSnapshot {
+    @SerializedName("p")
+    private Map<String, MTMVSnapshotIf> partitions;
+    @SerializedName("t")
+    private Map<Long, MTMVSnapshotIf> tables;
+
+    public MTMVRefreshPartitionSnapshot() {
+        this.partitions = Maps.newConcurrentMap();
+        this.tables = Maps.newConcurrentMap();
+    }
+
+    public Map<String, MTMVSnapshotIf> getPartitions() {
+        return partitions;
+    }
+
+    public Map<Long, MTMVSnapshotIf> getTables() {
+        return tables;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java
new file mode 100644
index 00000000000..5132f06a12e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+public class MTMVRefreshSnapshot {
+    @SerializedName("ps")
+    private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
+
+    public MTMVRefreshSnapshot() {
+        this.partitionSnapshots = Maps.newConcurrentMap();
+    }
+
+    public boolean equalsWithRelatedPartition(String mtmvPartitionName, String 
relatedPartitionName,
+            MTMVSnapshotIf relatedPartitionCurrentSnapshot) {
+        MTMVRefreshPartitionSnapshot partitionSnapshot = 
partitionSnapshots.get(mtmvPartitionName);
+        if (partitionSnapshot == null) {
+            return false;
+        }
+        MTMVSnapshotIf relatedPartitionSnapshot = 
partitionSnapshot.getPartitions().get(relatedPartitionName);
+        if (relatedPartitionSnapshot == null) {
+            return false;
+        }
+        return 
relatedPartitionSnapshot.equals(relatedPartitionCurrentSnapshot);
+    }
+
+    public boolean equalsWithBaseTable(String mtmvPartitionName, long 
baseTableId,
+            MTMVSnapshotIf baseTableCurrentSnapshot) {
+        MTMVRefreshPartitionSnapshot partitionSnapshot = 
partitionSnapshots.get(mtmvPartitionName);
+        if (partitionSnapshot == null) {
+            return false;
+        }
+        MTMVSnapshotIf relatedPartitionSnapshot = 
partitionSnapshot.getTables().get(baseTableId);
+        if (relatedPartitionSnapshot == null) {
+            return false;
+        }
+        return relatedPartitionSnapshot.equals(baseTableCurrentSnapshot);
+    }
+
+    public void updateSnapshots(Map<String, MTMVRefreshPartitionSnapshot> 
addPartitionSnapshots,
+            Set<String> mvPartitionNames) {
+        if (!MapUtils.isEmpty(addPartitionSnapshots)) {
+            this.partitionSnapshots.putAll(addPartitionSnapshots);
+        }
+        Iterator<String> iterator = partitionSnapshots.keySet().iterator();
+        while (iterator.hasNext()) {
+            String partitionName = iterator.next();
+            if (!mvPartitionNames.contains(partitionName)) {
+                iterator.remove();
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
index d4a9cf3aca7..51773db0df1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
@@ -40,16 +40,6 @@ public interface MTMVRelatedTableIf extends TableIf {
      */
     Map<Long, PartitionItem> getPartitionItems();
 
-    /**
-     * Obtain the latest update time of partition data
-     *
-     * @param partitionId
-     * @param item
-     * @return millisecond
-     * @throws AnalysisException
-     */
-    long getPartitionLastModifyTime(long partitionId, PartitionItem item) 
throws AnalysisException;
-
     /**
      * getPartitionType LIST/RANGE/UNPARTITIONED
      *
@@ -66,17 +56,35 @@ public interface MTMVRelatedTableIf extends TableIf {
     Set<String> getPartitionColumnNames() throws DdlException;
 
     /**
-     * Obtain the latest update time of table data
+     * getPartitionColumns
      *
      * @return
+     */
+    List<Column> getPartitionColumns();
+
+    /**
+     * getPartitionSnapshot
+     *
+     * @param partitionId
+     * @return partition snapshot at current time
      * @throws AnalysisException
      */
-    long getLastModifyTime() throws AnalysisException;
+    MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws 
AnalysisException;
 
     /**
-     * getPartitionColumns
+     * getTableSnapshot
      *
-     * @return
+     * @return table snapshot at current time
+     * @throws AnalysisException
      */
-    List<Column> getPartitionColumns();
+    MTMVSnapshotIf getTableSnapshot() throws AnalysisException;
+
+    /**
+     * getPartitionName
+     *
+     * @param partitionId
+     * @return partitionName
+     * @throws AnalysisException
+     */
+    String getPartitionName(long partitionId) throws AnalysisException;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIf.java
new file mode 100644
index 00000000000..9a15ab7ef90
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIf.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+/**
+ * MTMV refresh snapshot.
+ *
+ * Marker type for the state of a base table or partition captured when an MTMV
+ * is refreshed. The interface declares no methods; the implementations added
+ * alongside it (MTMVVersionSnapshot, MTMVTimestampSnapshot) define value-based
+ * equals/hashCode over the single value they wrap.
+ */
+public interface MTMVSnapshotIf {
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTimestampSnapshot.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTimestampSnapshot.java
new file mode 100644
index 00000000000..b3fd88a94de
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTimestampSnapshot.java
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Snapshot identified by an update timestamp.
+ * A hive table does not expose a version, so its update time is recorded in place of one.
+ */
+public class MTMVTimestampSnapshot implements MTMVSnapshotIf {
+    @SerializedName("t")
+    private long timestamp;
+
+    public MTMVTimestampSnapshot(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Two snapshots are equal only when they are of exactly the same class
+     * and carry the same timestamp.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null) {
+            return false;
+        }
+        if (!getClass().equals(o.getClass())) {
+            return false;
+        }
+        return ((MTMVTimestampSnapshot) o).timestamp == timestamp;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(timestamp);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
index 481670d9448..74593e5def5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
@@ -73,12 +73,11 @@ public class MTMVUtil {
      * @param partitionId
      * @param tables
      * @param excludedTriggerTables
-     * @param gracePeriod
      * @return
      * @throws AnalysisException
      */
     private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, 
Set<BaseTableInfo> tables,
-            Set<String> excludedTriggerTables, Long gracePeriod) throws 
AnalysisException {
+            Set<String> excludedTriggerTables) throws AnalysisException {
         boolean isSyncWithPartition = true;
         if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE) {
             MTMVRelatedTableIf relatedTable = 
mtmv.getMvPartitionInfo().getRelatedTable();
@@ -92,12 +91,9 @@ public class MTMVUtil {
                 LOG.warn("can not found related partition: " + partitionId);
                 return false;
             }
-            isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, item, 
relatedTable, relatedPartitionId,
-                    relatedPartitionItems.get(relatedPartitionId));
+            isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, 
relatedTable, relatedPartitionId);
         }
-        return isSyncWithPartition && isFresherThanTables(
-                
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(),
 tables,
-                excludedTriggerTables, gracePeriod);
+        return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, 
partitionId, tables, excludedTriggerTables);
 
     }
 
@@ -158,7 +154,7 @@ public class MTMVUtil {
             return false;
         }
         try {
-            return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), 
Sets.newHashSet(), 0L);
+            return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), 
Sets.newHashSet());
         } catch (AnalysisException e) {
             LOG.warn("isMTMVSync failed: ", e);
             return false;
@@ -171,16 +167,14 @@ public class MTMVUtil {
      * @param mtmv
      * @param tables
      * @param excludeTables
-     * @param gracePeriod
      * @return
      * @throws AnalysisException
      */
-    public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, 
Set<String> excludeTables, long gracePeriod)
+    public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, 
Set<String> excludeTables)
             throws AnalysisException {
         Collection<Partition> partitions = mtmv.getPartitions();
         for (Partition partition : partitions) {
-            if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, 
excludeTables,
-                    gracePeriod)) {
+            if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, 
excludeTables)) {
                 return false;
             }
         }
@@ -197,7 +191,6 @@ public class MTMVUtil {
      */
     public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long 
partitionId) throws AnalysisException {
         List<String> res = Lists.newArrayList();
-        long maxAvailableTime = 
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
         for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) 
{
             TableIf table = getTable(baseTableInfo);
             if (!(table instanceof MTMVRelatedTableIf)) {
@@ -213,14 +206,13 @@ public class MTMVUtil {
                 if (relatedPartitionId == -1L) {
                     throw new AnalysisException("can not found related 
partition");
                 }
-                boolean isSyncWithPartition = isSyncWithPartition(mtmv, 
partitionId, item, mtmvRelatedTableIf,
-                        relatedPartitionId, 
relatedPartitionItems.get(relatedPartitionId));
+                boolean isSyncWithPartition = isSyncWithPartition(mtmv, 
partitionId, mtmvRelatedTableIf,
+                        relatedPartitionId);
                 if (!isSyncWithPartition) {
                     res.add(mtmvRelatedTableIf.getName());
                 }
             } else {
-                long tableLastVisibleVersionTime = 
mtmvRelatedTableIf.getLastModifyTime();
-                if (tableLastVisibleVersionTime > maxAvailableTime) {
+                if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) {
                     res.add(table.getName());
                 }
             }
@@ -257,16 +249,16 @@ public class MTMVUtil {
             return res;
         }
         // check gracePeriod
-        Long gracePeriod = mtmv.getGracePeriod();
-        // do not care data is delayed
-        if (gracePeriod < 0) {
-            return allPartitions;
-        }
-
+        long gracePeriodMills = mtmv.getGracePeriod();
+        long currentTimeMills = System.currentTimeMillis();
         for (Partition partition : allPartitions) {
+            if (gracePeriodMills > 0 && currentTimeMills <= 
(partition.getVisibleVersionTime()
+                    + gracePeriodMills)) {
+                res.add(partition);
+                continue;
+            }
             try {
-                if (isMTMVPartitionSync(mtmv, partition.getId(), 
mtmvRelation.getBaseTables(), Sets.newHashSet(),
-                        gracePeriod)) {
+                if (isMTMVPartitionSync(mtmv, partition.getId(), 
mtmvRelation.getBaseTables(), Sets.newHashSet())) {
                     res.add(partition);
                 }
             } catch (AnalysisException e) {
@@ -290,8 +282,7 @@ public class MTMVUtil {
         for (Partition partition : allPartitions) {
             try {
                 if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
-                        mtmv.getExcludedTriggerTables(),
-                        0L)) {
+                        mtmv.getExcludedTriggerTables())) {
                     res.add(partition.getId());
                 }
             } catch (AnalysisException e) {
@@ -312,11 +303,15 @@ public class MTMVUtil {
      * @return
      * @throws AnalysisException
      */
-    private static boolean isSyncWithPartition(MTMV mtmv, Long 
mtmvPartitionId, PartitionItem mtmvPartitionItem,
+    private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
             MTMVRelatedTableIf relatedTable,
-            Long relatedPartitionId, PartitionItem relatedPartitionItem) 
throws AnalysisException {
-        return mtmv.getPartitionLastModifyTime(mtmvPartitionId, 
mtmvPartitionItem) >= relatedTable
-                .getPartitionLastModifyTime(relatedPartitionId, 
relatedPartitionItem);
+            Long relatedPartitionId) throws AnalysisException {
+        MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
+                .getPartitionSnapshot(relatedPartitionId);
+        String relatedPartitionName = 
relatedTable.getPartitionName(relatedPartitionId);
+        String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
+        return mtmv.getRefreshSnapshot()
+                .equalsWithRelatedPartition(mtmvPartitionName, 
relatedPartitionName, relatedPartitionCurrentSnapshot);
     }
 
     /**
@@ -343,9 +338,17 @@ public class MTMVUtil {
      * @param partitionId
      */
     private static void dropPartition(MTMV mtmv, Long partitionId) throws 
AnalysisException, DdlException {
-        Partition partition = 
mtmv.getPartitionOrAnalysisException(partitionId);
-        DropPartitionClause dropPartitionClause = new 
DropPartitionClause(false, partition.getName(), false, false);
-        Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, 
dropPartitionClause);
+        if (!mtmv.writeLockIfExist()) {
+            return;
+        }
+        try {
+            Partition partition = 
mtmv.getPartitionOrAnalysisException(partitionId);
+            DropPartitionClause dropPartitionClause = new 
DropPartitionClause(false, partition.getName(), false, false);
+            Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), 
mtmv, dropPartitionClause);
+        } finally {
+            mtmv.writeUnlock();
+        }
+
     }
 
     /**
@@ -388,15 +391,13 @@ public class MTMVUtil {
     /**
      * Determine is sync, ignoring excludedTriggerTables and non OlapTanle
      *
-     * @param visibleVersionTime
+     * @param mtmvPartitionId
      * @param tables
      * @param excludedTriggerTables
-     * @param gracePeriod
      * @return
      */
-    private static boolean isFresherThanTables(long visibleVersionTime, 
Set<BaseTableInfo> tables,
-            Set<String> excludedTriggerTables, Long gracePeriod) throws 
AnalysisException {
-        long maxAvailableTime = visibleVersionTime + gracePeriod;
+    private static boolean isSyncWithAllBaseTables(MTMV mtmv, long 
mtmvPartitionId, Set<BaseTableInfo> tables,
+            Set<String> excludedTriggerTables) throws AnalysisException {
         for (BaseTableInfo baseTableInfo : tables) {
             TableIf table = null;
             try {
@@ -408,17 +409,36 @@ public class MTMVUtil {
             if (excludedTriggerTables.contains(table.getName())) {
                 continue;
             }
-            if (!(table instanceof MTMVRelatedTableIf)) {
-                continue;
-            }
-            long tableLastVisibleVersionTime = ((MTMVRelatedTableIf) 
table).getLastModifyTime();
-            if (tableLastVisibleVersionTime > maxAvailableTime) {
+            boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, 
mtmvPartitionId, baseTableInfo);
+            if (!syncWithBaseTable) {
                 return false;
             }
         }
         return true;
     }
 
+    /**
+     * Checks whether one MTMV partition is still in sync with a single base table.
+     *
+     * @param mtmv the materialized view
+     * @param mtmvPartitionId id of the MTMV partition to check
+     * @param baseTableInfo the base table to compare against
+     * @return false if the base table cannot be resolved; true if it is not a MTMVRelatedTableIf;
+     *         otherwise the result of equalsWithBaseTable on the MTMV's refresh snapshot
+     * @throws AnalysisException if the table snapshot or the partition name cannot be obtained
+     */
+    private static boolean isSyncWithBaseTable(MTMV mtmv, long mtmvPartitionId, BaseTableInfo baseTableInfo)
+            throws AnalysisException {
+        final TableIf resolved;
+        try {
+            resolved = getTable(baseTableInfo);
+        } catch (AnalysisException e) {
+            LOG.warn("get table failed, {}", baseTableInfo, e);
+            return false;
+        }
+
+        if (resolved instanceof MTMVRelatedTableIf) {
+            MTMVRelatedTableIf related = (MTMVRelatedTableIf) resolved;
+            MTMVSnapshotIf latestSnapshot = related.getTableSnapshot();
+            String partitionName = mtmv.getPartitionName(mtmvPartitionId);
+            return mtmv.getRefreshSnapshot().equalsWithBaseTable(partitionName, related.getId(), latestSnapshot);
+        }
+        // No snapshot can be taken from a table that is not a MTMVRelatedTableIf,
+        // so for now such a table is treated as being in sync.
+        return true;
+    }
+
     private static boolean mtmvContainsExternalTable(MTMV mtmv) {
         Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables();
         for (BaseTableInfo baseTableInfo : baseTables) {
@@ -428,4 +448,60 @@ public class MTMVUtil {
         }
         return false;
     }
+
+    /**
+     * Builds the refresh snapshot for each of the given MTMV partitions.
+     *
+     * @param mtmv the materialized view being refreshed
+     * @param baseTables base tables whose state is captured
+     * @param partitionIds ids of the MTMV partitions to snapshot
+     * @return snapshots keyed by MTMV partition name
+     * @throws AnalysisException if a partition id cannot be resolved or a snapshot cannot be generated
+     */
+    public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv,
+            Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
+            throws AnalysisException {
+        Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
+        for (Long partitionId : partitionIds) {
+            // Resolve through getPartitionOrAnalysisException so that a partition which no longer
+            // exists surfaces as the declared AnalysisException; getPartition(...).getName()
+            // would fail with a NullPointerException when the lookup yields null.
+            String partitionName = mtmv.getPartitionOrAnalysisException(partitionId).getName();
+            res.put(partitionName, generatePartitionSnapshot(mtmv, baseTables, partitionId));
+        }
+        return res;
+    }
+
+
+    /**
+     * Builds the refresh snapshot of a single MTMV partition.
+     *
+     * When the MTMV follows a base table's partitioning, the snapshot of each matching
+     * related-table partition is recorded under that partition's name. Every other base table
+     * that is a MTMVRelatedTableIf contributes a table-level snapshot keyed by its table id.
+     *
+     * @param mtmv the materialized view being refreshed
+     * @param baseTables base tables of the MTMV
+     * @param partitionId id of the MTMV partition
+     * @return the snapshot for the partition
+     * @throws AnalysisException if a table or one of its snapshots cannot be obtained
+     */
+    private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv,
+            Set<BaseTableInfo> baseTables, Long partitionId)
+            throws AnalysisException {
+        MTMVRefreshPartitionSnapshot snapshot = new MTMVRefreshPartitionSnapshot();
+
+        // Partition-level snapshots of the related table.
+        if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
+            MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
+            PartitionItem mtmvPartitionItem = mtmv.getPartitionItems().get(partitionId);
+            for (Long relatedPartitionId : getMTMVPartitionRelatedPartitions(mtmvPartitionItem, relatedTable)) {
+                MTMVSnapshotIf relatedPartitionSnapshot = relatedTable.getPartitionSnapshot(relatedPartitionId);
+                snapshot.getPartitions().put(relatedTable.getPartitionName(relatedPartitionId),
+                        relatedPartitionSnapshot);
+            }
+        }
+
+        // Table-level snapshots of the remaining base tables.
+        for (BaseTableInfo baseTableInfo : baseTables) {
+            boolean isRelatedTable =
+                    mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE
+                            && mtmv.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo);
+            if (isRelatedTable) {
+                // already covered by the partition-level snapshots above
+                continue;
+            }
+            TableIf table = getTable(baseTableInfo);
+            if (table instanceof MTMVRelatedTableIf) {
+                snapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot());
+            }
+        }
+        return snapshot;
+    }
+
+    /**
+     * Finds the related-table partitions that correspond to an MTMV partition.
+     *
+     * @param mtmvPartitionItem partition item of the MTMV partition
+     * @param relatedTable the table whose partitioning the MTMV follows
+     * @return a list holding the id of the first related partition whose item equals
+     *         mtmvPartitionItem, or an empty list when none matches
+     */
+    private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem,
+            MTMVRelatedTableIf relatedTable) {
+        List<Long> matched = Lists.newArrayList();
+        for (Entry<Long, PartitionItem> candidate : relatedTable.getPartitionItems().entrySet()) {
+            if (!mtmvPartitionItem.equals(candidate.getValue())) {
+                continue;
+            }
+            // Currently MTMV partitions map one-to-one onto related-table partitions,
+            // so the search stops at the first match.
+            matched.add(candidate.getKey());
+            break;
+        }
+        return matched;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java
new file mode 100644
index 00000000000..14304e60183
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Snapshot identified by a version number.
+ */
+public class MTMVVersionSnapshot implements MTMVSnapshotIf {
+    @SerializedName("v")
+    private long version;
+
+    public MTMVVersionSnapshot(long version) {
+        this.version = version;
+    }
+
+    /**
+     * Two snapshots are equal only when they are of exactly the same class
+     * and carry the same version.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null) {
+            return false;
+        }
+        if (!getClass().equals(o.getClass())) {
+            return false;
+        }
+        return ((MTMVVersionSnapshot) o).version == version;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(version);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java
index d2084ff6fa3..6c974c57f18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.job.extensions.mtmv.MTMVTask;
 import org.apache.doris.mtmv.MTMVAlterOpType;
 import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
 import org.apache.doris.mtmv.MTMVRelation;
 import org.apache.doris.mtmv.MTMVStatus;
 import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
@@ -52,6 +53,8 @@ public class AlterMTMV implements Writable {
     private MTMVTask task;
     @SerializedName("r")
     private MTMVRelation relation;
+    @SerializedName("ps")
+    private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
 
     public AlterMTMV(TableNameInfo mvName, MTMVRefreshInfo refreshInfo, 
MTMVAlterOpType opType) {
         this.mvName = Objects.requireNonNull(mvName, "require mvName object");
@@ -125,6 +128,15 @@ public class AlterMTMV implements Writable {
         this.relation = relation;
     }
 
+    /**
+     * Returns the partition snapshots carried by this alter operation.
+     *
+     * @return the current value of the partitionSnapshots field (serialized as "ps");
+     *         the field has no initializer, so this is presumably null when nothing
+     *         was set - TODO confirm against the constructors
+     */
+    public Map<String, MTMVRefreshPartitionSnapshot> getPartitionSnapshots() {
+        return partitionSnapshots;
+    }
+
+    /**
+     * Sets the partition snapshots carried by this alter operation.
+     * The map reference is stored as-is, without copying.
+     *
+     * @param partitionSnapshots refresh snapshots, presumably keyed by MTMV partition
+     *                           name - verify against the caller
+     */
+    public void setPartitionSnapshots(
+            Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
+        this.partitionSnapshots = partitionSnapshots;
+    }
+
     @Override
     public String toString() {
         return "AlterMTMV{"
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index fb1b9d8b5ff..cb7d82b22ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -92,6 +92,10 @@ import 
org.apache.doris.load.routineload.AbstractDataSourceProperties;
 import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
 import org.apache.doris.load.sync.SyncJob;
 import org.apache.doris.load.sync.canal.CanalSyncJob;
+import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+import org.apache.doris.mtmv.MTMVTimestampSnapshot;
+import org.apache.doris.mtmv.MTMVVersionSnapshot;
 import org.apache.doris.policy.Policy;
 import org.apache.doris.policy.RowPolicy;
 import org.apache.doris.policy.StoragePolicy;
@@ -241,6 +245,12 @@ public class GsonUtils {
                     .registerSubtype(InsertJob.class, 
InsertJob.class.getSimpleName())
                     .registerSubtype(MTMVJob.class, 
MTMVJob.class.getSimpleName());
 
+    // Polymorphic (de)serialization of MTMVSnapshotIf values: each registered implementation
+    // is labelled with its simple class name, using "clazz" as the type field name.
+    private static RuntimeTypeAdapterFactory<MTMVSnapshotIf> mtmvSnapshotTypeAdapterFactory =
+            RuntimeTypeAdapterFactory.of(MTMVSnapshotIf.class, "clazz")
+                    .registerSubtype(MTMVMaxTimestampSnapshot.class,
+                            MTMVMaxTimestampSnapshot.class.getSimpleName())
+                    .registerSubtype(MTMVTimestampSnapshot.class,
+                            MTMVTimestampSnapshot.class.getSimpleName())
+                    .registerSubtype(MTMVVersionSnapshot.class,
+                            MTMVVersionSnapshot.class.getSimpleName());
+
     private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory 
= RuntimeTypeAdapterFactory.of(
                     DatabaseIf.class, "clazz")
             .registerSubtype(ExternalDatabase.class, 
ExternalDatabase.class.getSimpleName())
@@ -299,6 +309,7 @@ public class GsonUtils {
             .registerTypeAdapterFactory(hbResponseTypeAdapterFactory)
             .registerTypeAdapterFactory(rdsTypeAdapterFactory)
             .registerTypeAdapterFactory(jobExecutorRuntimeTypeAdapterFactory)
+            .registerTypeAdapterFactory(mtmvSnapshotTypeAdapterFactory)
             .registerTypeAdapterFactory(constraintTypeAdapterFactory)
             .registerTypeAdapter(ImmutableMap.class, new 
ImmutableMapDeserializer())
             .registerTypeAdapter(AtomicBoolean.class, new 
AtomicBooleanAdapter())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to