pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1193435338
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testRetryAndFinish() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
Review Comment:
Added comments to the sections below
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testRetryAndFinish() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
+ enumeratorContext.triggerAllActions();
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]