pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1194833279


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningFailure() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that we did not get splits due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Trigger the planning again to recover from the failure, and we get the expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningFailures() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduler response ignores the current error and continues to run until the
+    // failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {

Review Comment:
   This test is checking that we ignore failures (the `-1` case).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to