This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cbde142f7 Flink: Add config for max allowed consecutive planning failures in IcebergSource before failing the job (#7571)
7cbde142f7 is described below

commit 7cbde142f71236ed71a2cb036db46e7e7672e3de
Author: pvary <[email protected]>
AuthorDate: Wed May 17 04:52:37 2023 +0200

    Flink: Add config for max allowed consecutive planning failures in IcebergSource before failing the job (#7571)
---
 docs/flink-configuration.md                        |  45 ++++----
 .../org/apache/iceberg/flink/FlinkReadConf.java    |   9 ++
 .../org/apache/iceberg/flink/FlinkReadOptions.java |   4 +
 .../apache/iceberg/flink/source/IcebergSource.java |   7 ++
 .../apache/iceberg/flink/source/ScanContext.java   |  24 ++++-
 .../enumerator/ContinuousIcebergEnumerator.java    |  12 ++-
 .../enumerator/ManualContinuousSplitPlanner.java   |   9 +-
 .../TestContinuousIcebergEnumerator.java           | 115 ++++++++++++++++++++-
 8 files changed, 196 insertions(+), 29 deletions(-)

diff --git a/docs/flink-configuration.md b/docs/flink-configuration.md
index 8e3494ab93..37db16c363 100644
--- a/docs/flink-configuration.md
+++ b/docs/flink-configuration.md
@@ -111,27 +111,28 @@ env.getConfig()
 
 `Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
 
-| Read option                 | Flink configuration                           | Table property               | Default                          | Description                                                  |
-| --------------------------- | --------------------------------------------- | ---------------------------- | -------------------------------- | ------------------------------------------------------------ |
-| snapshot-id                 | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the specified snapshot-id. |
-| case-sensitive              | connector.iceberg.case-sensitive              | N/A                          | false                            | If true, match column name in a case sensitive way.          |
-| as-of-timestamp             | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
-| starting-strategy           | connector.iceberg.starting-strategy           | N/A                          | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots shou [...]
-| start-snapshot-timestamp    | N/A                                           | N/A                          | null                             | Start to read data from the most recent snapshot as of the given time in milliseconds. |
-| start-snapshot-id           | N/A                                           | N/A                          | null                             | Start to read data from the specified snapshot-id.           |
-| end-snapshot-id             | N/A                                           | The latest snapshot id           | Specifies the end snapshot.
-| branch                     | N/A                                            | N/A             | main       | Specifies the branch to read from in batch mode
-| tag                        | N/A                                            | N/A             | null       | Specifies the tag to read from in batch mode
-| start-tag                  | N/A                                            | N/A             | null       | Specifies the starting tag to read from for incremental reads
-| end-tag                    | N/A                                            | N/A             | null       | Specifies the ending tag to to read from for incremental reads                                |
-| split-size                  | connector.iceberg.split-size                  | read.split.target-size       | 128 MB                           | Target size when combining input splits.                     |
-| split-lookback              | connector.iceberg.split-file-open-cost        | read.split.planning-lookback | 10                               | Number of bins to consider when combining input splits.      |
-| split-file-open-cost        | connector.iceberg.split-file-open-cost        | read.split.open-file-cost    | 4MB                              | The estimated cost to open a file, used as a minimum weight when combining splits. |
-| streaming                   | connector.iceberg.streaming                   | N/A                          | false                            | Sets whether the current task runs in streaming or batch mode. |
-| monitor-interval            | connector.iceberg.monitor-interval            | N/A                          | 60s                              | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
-| include-column-stats        | connector.iceberg.include-column-stats        | N/A                          | false                            | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
-| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A                          | Integer.MAX_VALUE                | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
-| limit                       | connector.iceberg.limit                       | N/A                          | -1                               | Limited output number of rows.                               |
+| Read option                   | Flink configuration                             | Table property               | Default                          | Description                                                  |
+|-------------------------------|-------------------------------------------------|------------------------------|----------------------------------|--------------------------------------------------------------|
+| snapshot-id                   | N/A                                             | N/A                          | null                             | For time travel in batch mode. Read data from the specified snapshot-id. |
+| case-sensitive                | connector.iceberg.case-sensitive                | N/A                          | false                            | If true, match column name in a case sensitive way.          |
+| as-of-timestamp               | N/A                                             | N/A                          | null                             | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
+| starting-strategy             | connector.iceberg.starting-strategy             | N/A                          | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots [...]
+| start-snapshot-timestamp      | N/A                                             | N/A                          | null                             | Start to read data from the most recent snapshot as of the given time in milliseconds. |
+| start-snapshot-id             | N/A                                             | N/A                          | null                             | Start to read data from the specified snapshot-id.           |
+| end-snapshot-id               | N/A                                             | N/A                          | The latest snapshot id           | Specifies the end snapshot.                                  |
+| branch                        | N/A                                             | N/A                          | main                             | Specifies the branch to read from in batch mode              |
+| tag                           | N/A                                             | N/A                          | null                             | Specifies the tag to read from in batch mode                 |
+| start-tag                     | N/A                                             | N/A                          | null                             | Specifies the starting tag to read from for incremental reads |
+| end-tag                       | N/A                                             | N/A                          | null                             | Specifies the ending tag to read from for incremental reads  |
+| split-size                    | connector.iceberg.split-size                    | read.split.target-size       | 128 MB                           | Target size when combining input splits.                     |
+| split-lookback                | connector.iceberg.split-file-open-cost          | read.split.planning-lookback | 10                               | Number of bins to consider when combining input splits.      |
+| split-file-open-cost          | connector.iceberg.split-file-open-cost          | read.split.open-file-cost    | 4MB                              | The estimated cost to open a file, used as a minimum weight when combining splits. |
+| streaming                     | connector.iceberg.streaming                     | N/A                          | false                            | Sets whether the current task runs in streaming or batch mode. |
+| monitor-interval              | connector.iceberg.monitor-interval              | N/A                          | 60s                              | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
+| include-column-stats          | connector.iceberg.include-column-stats          | N/A                          | false                            | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
+| max-planning-snapshot-count   | connector.iceberg.max-planning-snapshot-count   | N/A                          | Integer.MAX_VALUE                | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
+| limit                         | connector.iceberg.limit                         | N/A                          | -1                               | Limited output number of rows.                               |
+| max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A                          | 3                                | Max allowed consecutive failures for scan planning before failing the job. Set to -1 for never failing the job for scan planning failure. |
 
 
 ### Write options
@@ -163,4 +164,4 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 | compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write      |
 | compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
 | compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write |
-| write-parallelism      | Upstream operator parallelism              | Overrides the writer parallelism                             |
\ No newline at end of file
+| write-parallelism      | Upstream operator parallelism              | Overrides the writer parallelism                             |
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index baef57a8e7..0e04c9affb 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -181,4 +181,13 @@ public class FlinkReadConf {
         .defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
         .parse();
   }
+
+  public int maxAllowedPlanningFailures() {
+    return confParser
+        .intConf()
+        .option(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES)
+        .flinkConfig(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION)
+        .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
+        .parse();
+  }
 }
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index d75b2234d7..55c5aca3b6 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -105,4 +105,8 @@ public class FlinkReadOptions {
   public static final String LIMIT = "limit";
   public static final ConfigOption<Long> LIMIT_OPTION =
       ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+
+  public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
+  public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
+      ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
 }
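
The resolution order for this option follows the precedence documented earlier: read option, then Flink configuration, then the `ConfigOption` default of 3. A stand-alone sketch of that chain (the class and method names here are hypothetical, not Iceberg's actual `ConfParser`):

```java
import java.util.Map;

// Illustrative sketch only of the precedence chain:
// read option > Flink configuration > ConfigOption default.
class ReadOptionPrecedence {
  static final String READ_OPTION = "max-allowed-planning-failures";
  static final String FLINK_CONF_KEY = "connector.iceberg." + READ_OPTION;
  static final int DEFAULT_VALUE = 3; // default of MAX_ALLOWED_PLANNING_FAILURES_OPTION

  static int maxAllowedPlanningFailures(Map<String, String> readOptions, Map<String, String> flinkConf) {
    if (readOptions.containsKey(READ_OPTION)) {
      return Integer.parseInt(readOptions.get(READ_OPTION)); // read option wins
    }
    if (flinkConf.containsKey(FLINK_CONF_KEY)) {
      return Integer.parseInt(flinkConf.get(FLINK_CONF_KEY)); // then Flink configuration
    }
    return DEFAULT_VALUE; // finally the option default
  }
}
```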
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 0675305e10..cbdd184870 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -391,6 +391,13 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       return this;
     }
 
+    public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
+      readOptions.put(
+          FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
+          Integer.toString(maxAllowedPlanningFailures));
+      return this;
+    }
+
     /**
      * Set the read properties for Flink source. View the supported properties 
in {@link
      * FlinkReadOptions}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 23f33e6d2e..e380204e87 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -64,6 +64,7 @@ public class ScanContext implements Serializable {
   private final boolean includeColumnStats;
   private final Integer planParallelism;
   private final int maxPlanningSnapshotCount;
+  private final int maxAllowedPlanningFailures;
 
   private ScanContext(
       boolean caseSensitive,
@@ -86,6 +87,7 @@ public class ScanContext implements Serializable {
       boolean exposeLocality,
       Integer planParallelism,
       int maxPlanningSnapshotCount,
+      int maxAllowedPlanningFailures,
       String branch,
       String tag,
       String startTag,
@@ -115,6 +117,7 @@ public class ScanContext implements Serializable {
     this.exposeLocality = exposeLocality;
     this.planParallelism = planParallelism;
     this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
+    this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
 
     validate();
   }
@@ -155,6 +158,10 @@ public class ScanContext implements Serializable {
     Preconditions.checkArgument(
         !(endTag != null && endSnapshotId() != null),
         "END_SNAPSHOT_ID and END_TAG cannot both be set.");
+
+    Preconditions.checkArgument(
+        maxAllowedPlanningFailures >= -1,
+        "Cannot set maxAllowedPlanningFailures to a negative number other than -1.");
   }
 
   public boolean caseSensitive() {
@@ -253,6 +260,10 @@ public class ScanContext implements Serializable {
     return maxPlanningSnapshotCount;
   }
 
+  public int maxAllowedPlanningFailures() {
+    return maxAllowedPlanningFailures;
+  }
+
   public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -277,6 +288,7 @@ public class ScanContext implements Serializable {
         .exposeLocality(exposeLocality)
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+        .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
         .build();
   }
 
@@ -304,6 +316,7 @@ public class ScanContext implements Serializable {
         .exposeLocality(exposeLocality)
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+        .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
         .build();
   }
 
@@ -341,6 +354,8 @@ public class ScanContext implements Serializable {
         FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
     private int maxPlanningSnapshotCount =
         FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
+    private int maxAllowedPlanningFailures =
+        FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
 
     private Builder() {}
 
@@ -464,6 +479,11 @@ public class ScanContext implements Serializable {
       return this;
     }
 
+    public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
+      this.maxAllowedPlanningFailures = newMaxAllowedPlanningFailures;
+      return this;
+    }
+
     public Builder resolveConfig(
         Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
       FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -488,7 +508,8 @@ public class ScanContext implements Serializable {
           .limit(flinkReadConf.limit())
           .planParallelism(flinkReadConf.workerPoolSize())
           .includeColumnStats(flinkReadConf.includeColumnStats())
-          .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount());
+          .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
+          .maxAllowedPlanningFailures(flinkReadConf.maxAllowedPlanningFailures());
     }
 
     public ScanContext build() {
@@ -513,6 +534,7 @@ public class ScanContext implements Serializable {
           exposeLocality,
           planParallelism,
           maxPlanningSnapshotCount,
+          maxAllowedPlanningFailures,
           branch,
           tag,
           startTag,
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
index b84dab190a..b1dadfb9a6 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -55,6 +55,9 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
   /** Track enumeration result history for split discovery throttling. */
   private final EnumerationHistory enumerationHistory;
 
+  /** Count the consecutive failures and throw an exception if the max allowed failures are reached. */
+  private transient int consecutiveFailures = 0;
+
   public ContinuousIcebergEnumerator(
       SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
       SplitAssigner assigner,
@@ -122,6 +125,7 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
   /** This method is executed in a single coordinator thread. */
   private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
     if (error == null) {
+      consecutiveFailures = 0;
       if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
         // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
         // thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
@@ -161,7 +165,13 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      consecutiveFailures++;
+      if (scanContext.maxAllowedPlanningFailures() < 0
+          || consecutiveFailures <= scanContext.maxAllowedPlanningFailures()) {
+        LOG.error("Failed to discover new splits", error);
+      } else {
+        throw new RuntimeException("Failed to discover new splits", error);
+      }
     }
   }
 }
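
The enumerator change above boils down to a small reset-count-throw policy: a success clears the streak, each failure increments it, and the job fails once the streak exceeds the configured maximum, with -1 disabling the limit. A self-contained sketch of that policy (the class name `PlanningFailureTracker` is hypothetical; in the commit the logic lives inline in `processDiscoveredSplits`):

```java
// Hypothetical helper sketching the failure-handling policy of the change above.
class PlanningFailureTracker {
  private final int maxAllowedFailures; // -1 means never fail the job
  private int consecutiveFailures = 0;

  PlanningFailureTracker(int maxAllowedFailures) {
    this.maxAllowedFailures = maxAllowedFailures;
  }

  void onSuccess() {
    consecutiveFailures = 0; // any successful planning resets the streak
  }

  void onFailure(Throwable error) {
    consecutiveFailures++;
    if (maxAllowedFailures >= 0 && consecutiveFailures > maxAllowedFailures) {
      // over the limit: surface the error and fail the job
      throw new RuntimeException("Failed to discover new splits", error);
    }
    // within the limit (or limit disabled): only log and let the next discovery retry
  }
}
```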
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
index 7575beed6e..ebc92df023 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
@@ -32,16 +32,23 @@ class ManualContinuousSplitPlanner implements ContinuousSplitPlanner {
   // track splits per snapshot
   private final NavigableMap<Long, List<IcebergSourceSplit>> splits;
   private long latestSnapshotId;
+  private int remainingFailures;
 
-  ManualContinuousSplitPlanner(ScanContext scanContext) {
+  ManualContinuousSplitPlanner(ScanContext scanContext, int expectedFailures) {
     this.maxPlanningSnapshotCount = scanContext.maxPlanningSnapshotCount();
     this.splits = new TreeMap<>();
     this.latestSnapshotId = 0L;
+    this.remainingFailures = expectedFailures;
   }
 
   @Override
   public synchronized ContinuousEnumerationResult planSplits(
       IcebergEnumeratorPosition lastPosition) {
+    if (remainingFailures > 0) {
+      remainingFailures--;
+      throw new RuntimeException("Expected failure at planning");
+    }
+
     long fromSnapshotIdExclusive = 0;
     if (lastPosition != null && lastPosition.snapshotId() != null) {
       fromSnapshotIdExclusive = lastPosition.snapshotId();
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index a051a4de0f..d0ae8fdf77 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -51,7 +51,7 @@ public class TestContinuousIcebergEnumerator {
             .streaming(true)
             .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -81,7 +81,7 @@ public class TestContinuousIcebergEnumerator {
             .streaming(true)
             .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -110,7 +110,7 @@ public class TestContinuousIcebergEnumerator {
             .streaming(true)
             .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -163,7 +163,7 @@ public class TestContinuousIcebergEnumerator {
             // discover one snapshot at a time
             .maxPlanningSnapshotCount(1)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -227,6 +227,113 @@ public class TestContinuousIcebergEnumerator {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningErrorsWithSuccessfulRetry() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger planning and check that no splits are returned due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Second scan planning should succeed and discover the expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(3).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningErrors() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduler ignores the current error and continues to run until the
+    // failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testPlanningIgnoringErrors() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Cannot discover the new split while planning failures occur
+    for (int i = 0; i < expectedFailures; ++i) {
+      enumeratorContext.triggerAllActions();
+      pendingSplits = enumerator.snapshotState(i).pendingSplits();
+      Assert.assertEquals(0, pendingSplits.size());
+    }
+
+    // Discovered the new split after a successful scan planning
+    enumeratorContext.triggerAllActions();
+    pendingSplits = enumerator.snapshotState(expectedFailures + 1).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
   private static ContinuousIcebergEnumerator createEnumerator(
       SplitEnumeratorContext<IcebergSourceSplit> context,
       ScanContext scanContext,

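For readers skimming the test changes above, the policy they exercise is: up to `maxAllowedPlanningFailures` consecutive planning failures are tolerated (the tests use 2 with one transient failure, 1 with two failures to trigger a job failure, and -1 to ignore failures entirely), the counter resets after a successful plan, and exceeding the limit fails the job. Below is a minimal standalone sketch of that bookkeeping; the class and method names are hypothetical and do not reflect the actual ContinuousIcebergEnumerator implementation.

```java
// Hypothetical sketch of the consecutive-failure policy exercised by the new
// tests: tolerate up to maxAllowedFailures consecutive planning errors
// (-1 means never fatal), reset the counter on success, and raise an error
// once the limit is exceeded.
class PlanningFailureTracker {
  private final int maxAllowedFailures; // -1 disables the limit
  private int consecutiveFailures = 0;

  PlanningFailureTracker(int maxAllowedFailures) {
    this.maxAllowedFailures = maxAllowedFailures;
  }

  // Record the outcome of one planning attempt. A success resets the counter;
  // a failure beyond the allowed limit surfaces as an exception, which in the
  // enumerator would fail the job.
  void onPlanningResult(boolean success) {
    if (success) {
      consecutiveFailures = 0;
      return;
    }
    consecutiveFailures++;
    if (maxAllowedFailures >= 0 && consecutiveFailures > maxAllowedFailures) {
      throw new RuntimeException("Failed to discover new split");
    }
  }
}
```

With a limit of 1, the first failure is tolerated and the second throws, matching the shape of testOverMaxAllowedPlanningErrors above; with -1, failures are counted but never fatal, matching testPlanningIgnoringErrors.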
