zddr commented on code in PR #54033:
URL: https://github.com/apache/doris/pull/54033#discussion_r2241412500
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java:
##########
@@ -219,7 +223,11 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
this.status.setRefreshState(MTMVRefreshState.FAIL);
}
this.jobInfo.addHistoryTask(task);
- this.refreshSnapshot.updateSnapshots(partitionSnapshots, getPartitionNames());
+ Set<String> partitionNames = getPartitionNames();
+ if (incrementalRefresh) {
+ partitionNames.add(this.name);
Review Comment:
What problem does this solve?
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java:
##########
@@ -111,6 +114,7 @@ public MTMV() {
this.refreshSnapshot = new MTMVRefreshSnapshot();
this.envInfo = new EnvInfo(-1L, -1L);
mvRwLock = new ReentrantReadWriteLock(true);
+ this.incrementalRefresh = refreshInfo.getRefreshMethod() == MTMVRefreshEnum.RefreshMethod.INCREMENTAL;
Review Comment:
There's no need to add a separate variable and persist it. You can just get
it from refreshInfo.getRefreshMethod() when needed.
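A minimal sketch of the derived-getter approach suggested above. The types below are hypothetical stand-ins, not the real Doris classes; only `getRefreshMethod()` and the `INCREMENTAL` constant come from the diff, and the null check assumes `refreshInfo` may not be set yet when the no-arg constructor runs.

```java
// Hypothetical stand-ins for the Doris types; only the shape matters here.
enum RefreshMethod { COMPLETE, AUTO, INCREMENTAL }

class RefreshInfo {
    private final RefreshMethod refreshMethod;

    RefreshInfo(RefreshMethod refreshMethod) {
        this.refreshMethod = refreshMethod;
    }

    RefreshMethod getRefreshMethod() {
        return refreshMethod;
    }
}

class MaterializedViewSketch {
    // Assumption: this may still be null right after construction.
    private RefreshInfo refreshInfo;

    void setRefreshInfo(RefreshInfo refreshInfo) {
        this.refreshInfo = refreshInfo;
    }

    // Derived on demand, so there is no extra field to persist or keep in sync.
    boolean isIncrementalRefresh() {
        return refreshInfo != null
                && refreshInfo.getRefreshMethod() == RefreshMethod.INCREMENTAL;
    }
}
```

With this shape, callers would ask `isIncrementalRefresh()` wherever the PR currently reads the stored flag.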
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -230,7 +242,38 @@ public void run() throws JobException {
MetaLockUtils.readLockTables(tableIfs);
try {
context = MTMVRefreshContext.buildContext(mtmv);
- this.needRefreshPartitions = calculateNeedRefreshPartitions(context);
+ if (mtmv.getIncrementalRefresh()) {
+ Set<BaseTableInfo> baseTables = relation.getBaseTablesOneLevel();
+ if (baseTables.size() != 1) {
+ throw new RuntimeException("Only support incremental refresh for single table MTMV");
+ }
+ BaseTableInfo baseTableInfo = baseTables.iterator().next();
+ MTMVSnapshotIf mvSnapshot = mtmv.getRefreshSnapshot().getMVSnapshot(mtmv.getName(), baseTableInfo);
+ mvSnapshotId = Optional.ofNullable(mvSnapshot).map(MTMVSnapshotIf::getSnapshotVersion).orElse(0L);
+ params.put(TableScanParams.READ_MODE, TableScanParams.INCREMENTAL_READ);
+ if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL
+ && (taskContext.isComplete() || !CollectionUtils.isEmpty(taskContext.getPartitions()))) {
+ params.put(TableScanParams.DORIS_START_SNAPSHOT_ID, String.valueOf(mvSnapshotId));
Review Comment:
Decouple the existing partition-refresh logic from the new incremental-refresh
logic as much as possible. Consider splitting them into separate classes
instead of scattering if/else branches throughout, and add comments explaining
the flow.
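One possible shape for that split, as a rough sketch only. Every name here (`RefreshPlanner`, `PartitionRefreshPlanner`, `IncrementalRefreshPlanner`, `RefreshPlanners`, and the parameter keys) is hypothetical and does not exist in Doris, and the sketch does not reproduce the PR's trigger-mode conditions. It only shows the branch being taken once, at selection time.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface: each refresh style builds its own scan parameters.
interface RefreshPlanner {
    Map<String, String> buildScanParams(long lastSnapshotId);
}

// Existing partition-based refresh: no incremental scan parameters needed.
final class PartitionRefreshPlanner implements RefreshPlanner {
    @Override
    public Map<String, String> buildScanParams(long lastSnapshotId) {
        return new HashMap<>();
    }
}

// Incremental refresh: read only the changes after the last recorded snapshot.
final class IncrementalRefreshPlanner implements RefreshPlanner {
    // Hypothetical keys, used for illustration only.
    static final String READ_MODE = "read_mode";
    static final String START_SNAPSHOT_ID = "start_snapshot_id";

    @Override
    public Map<String, String> buildScanParams(long lastSnapshotId) {
        Map<String, String> params = new HashMap<>();
        params.put(READ_MODE, "incremental");
        params.put(START_SNAPSHOT_ID, String.valueOf(lastSnapshotId));
        return params;
    }
}

final class RefreshPlanners {
    private RefreshPlanners() {
    }

    // The only place that branches on the refresh method.
    static RefreshPlanner forMethod(boolean incremental) {
        return incremental ? new IncrementalRefreshPlanner() : new PartitionRefreshPlanner();
    }
}
```

The task's `run()` would then call `RefreshPlanners.forMethod(...)` once and delegate, instead of interleaving both paths.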
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -230,7 +242,38 @@ public void run() throws JobException {
MetaLockUtils.readLockTables(tableIfs);
try {
context = MTMVRefreshContext.buildContext(mtmv);
- this.needRefreshPartitions = calculateNeedRefreshPartitions(context);
+ if (mtmv.getIncrementalRefresh()) {
+ Set<BaseTableInfo> baseTables = relation.getBaseTablesOneLevel();
+ if (baseTables.size() != 1) {
+ throw new RuntimeException("Only support incremental refresh for single table MTMV");
Review Comment:
Should this throw JobException instead of RuntimeException?
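A minimal sketch of the check using the exception type that `run()` already declares. The `JobException` class below is a local stand-in for the real Doris class, assumed here to be a checked exception with a message constructor. `SingleBaseTableCheck` and `requireSingleBaseTable` are hypothetical names, and the message wording is illustrative.

```java
import java.util.Set;

// Stand-in for the Doris JobException; assumed to be a checked exception.
class JobException extends Exception {
    JobException(String message) {
        super(message);
    }
}

final class SingleBaseTableCheck {
    private SingleBaseTableCheck() {
    }

    // Returns the only base table, or fails with the exception type declared by run().
    static <T> T requireSingleBaseTable(Set<T> baseTables) throws JobException {
        if (baseTables.size() != 1) {
            throw new JobException("Incremental refresh only supports a single-table MTMV, but found "
                    + baseTables.size() + " base tables");
        }
        return baseTables.iterator().next();
    }
}
```

Throwing the declared type would let the job framework handle this failure the same way as other task errors, rather than as an unexpected runtime fault.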
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -230,7 +242,38 @@ public void run() throws JobException {
MetaLockUtils.readLockTables(tableIfs);
try {
context = MTMVRefreshContext.buildContext(mtmv);
- this.needRefreshPartitions = calculateNeedRefreshPartitions(context);
+ if (mtmv.getIncrementalRefresh()) {
+ Set<BaseTableInfo> baseTables = relation.getBaseTablesOneLevel();
+ if (baseTables.size() != 1) {
+ throw new RuntimeException("Only support incremental refresh for single table MTMV");
+ }
+ BaseTableInfo baseTableInfo = baseTables.iterator().next();
+ MTMVSnapshotIf mvSnapshot = mtmv.getRefreshSnapshot().getMVSnapshot(mtmv.getName(), baseTableInfo);
Review Comment:
Describe in the PR how the snapshot information is persisted in the
incremental refresh scenario.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]