stevenzwu commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1194199897


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), 
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningFailure() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new 
ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 
1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that we did not get splits due to the 
planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Trigger the planning again to recover from the failure, and we get the 
expected splits

Review Comment:
   nit: avoid `we` in the comment. maybe like `Second scan planning should 
succeed and discover the expected splits`?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +165,13 @@ private void 
processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      consecutiveFailures++;
+      if (scanContext.maxAllowedPlanningFailures() < 0
+          || consecutiveFailures < scanContext.maxAllowedPlanningFailures()) {

Review Comment:
   nit: should be `<=`. if maxAllowedPlanningFailures is 3, we should allow 3 
consecutive failures.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), 
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningFailure() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new 
ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 
1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that we did not get splits due to the 
planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Trigger the planning again to recover from the failure, and we get the 
expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = 
enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), 
pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, 
pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningFailures() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new 
ManualContinuousSplitPlanner(scanContext, 3);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 
1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduler response ignores the current error and 
continues to run until the
+    // failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the 
failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> 
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {

Review Comment:
   nit: this method name is not informative. what does `original` refer to? 
also how is this test different with `testTransientPlanningFailure`?



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), 
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningFailure() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new 
ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 
1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that we did not get splits due to the 
planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Trigger the planning again to recover from the failure, and we get the 
expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = 
enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), 
pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, 
pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningFailures() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new 
ManualContinuousSplitPlanner(scanContext, 3);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 
1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduler response ignores the current error and 
continues to run until the
+    // failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the 
failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> 
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 
1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Can not discover the new split with planning failures
+    for (int i = 0; i < expectedFailures; ++i) {
+      enumeratorContext.triggerAllActions();
+      pendingSplits = enumerator.snapshotState(2).pendingSplits();

Review Comment:
   nit: change the checkpoint id to `i`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to