chenjunjiedada commented on code in PR #4943:
URL: https://github.com/apache/iceberg/pull/4943#discussion_r888543723


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java:
##########
@@ -155,6 +162,12 @@ private void monitorAndForwardSplits() {
       if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) {
         newScanContext = scanContext.copyWithSnapshotId(snapshotId);
       } else {
+        List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(table, lastSnapshotId, snapshot.snapshotId());
+        if (snapshotIds.size() < scanContext.monitorSnapshotNumber()) {
+          snapshotId = snapshot.snapshotId();
+        } else {
+          snapshotId = snapshotIds.get(snapshotIds.size() - scanContext.monitorSnapshotNumber());
+        }

Review Comment:
   The `snapshotIdsBetween` function returns the list of snapshot IDs in the 
range `(lastSnapshotId, currentSnapshotId]` (start exclusive, end inclusive), 
ordered by commit time, descending. So the latest snapshot is the first item in 
the list.
   
   Consider the following two cases:
   1. `monitorSnapshotNumber` > `snapshotIds.size()`: `snapshotId` should be 
the ID of the latest snapshot.
   2. `monitorSnapshotNumber` < `snapshotIds.size()`: `snapshotId` is computed 
by indexing from the end of the list, because the list is in descending order.
   
   When `monitorSnapshotNumber` is equal to `snapshotIds.size()`, the `if` and 
`else` blocks produce the same `snapshotId` value.
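
   The cases above can be checked with a small standalone sketch. It does not use the Iceberg API: `SnapshotPick`, `pickSnapshotId`, and the sample IDs are hypothetical names and values chosen for illustration, and the list is assumed to be in the descending order described above, with `monitorSnapshotNumber` of at least 1.

```java
import java.util.Arrays;
import java.util.List;

public class SnapshotPick {
  // Hypothetical helper mirroring the if/else in the diff.
  // idsDescending: snapshot IDs in (lastSnapshotId, latestId], newest first.
  // monitorSnapshotNumber: assumed to be >= 1.
  static long pickSnapshotId(List<Long> idsDescending, long latestId, int monitorSnapshotNumber) {
    if (idsDescending.size() < monitorSnapshotNumber) {
      // Fewer new snapshots than the limit: read up to the latest one.
      return latestId;
    }
    // Newest is at index 0, so counting N snapshots forward from
    // lastSnapshotId means indexing N positions back from the end.
    return idsDescending.get(idsDescending.size() - monitorSnapshotNumber);
  }

  public static void main(String[] args) {
    // Five snapshots committed after lastSnapshotId = 100, newest first.
    List<Long> ids = Arrays.asList(105L, 104L, 103L, 102L, 101L);

    System.out.println(pickSnapshotId(ids, 105L, 8)); // case 1: limit > size
    System.out.println(pickSnapshotId(ids, 105L, 2)); // case 2: limit < size
    System.out.println(pickSnapshotId(ids, 105L, 5)); // limit == size
  }
}
```

   With these sample values the three calls return 105, 102, and 105: when the limit equals the list size, the `else` branch reads index 0, which is the latest snapshot, so it matches the `if` branch.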
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

