zddr commented on code in PR #54033:
URL: https://github.com/apache/doris/pull/54033#discussion_r2241412500


##########
fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java:
##########
@@ -219,7 +223,11 @@ public void addTaskResult(MTMVTask task, MTMVRelation 
relation,
                 this.status.setRefreshState(MTMVRefreshState.FAIL);
             }
             this.jobInfo.addHistoryTask(task);
-            this.refreshSnapshot.updateSnapshots(partitionSnapshots, 
getPartitionNames());
+            Set<String> partitionNames = getPartitionNames();
+            if (incrementalRefresh) {
+                partitionNames.add(this.name);

Review Comment:
   What problem does this solve?
   



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java:
##########
@@ -111,6 +114,7 @@ public MTMV() {
         this.refreshSnapshot = new MTMVRefreshSnapshot();
         this.envInfo = new EnvInfo(-1L, -1L);
         mvRwLock = new ReentrantReadWriteLock(true);
+        this.incrementalRefresh = refreshInfo.getRefreshMethod() == 
MTMVRefreshEnum.RefreshMethod.INCREMENTAL;

Review Comment:
   There's no need to add a separate variable and persist it. You can just get 
it from refreshInfo.getRefreshMethod() when needed.



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -230,7 +242,38 @@ public void run() throws JobException {
             MetaLockUtils.readLockTables(tableIfs);
             try {
                 context = MTMVRefreshContext.buildContext(mtmv);
-                this.needRefreshPartitions = 
calculateNeedRefreshPartitions(context);
+                if (mtmv.getIncrementalRefresh()) {
+                    Set<BaseTableInfo> baseTables = 
relation.getBaseTablesOneLevel();
+                    if (baseTables.size() != 1) {
+                        throw new RuntimeException("Only support incremental 
refresh for single table MTMV");
+                    }
+                    BaseTableInfo baseTableInfo = baseTables.iterator().next();
+                    MTMVSnapshotIf mvSnapshot = 
mtmv.getRefreshSnapshot().getMVSnapshot(mtmv.getName(), baseTableInfo);
+                    mvSnapshotId = 
Optional.ofNullable(mvSnapshot).map(MTMVSnapshotIf::getSnapshotVersion).orElse(0L);
+                    params.put(TableScanParams.READ_MODE, 
TableScanParams.INCREMENTAL_READ);
+                    if (taskContext.getTriggerMode() == 
MTMVTaskTriggerMode.MANUAL
+                            && (taskContext.isComplete() || 
!CollectionUtils.isEmpty(taskContext.getPartitions()))) {
+                        params.put(TableScanParams.DORIS_START_SNAPSHOT_ID, 
String.valueOf(mvSnapshotId));

Review Comment:
   Try to decouple the logic of the previous partition refresh and the current 
incremental refresh as much as possible (you might consider splitting them into 
different classes instead of using a lot of if and else statements everywhere), 
and then add some comments.



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -230,7 +242,38 @@ public void run() throws JobException {
             MetaLockUtils.readLockTables(tableIfs);
             try {
                 context = MTMVRefreshContext.buildContext(mtmv);
-                this.needRefreshPartitions = 
calculateNeedRefreshPartitions(context);
+                if (mtmv.getIncrementalRefresh()) {
+                    Set<BaseTableInfo> baseTables = 
relation.getBaseTablesOneLevel();
+                    if (baseTables.size() != 1) {
+                        throw new RuntimeException("Only support incremental 
refresh for single table MTMV");

Review Comment:
   JobException?



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -230,7 +242,38 @@ public void run() throws JobException {
             MetaLockUtils.readLockTables(tableIfs);
             try {
                 context = MTMVRefreshContext.buildContext(mtmv);
-                this.needRefreshPartitions = 
calculateNeedRefreshPartitions(context);
+                if (mtmv.getIncrementalRefresh()) {
+                    Set<BaseTableInfo> baseTables = 
relation.getBaseTablesOneLevel();
+                    if (baseTables.size() != 1) {
+                        throw new RuntimeException("Only support incremental 
refresh for single table MTMV");
+                    }
+                    BaseTableInfo baseTableInfo = baseTables.iterator().next();
+                    MTMVSnapshotIf mvSnapshot = 
mtmv.getRefreshSnapshot().getMVSnapshot(mtmv.getName(), baseTableInfo);

Review Comment:
   Describe in the PR how the snapshot information is persisted in the 
incremental refresh scenario.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to