This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7cbde142f7 Flink: Add config for max allowed consecutive planning
failures in IcebergSource before failing the job (#7571)
7cbde142f7 is described below
commit 7cbde142f71236ed71a2cb036db46e7e7672e3de
Author: pvary <[email protected]>
AuthorDate: Wed May 17 04:52:37 2023 +0200
Flink: Add config for max allowed consecutive planning failures in
IcebergSource before failing the job (#7571)
---
docs/flink-configuration.md | 45 ++++----
.../org/apache/iceberg/flink/FlinkReadConf.java | 9 ++
.../org/apache/iceberg/flink/FlinkReadOptions.java | 4 +
.../apache/iceberg/flink/source/IcebergSource.java | 7 ++
.../apache/iceberg/flink/source/ScanContext.java | 24 ++++-
.../enumerator/ContinuousIcebergEnumerator.java | 12 ++-
.../enumerator/ManualContinuousSplitPlanner.java | 9 +-
.../TestContinuousIcebergEnumerator.java | 115 ++++++++++++++++++++-
8 files changed, 196 insertions(+), 29 deletions(-)
diff --git a/docs/flink-configuration.md b/docs/flink-configuration.md
index 8e3494ab93..37db16c363 100644
--- a/docs/flink-configuration.md
+++ b/docs/flink-configuration.md
@@ -111,27 +111,28 @@ env.getConfig()
`Read option` has the highest priority, followed by `Flink configuration` and
then `Table property`.
-| Read option | Flink configuration | Table property | Default | Description |
-| --- | --- | --- | --- | --- |
-| snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id. |
-| case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column name in a case sensitive way. |
-| as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
-| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots shou [...] |
-| start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. |
-| start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. |
-| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
-| branch | N/A | N/A | main | Specifies the branch to read from in batch mode |
-| tag | N/A | N/A | null | Specifies the tag to read from in batch mode |
-| start-tag | N/A | N/A | null | Specifies the starting tag to read from for incremental reads |
-| end-tag | N/A | N/A | null | Specifies the ending tag to to read from for incremental reads |
-| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. |
-| split-lookback | connector.iceberg.split-file-open-cost | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits. |
-| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4MB | The estimated cost to open a file, used as a minimum weight when combining splits. |
-| streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
-| monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
-| include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
-| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
-| limit | connector.iceberg.limit | N/A | -1 | Limited output number of rows. |
+| Read option | Flink configuration | Table property | Default | Description [...]
+|---|---|---|---|--- [...]
+| snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id. [...]
+| case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column name in a case sensitive way. [...]
+| as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. [...]
+| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots [...]
+| start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. [...]
+| start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. [...]
+| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. [...]
+| branch | N/A | N/A | main | Specifies the branch to read from in batch mode [...]
+| tag | N/A | N/A | null | Specifies the tag to read from in batch mode [...]
+| start-tag | N/A | N/A | null | Specifies the starting tag to read from for incremental reads [...]
+| end-tag | N/A | N/A | null | Specifies the ending tag to read from for incremental reads [...]
+| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. [...]
+| split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits. [...]
+| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4MB | The estimated cost to open a file, used as a minimum weight when combining splits. [...]
+| streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. [...]
+| monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. [...]
+| include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. [...]
+| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. [...]
+| limit | connector.iceberg.limit | N/A | -1 | Limited output number of rows. [...]
+| max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 to never fail the job on scan planning failures. [...]
### Write options
@@ -163,4 +164,4 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
-| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
\ No newline at end of file
+| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
diff --git
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index baef57a8e7..0e04c9affb 100644
---
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -181,4 +181,13 @@ public class FlinkReadConf {
.defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
.parse();
}
+
+  public int maxAllowedPlanningFailures() {
+    return confParser
+        .intConf()
+        .option(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES)
+        .flinkConfig(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION)
+        .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
+        .parse();
+  }
}
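The accessor above resolves the new option through three layers: the per-source read option wins, then the job-level Flink configuration, then the declared default of 3. A minimal standalone sketch of that precedence (invented names, not the actual `ConfParser` API):

```java
// Toy model of the read-option > Flink-config > default resolution order.
public class PrecedenceSketch {
    static Integer resolve(Integer readOption, Integer flinkConfig, int defaultValue) {
        if (readOption != null) {
            return readOption;   // highest priority: per-source read option
        }
        if (flinkConfig != null) {
            return flinkConfig;  // next: job-level Flink configuration
        }
        return defaultValue;     // finally: the option's declared default
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null, 3));  // 3
        System.out.println(resolve(null, 5, 3));     // 5
        System.out.println(resolve(7, 5, 3));        // 7
    }
}
```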
diff --git
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index d75b2234d7..55c5aca3b6 100644
---
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -105,4 +105,8 @@ public class FlinkReadOptions {
public static final String LIMIT = "limit";
public static final ConfigOption<Long> LIMIT_OPTION =
ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+
+  public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
+  public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
+      ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
}
diff --git
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 0675305e10..cbdd184870 100644
---
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -391,6 +391,13 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
return this;
}
+  public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
+    readOptions.put(
+        FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
+        Integer.toString(maxAllowedPlanningFailures));
+    return this;
+  }
+
/**
* Set the read properties for Flink source. View the supported properties
in {@link
* FlinkReadOptions}
diff --git
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 23f33e6d2e..e380204e87 100644
---
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -64,6 +64,7 @@ public class ScanContext implements Serializable {
private final boolean includeColumnStats;
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
+ private final int maxAllowedPlanningFailures;
private ScanContext(
boolean caseSensitive,
@@ -86,6 +87,7 @@ public class ScanContext implements Serializable {
boolean exposeLocality,
Integer planParallelism,
int maxPlanningSnapshotCount,
+ int maxAllowedPlanningFailures,
String branch,
String tag,
String startTag,
@@ -115,6 +117,7 @@ public class ScanContext implements Serializable {
this.exposeLocality = exposeLocality;
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
+ this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
validate();
}
@@ -155,6 +158,10 @@ public class ScanContext implements Serializable {
Preconditions.checkArgument(
!(endTag != null && endSnapshotId() != null),
"END_SNAPSHOT_ID and END_TAG cannot both be set.");
+
+    Preconditions.checkArgument(
+        maxAllowedPlanningFailures >= -1,
+        "Cannot set maxAllowedPlanningFailures to a negative number other than -1.");
}
public boolean caseSensitive() {
@@ -253,6 +260,10 @@ public class ScanContext implements Serializable {
return maxPlanningSnapshotCount;
}
+ public int maxAllowedPlanningFailures() {
+ return maxAllowedPlanningFailures;
+ }
+
  public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -277,6 +288,7 @@ public class ScanContext implements Serializable {
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+ .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.build();
}
@@ -304,6 +316,7 @@ public class ScanContext implements Serializable {
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+ .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.build();
}
@@ -341,6 +354,8 @@ public class ScanContext implements Serializable {
FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
private int maxPlanningSnapshotCount =
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
+ private int maxAllowedPlanningFailures =
+ FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
private Builder() {}
@@ -464,6 +479,11 @@ public class ScanContext implements Serializable {
return this;
}
+  public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
+    this.maxAllowedPlanningFailures = newMaxAllowedPlanningFailures;
+    return this;
+  }
+
    public Builder resolveConfig(
        Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
      FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -488,7 +508,8 @@ public class ScanContext implements Serializable {
.limit(flinkReadConf.limit())
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
-          .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount());
+          .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
+          .maxAllowedPlanningFailures(flinkReadConf.maxAllowedPlanningFailures());
}
public ScanContext build() {
@@ -513,6 +534,7 @@ public class ScanContext implements Serializable {
exposeLocality,
planParallelism,
maxPlanningSnapshotCount,
+ maxAllowedPlanningFailures,
branch,
tag,
startTag,
diff --git
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
index b84dab190a..b1dadfb9a6 100644
---
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
+++
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -55,6 +55,9 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
/** Track enumeration result history for split discovery throttling. */
private final EnumerationHistory enumerationHistory;
+  /** Count consecutive failures and throw an exception when the max allowed failures is reached */
+ private transient int consecutiveFailures = 0;
+
public ContinuousIcebergEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner,
@@ -122,6 +125,7 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
/** This method is executed in a single coordinator thread. */
private void processDiscoveredSplits(ContinuousEnumerationResult result,
Throwable error) {
if (error == null) {
+ consecutiveFailures = 0;
if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
// Multiple discoverSplits() may be triggered with the same starting
snapshot to the I/O
// thread pool. E.g., the splitDiscoveryInterval is very short (like
10 ms in some unit
@@ -161,7 +165,13 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
LOG.info("Update enumerator position to {}", result.toPosition());
}
} else {
- LOG.error("Failed to discover new splits", error);
+ consecutiveFailures++;
+ if (scanContext.maxAllowedPlanningFailures() < 0
+ || consecutiveFailures <= scanContext.maxAllowedPlanningFailures()) {
+ LOG.error("Failed to discover new splits", error);
+ } else {
+ throw new RuntimeException("Failed to discover new splits", error);
+ }
}
}
}
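The enumerator change above amounts to a small state machine: a successful planning resets the counter, and a failure only becomes fatal once the consecutive count exceeds the configured budget (with -1 meaning never fatal). A self-contained sketch of that rule, with hypothetical class and method names:

```java
// Toy model of the consecutive-failure budget added to processDiscoveredSplits().
public class FailureBudgetSketch {
    private final int maxAllowedPlanningFailures;
    private int consecutiveFailures = 0;

    FailureBudgetSketch(int maxAllowedPlanningFailures) {
        this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
    }

    /** Returns true while the error is tolerated; throws once the budget is exhausted. */
    boolean onPlanningResult(boolean success) {
        if (success) {
            consecutiveFailures = 0;  // any successful planning resets the budget
            return true;
        }
        consecutiveFailures++;
        if (maxAllowedPlanningFailures < 0
            || consecutiveFailures <= maxAllowedPlanningFailures) {
            return true;  // log-and-continue path
        }
        throw new RuntimeException("Failed to discover new splits");
    }

    public static void main(String[] args) {
        FailureBudgetSketch budget = new FailureBudgetSketch(2);
        System.out.println(budget.onPlanningResult(false)); // true (1st failure)
        System.out.println(budget.onPlanningResult(false)); // true (2nd failure)
        budget.onPlanningResult(true);                      // success resets the count
        System.out.println(budget.onPlanningResult(false)); // true (back to 1st failure)
    }
}
```

With `maxAllowedPlanningFailures = -1` the first branch of the condition always holds, so planning errors are logged forever, matching the documented "never fail" behavior.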
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
index 7575beed6e..ebc92df023 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
@@ -32,16 +32,23 @@ class ManualContinuousSplitPlanner implements
ContinuousSplitPlanner {
// track splits per snapshot
private final NavigableMap<Long, List<IcebergSourceSplit>> splits;
private long latestSnapshotId;
+ private int remainingFailures;
- ManualContinuousSplitPlanner(ScanContext scanContext) {
+ ManualContinuousSplitPlanner(ScanContext scanContext, int expectedFailures) {
this.maxPlanningSnapshotCount = scanContext.maxPlanningSnapshotCount();
this.splits = new TreeMap<>();
this.latestSnapshotId = 0L;
+ this.remainingFailures = expectedFailures;
}
@Override
public synchronized ContinuousEnumerationResult planSplits(
IcebergEnumeratorPosition lastPosition) {
+ if (remainingFailures > 0) {
+ remainingFailures--;
+ throw new RuntimeException("Expected failure at planning");
+ }
+
long fromSnapshotIdExclusive = 0;
if (lastPosition != null && lastPosition.snapshotId() != null) {
fromSnapshotIdExclusive = lastPosition.snapshotId();
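The test planner above throws for a fixed number of calls before returning splits. A toy re-creation of that pattern (invented names, plain `List<String>` in place of real splits), driving the retry loop under a budget of two allowed failures:

```java
import java.util.List;

// Miniature of the ManualContinuousSplitPlanner test pattern: a planner that
// fails `expectedFailures` times, then succeeds, retried under a failure budget.
public class FlakyPlannerSketch {
    private int remainingFailures;

    FlakyPlannerSketch(int expectedFailures) {
        this.remainingFailures = expectedFailures;
    }

    List<String> planSplits() {
        if (remainingFailures > 0) {
            remainingFailures--;
            throw new RuntimeException("Expected failure at planning");
        }
        return List.of("split-1");
    }

    public static void main(String[] args) {
        FlakyPlannerSketch planner = new FlakyPlannerSketch(1);
        int consecutiveFailures = 0;
        int maxAllowed = 2;
        List<String> discovered = List.of();
        while (discovered.isEmpty()) {
            try {
                discovered = planner.planSplits();
                consecutiveFailures = 0;  // success resets the budget
            } catch (RuntimeException e) {
                consecutiveFailures++;
                if (consecutiveFailures > maxAllowed) {
                    throw e;  // budget exhausted: fail the "job"
                }
            }
        }
        System.out.println(discovered);  // [split-1]
    }
}
```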
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index a051a4de0f..d0ae8fdf77 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -51,7 +51,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -81,7 +81,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -110,7 +110,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -163,7 +163,7 @@ public class TestContinuousIcebergEnumerator {
// discover one snapshot at a time
.maxPlanningSnapshotCount(1)
.build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -227,6 +227,113 @@ public class TestContinuousIcebergEnumerator {
splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+  @Test
+  public void testTransientPlanningErrorsWithSuccessfulRetry() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that no splits are returned due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Second scan planning should succeed and discover the expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(3).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningErrors() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduled task ignores the first error and continues to run until the
+    // failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testPlanningIgnoringErrors() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Cannot discover the new split while planning keeps failing
+    for (int i = 0; i < expectedFailures; ++i) {
+      enumeratorContext.triggerAllActions();
+      pendingSplits = enumerator.snapshotState(i).pendingSplits();
+      Assert.assertEquals(0, pendingSplits.size());
+    }
+
+    // The new split is discovered after a successful scan planning
+    enumeratorContext.triggerAllActions();
+    pendingSplits = enumerator.snapshotState(expectedFailures + 1).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
private static ContinuousIcebergEnumerator createEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> context,
ScanContext scanContext,