kbendick commented on code in PR #4943:
URL: https://github.com/apache/iceberg/pull/4943#discussion_r888501221


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java:
##########
@@ -72,6 +72,9 @@ class ScanContext implements Serializable {
   private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
       ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
 
+  private static final ConfigOption<Integer> MONITOR_SNAPSHOT_NUMBER =
+      ConfigOptions.key("monitor-snapshot-number").intType().defaultValue(Integer.MAX_VALUE);

Review Comment:
   Nit: Is there a more descriptive name for this? This is the maximum number 
of snapshots to consider within each monitor interval loop, correct?
   
   Kafka has more or less the same concept in `max.poll.records`, alongside 
`max.poll.interval.ms` and its other consumer configuration properties around 
polling.
   
   Maybe we can take some inspiration from that naming. Off the top of my 
head, something like `monitor-max-snapshots-per-interval` would be more 
instructive to the user, given that we already have `monitor-interval` as a 
configuration property.
   
   cc @stevenzwu for your thoughts as well



##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java:
##########
@@ -208,6 +208,37 @@ public void testCheckpointRestore() throws Exception {
     }
   }
 
+  @Test
+  public void testConsumeWithDifferentMonitorSnapshotNumbers() throws Exception {
+    List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);
+
+    for (int monitorNumber = 1; monitorNumber < 11; monitorNumber = monitorNumber + 1) {
+      ScanContext scanContext = ScanContext.builder()
+          .monitorInterval(Duration.ofMillis(100))
+          .monitorSnapshotNumber(monitorNumber)
+          .build();

Review Comment:
   Are there any assertions we can apply (that wouldn't be too flaky) across 
this whole outer loop? It seems like we should have 10 splits total, correct?
   
   Also a nit: start the for loop at 0 rather than 1 if possible.



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java:
##########
@@ -155,6 +162,12 @@ private void monitorAndForwardSplits() {
       if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) {
         newScanContext = scanContext.copyWithSnapshotId(snapshotId);
       } else {
+        List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(table, lastSnapshotId, snapshot.snapshotId());
+        if (snapshotIds.size() < scanContext.monitorSnapshotNumber()) {
+          snapshotId = snapshot.snapshotId();
+        } else {
+          snapshotId = snapshotIds.get(snapshotIds.size() - scanContext.monitorSnapshotNumber());
+        }

Review Comment:
   Can you elaborate on the logic here, or walk me through an example case 
where `snapshotId` needs to be determined because `snapshotIds.size()` is equal 
to (or possibly greater than?) the `monitorSnapshotNumber`?
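   
   For reference, a minimal sketch of how the branch appears to behave. It 
assumes that `SnapshotUtil.snapshotIdsBetween` returns the IDs after 
`lastSnapshotId` ordered newest first; the class, the `pickEndSnapshot` helper, 
and the snapshot IDs are hypothetical stand-ins, not code from the PR.

```java
import java.util.Arrays;
import java.util.List;

class SnapshotCapSketch {
  // Hypothetical stand-in for the new branch in monitorAndForwardSplits().
  // Assumption: idsNewestFirst mimics the result of snapshotIdsBetween, i.e. every
  // snapshot committed after lastSnapshotId, ordered from newest to oldest.
  static long pickEndSnapshot(List<Long> idsNewestFirst, long latestSnapshotId, int maxSnapshots) {
    if (idsNewestFirst.size() < maxSnapshots) {
      // Fewer pending snapshots than the cap: read all the way to the latest one.
      return latestSnapshotId;
    }
    // Otherwise stop at the maxSnapshots-th oldest pending snapshot.
    return idsNewestFirst.get(idsNewestFirst.size() - maxSnapshots);
  }

  public static void main(String[] args) {
    // Made-up IDs: lastSnapshotId is 100 and snapshots 101..105 were committed since.
    List<Long> pending = Arrays.asList(105L, 104L, 103L, 102L, 101L);

    System.out.println(pickEndSnapshot(pending, 105L, 1));  // 101: one snapshot per pass
    System.out.println(pickEndSnapshot(pending, 105L, 2));  // 102: covers 101 and 102
    System.out.println(pickEndSnapshot(pending, 105L, 5));  // 105: size == cap, index 0
    System.out.println(pickEndSnapshot(pending, 105L, 10)); // 105: size < cap, take latest
  }
}
```

   Under that ordering assumption, the equal case (`size == monitorSnapshotNumber`) 
resolves to index 0, which is the latest snapshot, so both branches agree at the 
boundary and the `else` branch only changes the result when more snapshots are 
pending than the cap allows.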



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

