stevenzwu commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1192754161
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testRetryAndFinish() throws Exception {
Review Comment:
nit: should the method name be adjusted? Something like `testTransientPlanningFailure`.
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testRetryAndFinish() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
+ enumeratorContext.triggerAllActions();
+
+ // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+ Assert.assertEquals(1, pendingSplits.size());
+ IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+ }
+
+ @Test
+ public void testRetryLimitReached() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
Review Comment:
nit: add a comment line to explain the expected behavior. Maybe also add an assertion with `getAllScheduledTasks`, similar to the block below?
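A minimal sketch of what the comment plus assertion could look like, assuming the first trigger runs exactly one failed planning attempt (the checkpoint id passed to `snapshotState` is arbitrary):

```java
// First planning attempt fails; one failure is still below
// maxAllowedPlanningFailures(2), so the enumerator keeps running
// and no splits have been discovered yet.
enumeratorContext.triggerAllActions();
Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());
```

A `getAllScheduledTasks`-based check mirroring the failure assertion at the end of this test could be layered on top, though its exact shape depends on how the testing executor exposes the task futures.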
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +165,14 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
LOG.info("Update enumerator position to {}", result.toPosition());
}
} else {
- LOG.error("Failed to discover new splits", error);
+ consecutiveFailures++;
+ if (scanContext.maxAllowedPlanningFailures() == -1
+ || consecutiveFailures < scanContext.maxAllowedPlanningFailures()) {
+        // To have an option for the original behavior - unlimited retries without job failure
Review Comment:
nit: this comment probably needs to be adjusted
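For example, something along these lines (illustrative wording only):

```java
// maxAllowedPlanningFailures = -1 keeps the legacy behavior:
// retry planning indefinitely without failing the job.
```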
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +165,14 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
LOG.info("Update enumerator position to {}", result.toPosition());
}
} else {
- LOG.error("Failed to discover new splits", error);
+ consecutiveFailures++;
+ if (scanContext.maxAllowedPlanningFailures() == -1
Review Comment:
nit: I know we have a Preconditions check of `>= -1`. I am still wondering if the check here could just be `< 0`?
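The relaxed check would read roughly like this (a sketch only; the branch bodies are indicative, not the actual implementation):

```java
// Treat any negative value as "unlimited retries", instead of
// special-casing -1.
if (scanContext.maxAllowedPlanningFailures() < 0
    || consecutiveFailures < scanContext.maxAllowedPlanningFailures()) {
  LOG.error("Failed to discover new splits", error); // keep retrying
} else {
  // give up and fail the job (actual failure handling elided)
}
```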
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testRetryAndFinish() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
+ enumeratorContext.triggerAllActions();
Review Comment:
move this second `triggerAllActions()` call after the comment line at 252 to separate the two blocks.
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testRetryAndFinish() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
+ enumeratorContext.triggerAllActions();
+
+ // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+ Assert.assertEquals(1, pendingSplits.size());
+ IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+ }
+
+ @Test
+ public void testRetryLimitReached() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
+ enumeratorContext.triggerAllActions();
Review Comment:
nit: move this line after the comment line 282 that explains the expected
behavior.
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testRetryAndFinish() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
Review Comment:
we should add an assertion here that no splits are discovered, due to the expected planning failure. Add a comment to explain the expected behavior too.
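Perhaps something like this (a sketch; the planner in this test is configured to fail exactly once, and the checkpoint id is arbitrary):

```java
// The planner fails on the first attempt, so no splits may be
// discovered yet.
enumeratorContext.triggerAllActions();
Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());

// The second attempt succeeds and discovers the split.
enumeratorContext.triggerAllActions();
```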
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testRetryAndFinish() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
+ enumeratorContext.triggerAllActions();
+
+ // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+ Assert.assertEquals(1, pendingSplits.size());
+ IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+ }
+
+ @Test
+ public void testRetryLimitReached() throws Exception {
Review Comment:
nit: the method name needs adjustment, e.g. `testOverMaxAllowedPlanningFailures`?
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testRetryAndFinish() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
+ enumeratorContext.triggerAllActions();
+
+ // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+ Assert.assertEquals(1, pendingSplits.size());
+ IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+ }
+
+ @Test
+ public void testRetryLimitReached() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
+ enumeratorContext.triggerAllActions();
+
+ // Check that the task has failed with the expected exception
+ Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+ .hasCauseInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Failed to discover new split");
+ }
+
+ @Test
+ public void testOriginalRetry() throws Exception {
+ int expectedFailures = 3;
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(-1)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner =
+ new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+
+ Collection<IcebergSourceSplitState> pendingSplits;
+ // Until we have errors, we do not have result
+ for (int i = 0; i < expectedFailures; ++i) {
+ enumeratorContext.triggerAllActions();
+ pendingSplits = enumerator.snapshotState(2).pendingSplits();
+ Assert.assertEquals(0, pendingSplits.size());
+ }
+
+ // After the errors are fixed we can continue
Review Comment:
nit: `Discovered the new split after a successful scan planning`
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testRetryAndFinish() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
+ enumeratorContext.triggerAllActions();
+
+ // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+ Assert.assertEquals(1, pendingSplits.size());
+ IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+ }
+
+ @Test
+ public void testRetryLimitReached() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+ enumeratorContext.triggerAllActions();
+ enumeratorContext.triggerAllActions();
+
+ // Check that the task has failed with the expected exception
+ Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+ .hasCauseInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Failed to discover new split");
+ }
+
+ @Test
+ public void testOriginalRetry() throws Exception {
+ int expectedFailures = 3;
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(-1)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner =
+ new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+
+ Collection<IcebergSourceSplitState> pendingSplits;
+ // Until we have errors, we do not have result
Review Comment:
nit: Iceberg style doesn't use words like `we` in the comment. Maybe `Can not discover the new split with planning failures`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]