pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1193436087


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testRetryLimitReached() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that the task has failed with the expected exception
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Until we have errors, we do not have result

Review Comment:
   Done



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testRetryLimitReached() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that the task has failed with the expected exception
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Until we have errors, we do not have result
+    for (int i = 0; i < expectedFailures; ++i) {
+      enumeratorContext.triggerAllActions();
+      pendingSplits = enumerator.snapshotState(2).pendingSplits();
+      Assert.assertEquals(0, pendingSplits.size());
+    }
+
+    // After the errors are fixed we can continue

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to