pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1194833279
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testTransientPlanningFailure() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner = new
ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // Make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1,
1);
+ splitPlanner.addSplits(splits);
+
+ // Trigger a planning and check that we did not get splits due to the
planning error
+ enumeratorContext.triggerAllActions();
+ Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+ // Trigger the planning again to recover from the failure, and we get the
expected splits
+ enumeratorContext.triggerAllActions();
+ Collection<IcebergSourceSplitState> pendingSplits =
enumerator.snapshotState(2).pendingSplits();
+ Assert.assertEquals(1, pendingSplits.size());
+ IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+ Assert.assertEquals(splits.get(0).splitId(),
pendingSplit.split().splitId());
+ Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED,
pendingSplit.status());
+ }
+
+ @Test
+ public void testOverMaxAllowedPlanningFailures() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner = new
ManualContinuousSplitPlanner(scanContext, 3);
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // Make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1,
1);
+ splitPlanner.addSplits(splits);
+
+ // Check that the scheduler response ignores the current error and
continues to run until the
+ // failure limit is reached
+ enumeratorContext.triggerAllActions();
+ Assert.assertFalse(
+
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+ // Check that the task has failed with the expected exception after the
failure limit is reached
+ enumeratorContext.triggerAllActions();
+ Assert.assertTrue(
+
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+ Assertions.assertThatThrownBy(
+ () ->
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+ .hasCauseInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Failed to discover new split");
+ }
+
+ @Test
+ public void testOriginalRetry() throws Exception {
Review Comment:
   This test verifies that planning failures are ignored when `maxAllowedPlanningFailures` is set to `-1` (unlimited retries).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]