This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new bf00d51e4b Flink: backport PR #10832 of inferring parallelism in 
FLIP-27 source (#11009)
bf00d51e4b is described below

commit bf00d51e4b80b428c44e429c26077b99f7212fdd
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Tue Aug 27 08:33:42 2024 -0700

    Flink: backport PR #10832 of inferring parallelism in FLIP-27 source 
(#11009)
---
 .../apache/iceberg/flink/source/IcebergSource.java |  85 +++++++++-
 .../iceberg/flink/source/IcebergTableSource.java   |  31 ++--
 .../flink/source/TestIcebergSourceBounded.java     |   9 +-
 .../flink/source/TestIcebergSourceBoundedSql.java  |   2 +-
 .../source/TestIcebergSourceInferParallelism.java  | 181 +++++++++++++++++++++
 .../iceberg/flink/source/TestIcebergSourceSql.java |   7 +-
 .../TestIcebergSpeculativeExecutionSupport.java    |  15 +-
 .../apache/iceberg/flink/source/IcebergSource.java |  85 +++++++++-
 .../iceberg/flink/source/IcebergTableSource.java   |  31 ++--
 .../flink/source/TestIcebergSourceBounded.java     |   9 +-
 .../flink/source/TestIcebergSourceBoundedSql.java  |   2 +-
 .../source/TestIcebergSourceInferParallelism.java  | 181 +++++++++++++++++++++
 .../iceberg/flink/source/TestIcebergSourceSql.java |   7 +-
 .../TestIcebergSpeculativeExecutionSupport.java    |  14 +-
 14 files changed, 586 insertions(+), 73 deletions(-)

diff --git 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 351ba54e5c..5718f4b938 100644
--- 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -37,6 +39,9 @@ import 
org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
@@ -74,6 +79,7 @@ import 
org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
 import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.ThreadPools;
@@ -97,6 +103,11 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
   private final SerializableRecordEmitter<T> emitter;
   private final String tableName;
 
+  // cache the discovered splits by planSplitsForBatch, which can be called 
twice. And they come
+  // from two different threads: (1) source/stream construction by main thread 
(2) enumerator
+  // creation. Hence need volatile here.
+  private volatile List<IcebergSourceSplit> batchSplits;
+
   IcebergSource(
       TableLoader tableLoader,
       ScanContext scanContext,
@@ -132,16 +143,26 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
     return tableName + "-" + UUID.randomUUID();
   }
 
+  /**
+   * Cache the enumerated splits for batch execution to avoid double planning 
as there are two code
+   * paths obtaining splits: (1) infer parallelism (2) enumerator creation.
+   */
   private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
+    if (batchSplits != null) {
+      return batchSplits;
+    }
+
     ExecutorService workerPool =
         ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
     try (TableLoader loader = tableLoader.clone()) {
       loader.open();
-      List<IcebergSourceSplit> splits =
+      this.batchSplits =
           FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), 
scanContext, workerPool);
       LOG.info(
-          "Discovered {} splits from table {} during job initialization", 
splits.size(), tableName);
-      return splits;
+          "Discovered {} splits from table {} during job initialization",
+          batchSplits.size(),
+          tableName);
+      return batchSplits;
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to close table loader", e);
     } finally {
@@ -207,12 +228,35 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
         // Only do scan planning if nothing is restored from checkpoint state
         List<IcebergSourceSplit> splits = 
planSplitsForBatch(planningThreadName());
         assigner.onDiscoveredSplits(splits);
+        // clear the cached splits after enumerator creation as they won't be 
needed anymore
+        this.batchSplits = null;
       }
 
       return new StaticIcebergEnumerator(enumContext, assigner);
     }
   }
 
+  private boolean shouldInferParallelism() {
+    return !scanContext.isStreaming();
+  }
+
+  private int inferParallelism(ReadableConfig flinkConf, 
StreamExecutionEnvironment env) {
+    int parallelism =
+        SourceUtil.inferParallelism(
+            flinkConf,
+            scanContext.limit(),
+            () -> {
+              List<IcebergSourceSplit> splits = 
planSplitsForBatch(planningThreadName());
+              return splits.size();
+            });
+
+    if (env.getMaxParallelism() > 0) {
+      parallelism = Math.min(parallelism, env.getMaxParallelism());
+    }
+
+    return parallelism;
+  }
+
   /**
    * Create a source builder.
    *
@@ -571,6 +615,41 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
           emitter);
     }
 
+    /**
+     * Build the {@link IcebergSource} and create a {@link DataStream} from 
the source. Watermark
+     * strategy is set to {@link WatermarkStrategy#noWatermarks()}.
+     *
+     * @return data stream from the Iceberg source
+     */
+    public DataStream<T> buildStream(StreamExecutionEnvironment env) {
+      // buildStream should only be called with RowData or Converter paths.
+      Preconditions.checkState(
+          readerFunction == null,
+          "Cannot set reader function when building a data stream from the 
source");
+      IcebergSource<T> source = build();
+      TypeInformation<T> outputTypeInfo =
+          outputTypeInfo(converter, table.schema(), 
source.scanContext.project());
+      DataStreamSource<T> stream =
+          env.fromSource(source, WatermarkStrategy.noWatermarks(), 
source.name(), outputTypeInfo);
+      if (source.shouldInferParallelism()) {
+        stream = stream.setParallelism(source.inferParallelism(flinkConfig, 
env));
+      }
+
+      return stream;
+    }
+
+    private static <T> TypeInformation<T> outputTypeInfo(
+        RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
+      if (converter != null) {
+        return converter.getProducedType();
+      } else {
+        // output type is RowData
+        Schema readSchema = projected != null ? projected : tableSchema;
+        return (TypeInformation<T>)
+            
FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
+      }
+    }
+
     private ReaderFunction<T> readerFunction(ScanContext context) {
       if (table instanceof BaseMetadataTable) {
         MetaDataReaderFunction rowDataReaderFunction =
diff --git 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 610657e8d4..65adce77d9 100644
--- 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++ 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -23,11 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -128,26 +125,18 @@ public class IcebergTableSource
         .build();
   }
 
-  private DataStreamSource<RowData> 
createFLIP27Stream(StreamExecutionEnvironment env) {
+  private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment 
env) {
     SplitAssignerType assignerType =
         readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
-    IcebergSource<RowData> source =
-        IcebergSource.forRowData()
-            .tableLoader(loader)
-            .assignerFactory(assignerType.factory())
-            .properties(properties)
-            .project(getProjectedSchema())
-            .limit(limit)
-            .filters(filters)
-            .flinkConfig(readableConfig)
-            .build();
-    DataStreamSource stream =
-        env.fromSource(
-            source,
-            WatermarkStrategy.noWatermarks(),
-            source.name(),
-            TypeInformation.of(RowData.class));
-    return stream;
+    return IcebergSource.forRowData()
+        .tableLoader(loader)
+        .assignerFactory(assignerType.factory())
+        .properties(properties)
+        .project(getProjectedSchema())
+        .limit(limit)
+        .filters(filters)
+        .flinkConfig(readableConfig)
+        .buildStream(env);
   }
 
   private TableSchema getProjectedSchema() {
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
index b7447d15c0..db8647f054 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -24,8 +24,6 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -130,11 +128,8 @@ public class TestIcebergSourceBounded extends 
TestFlinkScan {
     sourceBuilder.properties(options);
 
     DataStream<Row> stream =
-        env.fromSource(
-                sourceBuilder.build(),
-                WatermarkStrategy.noWatermarks(),
-                "testBasicRead",
-                TypeInformation.of(RowData.class))
+        sourceBuilder
+            .buildStream(env)
             .map(
                 new RowDataToRowMapper(
                     FlinkSchemaUtil.convert(
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
index 0f41c5af4c..d3713e2960 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
@@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends 
TestIcebergSourceBounded {
   @BeforeEach
   public void before() throws IOException {
     Configuration tableConf = getTableEnv().getConfig().getConfiguration();
-    
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
 true);
+    tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
     SqlHelpers.sql(
         getTableEnv(),
         "create catalog iceberg_catalog with ('type'='iceberg', 
'catalog-type'='hadoop', 'warehouse'='%s')",
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
new file mode 100644
index 0000000000..2908cb9272
--- /dev/null
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestIcebergSourceInferParallelism {
+  private static final int NUM_TMS = 2;
+  private static final int SLOTS_PER_TM = 2;
+  private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+  private static final int MAX_INFERRED_PARALLELISM = 3;
+
+  @RegisterExtension
+  private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(NUM_TMS)
+              .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  protected static final HadoopCatalogExtension CATALOG_EXTENSION =
+      new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  @TempDir private Path tmpDir;
+
+  private Table table;
+  private GenericAppenderHelper dataAppender;
+
+  @BeforeEach
+  public void before() throws IOException {
+    this.table =
+        CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+    this.dataAppender = new GenericAppenderHelper(table, FileFormat.PARQUET, 
tmpDir);
+  }
+
+  @AfterEach
+  public void after() {
+    CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+  }
+
+  @Test
+  public void testEmptyTable() throws Exception {
+    // Inferred parallelism should be at least 1 even if table is empty
+    test(1, 0);
+  }
+
+  @Test
+  public void testTableWithFilesLessThanMaxInferredParallelism() throws 
Exception {
+    // Append files to the table
+    for (int i = 0; i < 2; ++i) {
+      List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+      dataAppender.appendToTable(batch);
+    }
+
+    // Inferred parallelism should equal the number of splits (2)
+    test(2, 2);
+  }
+
+  @Test
+  public void testTableWithFilesMoreThanMaxInferredParallelism() throws 
Exception {
+    // Append files to the table
+    for (int i = 0; i < MAX_INFERRED_PARALLELISM + 1; ++i) {
+      List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+      dataAppender.appendToTable(batch);
+    }
+
+    // Inferred parallelism should be capped by the MAX_INFERRED_PARALLELISM
+    test(MAX_INFERRED_PARALLELISM, MAX_INFERRED_PARALLELISM + 1);
+  }
+
+  private void test(int expectedParallelism, int expectedRecords) throws 
Exception {
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(PARALLELISM);
+
+    Configuration config = new Configuration();
+    config.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, 
true);
+    config.set(
+        FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX,
+        MAX_INFERRED_PARALLELISM);
+
+    DataStream<Row> dataStream =
+        IcebergSource.forRowData()
+            .tableLoader(CATALOG_EXTENSION.tableLoader())
+            .table(table)
+            .flinkConfig(config)
+            // force one file per split
+            .splitSize(1L)
+            .buildStream(env)
+            .map(new 
RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema())));
+
+    DataStream.Collector<Row> collector = new DataStream.Collector<>();
+    dataStream.collectAsync(collector);
+    JobClient jobClient = env.executeAsync();
+    try (CloseableIterator<Row> iterator = collector.getOutput()) {
+      List<Row> result = Lists.newArrayList();
+      while (iterator.hasNext()) {
+        result.add(iterator.next());
+      }
+
+      assertThat(result).hasSize(expectedRecords);
+      verifySourceParallelism(
+          expectedParallelism, 
miniCluster().getExecutionGraph(jobClient.getJobID()).get());
+    }
+  }
+
+  /**
+   * Borrowed this approach from Flink {@code FileSourceTextLinesITCase} to 
get source parallelism
+   * from execution graph.
+   */
+  private static void verifySourceParallelism(
+      int expectedParallelism, AccessExecutionGraph executionGraph) {
+    AccessExecutionJobVertex sourceVertex =
+        executionGraph.getVerticesTopologically().iterator().next();
+    assertThat(sourceVertex.getParallelism()).isEqualTo(expectedParallelism);
+  }
+
+  /**
+   * Use reflection to get {@code InternalMiniClusterExtension} and {@code 
MiniCluster} to get
+   * execution graph and source parallelism. Haven't found another way via public 
APIs.
+   */
+  private static MiniCluster miniCluster() throws Exception {
+    Field privateField =
+        
MiniClusterExtension.class.getDeclaredField("internalMiniClusterExtension");
+    privateField.setAccessible(true);
+    InternalMiniClusterExtension internalExtension =
+        (InternalMiniClusterExtension) 
privateField.get(MINI_CLUSTER_EXTENSION);
+    return internalExtension.getMiniCluster();
+  }
+}
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index 75f0a785a8..548940a842 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -53,7 +53,12 @@ public class TestIcebergSourceSql extends TestSqlBase {
   public void before() throws IOException {
     TableEnvironment tableEnvironment = getTableEnv();
     Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
-    
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
 true);
+    tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
+    // Disable inferring parallelism to avoid interfering with watermark tests
+    // that check split assignment is ordered by the watermark column.
+    // The tests assume a default parallelism of 1 with a single reader task
+    // in order to check the order of read records.
+    
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, 
false);
 
     
tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", 
"1");
     SqlHelpers.sql(
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index 51f9025b41..564e8139e6 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -48,14 +48,20 @@ import org.apache.iceberg.flink.TestBase;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+/**
+ * There is an infinite sleep in the test. Add a timeout to the test to avoid 
getting stuck in case
+ * anything goes wrong unexpectedly.
+ */
+@Timeout(value = 60)
 public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   private static final int NUM_TASK_MANAGERS = 1;
   private static final int NUM_TASK_SLOTS = 3;
 
   @RegisterExtension
-  public static MiniClusterExtension miniClusterResource =
+  public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
       new MiniClusterExtension(
           new MiniClusterResourceConfiguration.Builder()
               .setNumberTaskManagers(NUM_TASK_MANAGERS)
@@ -144,9 +150,9 @@ public class TestIcebergSpeculativeExecutionSupport extends 
TestBase {
   private static class TestingMap extends RichMapFunction<Row, Row> {
     @Override
     public Row map(Row row) throws Exception {
-      // Put the subtasks with the first attempt to sleep to trigger 
speculative
-      // execution
-      if (getRuntimeContext().getAttemptNumber() <= 0) {
+      // Simulate slow subtask 0 with attempt 0
+      if (getRuntimeContext().getIndexOfThisSubtask() == 0
+          && getRuntimeContext().getAttemptNumber() <= 0) {
         Thread.sleep(Integer.MAX_VALUE);
       }
 
@@ -169,6 +175,7 @@ public class TestIcebergSpeculativeExecutionSupport extends 
TestBase {
 
     // Use FLIP-27 source
     configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
+    
configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
 false);
 
     // for speculative execution
     configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true);
diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 351ba54e5c..5718f4b938 100644
--- 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -37,6 +39,9 @@ import 
org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
@@ -74,6 +79,7 @@ import 
org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
 import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.ThreadPools;
@@ -97,6 +103,11 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
   private final SerializableRecordEmitter<T> emitter;
   private final String tableName;
 
+  // cache the discovered splits by planSplitsForBatch, which can be called 
twice. And they come
+  // from two different threads: (1) source/stream construction by main thread 
(2) enumerator
+  // creation. Hence need volatile here.
+  private volatile List<IcebergSourceSplit> batchSplits;
+
   IcebergSource(
       TableLoader tableLoader,
       ScanContext scanContext,
@@ -132,16 +143,26 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
     return tableName + "-" + UUID.randomUUID();
   }
 
+  /**
+   * Cache the enumerated splits for batch execution to avoid double planning 
as there are two code
+   * paths obtaining splits: (1) infer parallelism (2) enumerator creation.
+   */
   private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
+    if (batchSplits != null) {
+      return batchSplits;
+    }
+
     ExecutorService workerPool =
         ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
     try (TableLoader loader = tableLoader.clone()) {
       loader.open();
-      List<IcebergSourceSplit> splits =
+      this.batchSplits =
           FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), 
scanContext, workerPool);
       LOG.info(
-          "Discovered {} splits from table {} during job initialization", 
splits.size(), tableName);
-      return splits;
+          "Discovered {} splits from table {} during job initialization",
+          batchSplits.size(),
+          tableName);
+      return batchSplits;
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to close table loader", e);
     } finally {
@@ -207,12 +228,35 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
         // Only do scan planning if nothing is restored from checkpoint state
         List<IcebergSourceSplit> splits = 
planSplitsForBatch(planningThreadName());
         assigner.onDiscoveredSplits(splits);
+        // clear the cached splits after enumerator creation as they won't be 
needed anymore
+        this.batchSplits = null;
       }
 
       return new StaticIcebergEnumerator(enumContext, assigner);
     }
   }
 
+  private boolean shouldInferParallelism() {
+    return !scanContext.isStreaming();
+  }
+
+  private int inferParallelism(ReadableConfig flinkConf, 
StreamExecutionEnvironment env) {
+    int parallelism =
+        SourceUtil.inferParallelism(
+            flinkConf,
+            scanContext.limit(),
+            () -> {
+              List<IcebergSourceSplit> splits = 
planSplitsForBatch(planningThreadName());
+              return splits.size();
+            });
+
+    if (env.getMaxParallelism() > 0) {
+      parallelism = Math.min(parallelism, env.getMaxParallelism());
+    }
+
+    return parallelism;
+  }
+
   /**
    * Create a source builder.
    *
@@ -571,6 +615,41 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
           emitter);
     }
 
+    /**
+     * Build the {@link IcebergSource} and create a {@link DataStream} from 
the source. Watermark
+     * strategy is set to {@link WatermarkStrategy#noWatermarks()}.
+     *
+     * @return data stream from the Iceberg source
+     */
+    public DataStream<T> buildStream(StreamExecutionEnvironment env) {
+      // buildStream should only be called with RowData or Converter paths.
+      Preconditions.checkState(
+          readerFunction == null,
+          "Cannot set reader function when building a data stream from the 
source");
+      IcebergSource<T> source = build();
+      TypeInformation<T> outputTypeInfo =
+          outputTypeInfo(converter, table.schema(), 
source.scanContext.project());
+      DataStreamSource<T> stream =
+          env.fromSource(source, WatermarkStrategy.noWatermarks(), 
source.name(), outputTypeInfo);
+      if (source.shouldInferParallelism()) {
+        stream = stream.setParallelism(source.inferParallelism(flinkConfig, 
env));
+      }
+
+      return stream;
+    }
+
+    private static <T> TypeInformation<T> outputTypeInfo(
+        RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
+      if (converter != null) {
+        return converter.getProducedType();
+      } else {
+        // output type is RowData
+        Schema readSchema = projected != null ? projected : tableSchema;
+        return (TypeInformation<T>)
+            
FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
+      }
+    }
+
     private ReaderFunction<T> readerFunction(ScanContext context) {
       if (table instanceof BaseMetadataTable) {
         MetaDataReaderFunction rowDataReaderFunction =
diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 610657e8d4..65adce77d9 100644
--- 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -23,11 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -128,26 +125,18 @@ public class IcebergTableSource
         .build();
   }
 
-  private DataStreamSource<RowData> 
createFLIP27Stream(StreamExecutionEnvironment env) {
+  private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment 
env) {
     SplitAssignerType assignerType =
         readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
-    IcebergSource<RowData> source =
-        IcebergSource.forRowData()
-            .tableLoader(loader)
-            .assignerFactory(assignerType.factory())
-            .properties(properties)
-            .project(getProjectedSchema())
-            .limit(limit)
-            .filters(filters)
-            .flinkConfig(readableConfig)
-            .build();
-    DataStreamSource stream =
-        env.fromSource(
-            source,
-            WatermarkStrategy.noWatermarks(),
-            source.name(),
-            TypeInformation.of(RowData.class));
-    return stream;
+    return IcebergSource.forRowData()
+        .tableLoader(loader)
+        .assignerFactory(assignerType.factory())
+        .properties(properties)
+        .project(getProjectedSchema())
+        .limit(limit)
+        .filters(filters)
+        .flinkConfig(readableConfig)
+        .buildStream(env);
   }
 
   private TableSchema getProjectedSchema() {
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
index b7447d15c0..db8647f054 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -24,8 +24,6 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -130,11 +128,8 @@ public class TestIcebergSourceBounded extends 
TestFlinkScan {
     sourceBuilder.properties(options);
 
     DataStream<Row> stream =
-        env.fromSource(
-                sourceBuilder.build(),
-                WatermarkStrategy.noWatermarks(),
-                "testBasicRead",
-                TypeInformation.of(RowData.class))
+        sourceBuilder
+            .buildStream(env)
             .map(
                 new RowDataToRowMapper(
                     FlinkSchemaUtil.convert(
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
index 0f41c5af4c..d3713e2960 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
@@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends 
TestIcebergSourceBounded {
   @BeforeEach
   public void before() throws IOException {
     Configuration tableConf = getTableEnv().getConfig().getConfiguration();
-    
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
 true);
+    tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
     SqlHelpers.sql(
         getTableEnv(),
         "create catalog iceberg_catalog with ('type'='iceberg', 
'catalog-type'='hadoop', 'warehouse'='%s')",
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
new file mode 100644
index 0000000000..2908cb9272
--- /dev/null
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static 
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestIcebergSourceInferParallelism {
+  private static final int NUM_TMS = 2;
+  private static final int SLOTS_PER_TM = 2;
+  private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+  private static final int MAX_INFERRED_PARALLELISM = 3;
+
+  @RegisterExtension
+  private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(NUM_TMS)
+              .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  protected static final HadoopCatalogExtension CATALOG_EXTENSION =
+      new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  @TempDir private Path tmpDir;
+
+  private Table table;
+  private GenericAppenderHelper dataAppender;
+
+  @BeforeEach
+  public void before() throws IOException {
+    this.table =
+        CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+    this.dataAppender = new GenericAppenderHelper(table, FileFormat.PARQUET, 
tmpDir);
+  }
+
+  @AfterEach
+  public void after() {
+    CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+  }
+
+  @Test
+  public void testEmptyTable() throws Exception {
+    // Inferred parallelism should be at least 1 even if table is empty
+    test(1, 0);
+  }
+
+  @Test
+  public void testTableWithFilesLessThanMaxInferredParallelism() throws 
Exception {
+    // Append files to the table
+    for (int i = 0; i < 2; ++i) {
+      List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+      dataAppender.appendToTable(batch);
+    }
+
+    // Inferred parallelism should equal the number of splits (2)
+    test(2, 2);
+  }
+
+  @Test
+  public void testTableWithFilesMoreThanMaxInferredParallelism() throws 
Exception {
+    // Append files to the table
+    for (int i = 0; i < MAX_INFERRED_PARALLELISM + 1; ++i) {
+      List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+      dataAppender.appendToTable(batch);
+    }
+
+    // Inferred parallelism should be capped by the MAX_INFERRED_PARALLELISM
+    test(MAX_INFERRED_PARALLELISM, MAX_INFERRED_PARALLELISM + 1);
+  }
+
+  private void test(int expectedParallelism, int expectedRecords) throws 
Exception {
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(PARALLELISM);
+
+    Configuration config = new Configuration();
+    config.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, 
true);
+    config.set(
+        FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX,
+        MAX_INFERRED_PARALLELISM);
+
+    DataStream<Row> dataStream =
+        IcebergSource.forRowData()
+            .tableLoader(CATALOG_EXTENSION.tableLoader())
+            .table(table)
+            .flinkConfig(config)
+            // force one file per split
+            .splitSize(1L)
+            .buildStream(env)
+            .map(new 
RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema())));
+
+    DataStream.Collector<Row> collector = new DataStream.Collector<>();
+    dataStream.collectAsync(collector);
+    JobClient jobClient = env.executeAsync();
+    try (CloseableIterator<Row> iterator = collector.getOutput()) {
+      List<Row> result = Lists.newArrayList();
+      while (iterator.hasNext()) {
+        result.add(iterator.next());
+      }
+
+      assertThat(result).hasSize(expectedRecords);
+      verifySourceParallelism(
+          expectedParallelism, 
miniCluster().getExecutionGraph(jobClient.getJobID()).get());
+    }
+  }
+
+  /**
+   * Borrowed this approach from Flink {@code FileSourceTextLinesITCase} to 
get source parallelism
+   * from execution graph.
+   */
+  private static void verifySourceParallelism(
+      int expectedParallelism, AccessExecutionGraph executionGraph) {
+    AccessExecutionJobVertex sourceVertex =
+        executionGraph.getVerticesTopologically().iterator().next();
+    assertThat(sourceVertex.getParallelism()).isEqualTo(expectedParallelism);
+  }
+
+  /**
+   * Use reflection to get {@code InternalMiniClusterExtension} and {@code 
MiniCluster} to get
+   * execution graph and source parallelism. Haven't found another way via public 
APIs.
+   */
+  private static MiniCluster miniCluster() throws Exception {
+    Field privateField =
+        
MiniClusterExtension.class.getDeclaredField("internalMiniClusterExtension");
+    privateField.setAccessible(true);
+    InternalMiniClusterExtension internalExtension =
+        (InternalMiniClusterExtension) 
privateField.get(MINI_CLUSTER_EXTENSION);
+    return internalExtension.getMiniCluster();
+  }
+}
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index 75f0a785a8..548940a842 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -53,7 +53,12 @@ public class TestIcebergSourceSql extends TestSqlBase {
   public void before() throws IOException {
     TableEnvironment tableEnvironment = getTableEnv();
     Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
-    
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
 true);
+    tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
+    // Disable inferring parallelism to avoid interfering with watermark tests
+    // that check split assignment is ordered by the watermark column.
+    // The tests assume a default parallelism of 1 with a single reader task
+    // in order to check the order of read records.
+    
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, 
false);
 
     
tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", 
"1");
     SqlHelpers.sql(
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index 41b023b936..05a08c24d8 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.BatchExecutionOptions;
@@ -48,8 +49,14 @@ import org.apache.iceberg.flink.TestBase;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+/**
+ * There is an infinite sleep in the test. Add a timeout to the test to avoid 
getting stuck in case
+ * anything goes wrong unexpectedly.
+ */
+@Timeout(value = 60)
 public class TestIcebergSpeculativeExecutionSupport extends TestBase {
   private static final int NUM_TASK_MANAGERS = 1;
   private static final int NUM_TASK_SLOTS = 3;
@@ -144,9 +151,9 @@ public class TestIcebergSpeculativeExecutionSupport extends 
TestBase {
   private static class TestingMap extends RichMapFunction<Row, Row> {
     @Override
     public Row map(Row row) throws Exception {
-      // Put the subtasks with the first attempt to sleep to trigger 
speculative
-      // execution
-      if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
+      // Simulate slow subtask 0 with attempt 0
+      TaskInfo taskInfo = getRuntimeContext().getTaskInfo();
+      if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() 
<= 0) {
         Thread.sleep(Integer.MAX_VALUE);
       }
 
@@ -169,6 +176,7 @@ public class TestIcebergSpeculativeExecutionSupport extends 
TestBase {
 
     // Use FLIP-27 source
     configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
+    
configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
 false);
 
     // for speculative execution
     configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true);


Reply via email to