kbendick commented on code in PR #4943:
URL: https://github.com/apache/iceberg/pull/4943#discussion_r888501221
##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java:
##########
@@ -72,6 +72,9 @@ class ScanContext implements Serializable {
private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
+ private static final ConfigOption<Integer> MONITOR_SNAPSHOT_NUMBER =
+ ConfigOptions.key("monitor-snapshot-number").intType().defaultValue(Integer.MAX_VALUE);
Review Comment:
Nit: Is there a more descriptive name for this? This is the maximum number
of snapshots to consider within each monitor interval loop, correct?
Kafka has more or less the same concept in its `max.poll.interval.ms` and other
consumer-related configuration properties around polling.
Maybe we can take some inspiration from that naming. Off the top of my head,
`monitor-max-snapshots-per-interval` or something like that would be more
instructive to the user, given that we already have `monitor-interval` as a
configuration property.
cc @stevenzwu for your thoughts as well
##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java:
##########
@@ -208,6 +208,37 @@ public void testCheckpointRestore() throws Exception {
}
}
+ @Test
+ public void testConsumeWithDifferentMonitorSnapshotNumbers() throws Exception {
+ List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);
+
+ for (int monitorNumber = 1; monitorNumber < 11; monitorNumber = monitorNumber + 1) {
+ ScanContext scanContext = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(100))
+ .monitorSnapshotNumber(monitorNumber)
+ .build();
Review Comment:
Are there any assertions we can apply (that wouldn't be too flaky) for this
whole outer loop? Seems like we should have 10 splits total, correct?
Also a nit: could the for loop start at 0 rather than 1, if possible?
##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java:
##########
@@ -155,6 +162,12 @@ private void monitorAndForwardSplits() {
if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) {
newScanContext = scanContext.copyWithSnapshotId(snapshotId);
} else {
+ List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(table, lastSnapshotId, snapshot.snapshotId());
+ if (snapshotIds.size() < scanContext.monitorSnapshotNumber()) {
+ snapshotId = snapshot.snapshotId();
+ } else {
+ snapshotId = snapshotIds.get(snapshotIds.size() - scanContext.monitorSnapshotNumber());
+ }
Review Comment:
Can you elaborate on the logic here, or walk me through an example case where
`snapshotId` needs to be determined because the number of new snapshots is
equal to (or possibly greater than?) `monitorSnapshotNumber`?
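To make the question concrete, here is a minimal sketch of how I read the
branch above, using plain lists instead of a real table. It assumes the list
returned by `snapshotIdsBetween` is ordered newest first, which I have not
verified; the class name `SnapshotPick`, the method `pick`, and the snapshot
ids are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

class SnapshotPick {
  // Mirrors the if/else in the diff, outside of StreamingMonitorFunction.
  // ASSUMPTION: idsNewestFirst holds the ids in (lastSnapshotId, latestId],
  // ordered newest first. maxSnapshots plays the role of monitorSnapshotNumber.
  static long pick(List<Long> idsNewestFirst, long latestId, int maxSnapshots) {
    if (idsNewestFirst.size() < maxSnapshots) {
      // Fewer new snapshots than the cap: read up to the latest snapshot.
      return latestId;
    }
    // At or above the cap: stop at the maxSnapshots-th oldest new snapshot.
    return idsNewestFirst.get(idsNewestFirst.size() - maxSnapshots);
  }

  public static void main(String[] args) {
    // Hypothetical ids: five snapshots committed since the last poll.
    List<Long> ids = Arrays.asList(105L, 104L, 103L, 102L, 101L);
    System.out.println(pick(ids, 105L, 2));  // above the cap
    System.out.println(pick(ids, 105L, 5));  // exactly at the cap
    System.out.println(pick(ids, 105L, 10)); // below the cap
  }
}
```

Under that ordering assumption, a cap of 2 would stop at 102, while caps of 5
and 10 would both end at the latest snapshot, 105. If that is right, the
"equal to" case gives the same result in either branch, and only the "greater
than" case really needs the else.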