This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new bf00d51e4b Flink: backport PR #10832 of inferring parallelism in
FLIP-27 source (#11009)
bf00d51e4b is described below
commit bf00d51e4b80b428c44e429c26077b99f7212fdd
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Tue Aug 27 08:33:42 2024 -0700
Flink: backport PR #10832 of inferring parallelism in FLIP-27 source
(#11009)
---
.../apache/iceberg/flink/source/IcebergSource.java | 85 +++++++++-
.../iceberg/flink/source/IcebergTableSource.java | 31 ++--
.../flink/source/TestIcebergSourceBounded.java | 9 +-
.../flink/source/TestIcebergSourceBoundedSql.java | 2 +-
.../source/TestIcebergSourceInferParallelism.java | 181 +++++++++++++++++++++
.../iceberg/flink/source/TestIcebergSourceSql.java | 7 +-
.../TestIcebergSpeculativeExecutionSupport.java | 15 +-
.../apache/iceberg/flink/source/IcebergSource.java | 85 +++++++++-
.../iceberg/flink/source/IcebergTableSource.java | 31 ++--
.../flink/source/TestIcebergSourceBounded.java | 9 +-
.../flink/source/TestIcebergSourceBoundedSql.java | 2 +-
.../source/TestIcebergSourceInferParallelism.java | 181 +++++++++++++++++++++
.../iceberg/flink/source/TestIcebergSourceSql.java | 7 +-
.../TestIcebergSpeculativeExecutionSupport.java | 14 +-
14 files changed, 586 insertions(+), 73 deletions(-)
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 351ba54e5c..5718f4b938 100644
---
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
@@ -37,6 +39,9 @@ import
org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
@@ -74,6 +79,7 @@ import
org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
@@ -97,6 +103,11 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
private final SerializableRecordEmitter<T> emitter;
private final String tableName;
+ // cache the discovered splits by planSplitsForBatch, which can be called
twice. And they come
+ // from two different threads: (1) source/stream construction by main thread
(2) enumerator
+ // creation. Hence need volatile here.
+ private volatile List<IcebergSourceSplit> batchSplits;
+
IcebergSource(
TableLoader tableLoader,
ScanContext scanContext,
@@ -132,16 +143,26 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
return tableName + "-" + UUID.randomUUID();
}
+ /**
+ * Cache the enumerated splits for batch execution to avoid double planning
as there are two code
+ * paths obtaining splits: (1) infer parallelism (2) enumerator creation.
+ */
private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
+ if (batchSplits != null) {
+ return batchSplits;
+ }
+
ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try (TableLoader loader = tableLoader.clone()) {
loader.open();
- List<IcebergSourceSplit> splits =
+ this.batchSplits =
FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(),
scanContext, workerPool);
LOG.info(
- "Discovered {} splits from table {} during job initialization",
splits.size(), tableName);
- return splits;
+ "Discovered {} splits from table {} during job initialization",
+ batchSplits.size(),
+ tableName);
+ return batchSplits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
} finally {
@@ -207,12 +228,35 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
// Only do scan planning if nothing is restored from checkpoint state
List<IcebergSourceSplit> splits =
planSplitsForBatch(planningThreadName());
assigner.onDiscoveredSplits(splits);
+ // clear the cached splits after enumerator creation as they won't be
needed anymore
+ this.batchSplits = null;
}
return new StaticIcebergEnumerator(enumContext, assigner);
}
}
+ private boolean shouldInferParallelism() {
+ return !scanContext.isStreaming();
+ }
+
+ private int inferParallelism(ReadableConfig flinkConf,
StreamExecutionEnvironment env) {
+ int parallelism =
+ SourceUtil.inferParallelism(
+ flinkConf,
+ scanContext.limit(),
+ () -> {
+ List<IcebergSourceSplit> splits =
planSplitsForBatch(planningThreadName());
+ return splits.size();
+ });
+
+ if (env.getMaxParallelism() > 0) {
+ parallelism = Math.min(parallelism, env.getMaxParallelism());
+ }
+
+ return parallelism;
+ }
+
/**
* Create a source builder.
*
@@ -571,6 +615,41 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
emitter);
}
+ /**
+ * Build the {@link IcebergSource} and create a {@link DataStream} from
the source. Watermark
+ * strategy is set to {@link WatermarkStrategy#noWatermarks()}.
+ *
+ * @return data stream from the Iceberg source
+ */
+ public DataStream<T> buildStream(StreamExecutionEnvironment env) {
+ // buildStream should only be called with RowData or Converter paths.
+ Preconditions.checkState(
+ readerFunction == null,
+ "Cannot set reader function when building a data stream from the
source");
+ IcebergSource<T> source = build();
+ TypeInformation<T> outputTypeInfo =
+ outputTypeInfo(converter, table.schema(),
source.scanContext.project());
+ DataStreamSource<T> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
source.name(), outputTypeInfo);
+ if (source.shouldInferParallelism()) {
+ stream = stream.setParallelism(source.inferParallelism(flinkConfig,
env));
+ }
+
+ return stream;
+ }
+
+ private static <T> TypeInformation<T> outputTypeInfo(
+ RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
+ if (converter != null) {
+ return converter.getProducedType();
+ } else {
+ // output type is RowData
+ Schema readSchema = projected != null ? projected : tableSchema;
+ return (TypeInformation<T>)
+
FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
+ }
+ }
+
private ReaderFunction<T> readerFunction(ScanContext context) {
if (table instanceof BaseMetadataTable) {
MetaDataReaderFunction rowDataReaderFunction =
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 610657e8d4..65adce77d9 100644
---
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -23,11 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -128,26 +125,18 @@ public class IcebergTableSource
.build();
}
- private DataStreamSource<RowData>
createFLIP27Stream(StreamExecutionEnvironment env) {
+ private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment
env) {
SplitAssignerType assignerType =
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
- IcebergSource<RowData> source =
- IcebergSource.forRowData()
- .tableLoader(loader)
- .assignerFactory(assignerType.factory())
- .properties(properties)
- .project(getProjectedSchema())
- .limit(limit)
- .filters(filters)
- .flinkConfig(readableConfig)
- .build();
- DataStreamSource stream =
- env.fromSource(
- source,
- WatermarkStrategy.noWatermarks(),
- source.name(),
- TypeInformation.of(RowData.class));
- return stream;
+ return IcebergSource.forRowData()
+ .tableLoader(loader)
+ .assignerFactory(assignerType.factory())
+ .properties(properties)
+ .project(getProjectedSchema())
+ .limit(limit)
+ .filters(filters)
+ .flinkConfig(readableConfig)
+ .buildStream(env);
}
private TableSchema getProjectedSchema() {
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
index b7447d15c0..db8647f054 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -24,8 +24,6 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -130,11 +128,8 @@ public class TestIcebergSourceBounded extends
TestFlinkScan {
sourceBuilder.properties(options);
DataStream<Row> stream =
- env.fromSource(
- sourceBuilder.build(),
- WatermarkStrategy.noWatermarks(),
- "testBasicRead",
- TypeInformation.of(RowData.class))
+ sourceBuilder
+ .buildStream(env)
.map(
new RowDataToRowMapper(
FlinkSchemaUtil.convert(
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
index 0f41c5af4c..d3713e2960 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
@@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends
TestIcebergSourceBounded {
@BeforeEach
public void before() throws IOException {
Configuration tableConf = getTableEnv().getConfig().getConfiguration();
-
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
true);
+ tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE,
true);
SqlHelpers.sql(
getTableEnv(),
"create catalog iceberg_catalog with ('type'='iceberg',
'catalog-type'='hadoop', 'warehouse'='%s')",
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
new file mode 100644
index 0000000000..2908cb9272
--- /dev/null
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestIcebergSourceInferParallelism {
+ private static final int NUM_TMS = 2;
+ private static final int SLOTS_PER_TM = 2;
+ private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+ private static final int MAX_INFERRED_PARALLELISM = 3;
+
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+ .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .build());
+
+ @RegisterExtension
+ protected static final HadoopCatalogExtension CATALOG_EXTENSION =
+ new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+
+ @TempDir private Path tmpDir;
+
+ private Table table;
+ private GenericAppenderHelper dataAppender;
+
+ @BeforeEach
+ public void before() throws IOException {
+ this.table =
+ CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ this.dataAppender = new GenericAppenderHelper(table, FileFormat.PARQUET,
tmpDir);
+ }
+
+ @AfterEach
+ public void after() {
+ CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+ }
+
+ @Test
+ public void testEmptyTable() throws Exception {
+ // Inferred parallelism should be at least 1 even if table is empty
+ test(1, 0);
+ }
+
+ @Test
+ public void testTableWithFilesLessThanMaxInferredParallelism() throws
Exception {
+ // Append files to the table
+ for (int i = 0; i < 2; ++i) {
+ List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+ dataAppender.appendToTable(batch);
+ }
+
+ // Inferred parallelism should equal the number of splits (2)
+ test(2, 2);
+ }
+
+ @Test
+ public void testTableWithFilesMoreThanMaxInferredParallelism() throws
Exception {
+ // Append files to the table
+ for (int i = 0; i < MAX_INFERRED_PARALLELISM + 1; ++i) {
+ List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+ dataAppender.appendToTable(batch);
+ }
+
+ // Inferred parallelism should be capped by the MAX_INFERRED_PARALLELISM
+ test(MAX_INFERRED_PARALLELISM, MAX_INFERRED_PARALLELISM + 1);
+ }
+
+ private void test(int expectedParallelism, int expectedRecords) throws
Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ Configuration config = new Configuration();
+ config.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
true);
+ config.set(
+ FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX,
+ MAX_INFERRED_PARALLELISM);
+
+ DataStream<Row> dataStream =
+ IcebergSource.forRowData()
+ .tableLoader(CATALOG_EXTENSION.tableLoader())
+ .table(table)
+ .flinkConfig(config)
+ // force one file per split
+ .splitSize(1L)
+ .buildStream(env)
+ .map(new
RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema())));
+
+ DataStream.Collector<Row> collector = new DataStream.Collector<>();
+ dataStream.collectAsync(collector);
+ JobClient jobClient = env.executeAsync();
+ try (CloseableIterator<Row> iterator = collector.getOutput()) {
+ List<Row> result = Lists.newArrayList();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+
+ assertThat(result).hasSize(expectedRecords);
+ verifySourceParallelism(
+ expectedParallelism,
miniCluster().getExecutionGraph(jobClient.getJobID()).get());
+ }
+ }
+
+ /**
+ * Borrowed this approach from Flink {@code FileSourceTextLinesITCase} to
get source parallelism
+ * from execution graph.
+ */
+ private static void verifySourceParallelism(
+ int expectedParallelism, AccessExecutionGraph executionGraph) {
+ AccessExecutionJobVertex sourceVertex =
+ executionGraph.getVerticesTopologically().iterator().next();
+ assertThat(sourceVertex.getParallelism()).isEqualTo(expectedParallelism);
+ }
+
+ /**
+ * Use reflection to get {@code InternalMiniClusterExtension} and {@code
MiniCluster} to get
+ * execution graph and source parallelism. Haven't found another way via public
APIs.
+ */
+ private static MiniCluster miniCluster() throws Exception {
+ Field privateField =
+
MiniClusterExtension.class.getDeclaredField("internalMiniClusterExtension");
+ privateField.setAccessible(true);
+ InternalMiniClusterExtension internalExtension =
+ (InternalMiniClusterExtension)
privateField.get(MINI_CLUSTER_EXTENSION);
+ return internalExtension.getMiniCluster();
+ }
+}
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index 75f0a785a8..548940a842 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -53,7 +53,12 @@ public class TestIcebergSourceSql extends TestSqlBase {
public void before() throws IOException {
TableEnvironment tableEnvironment = getTableEnv();
Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
-
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
true);
+ tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE,
true);
+ // Disable inferring parallelism to avoid interfering with watermark tests
+ // that check split assignment is ordered by the watermark column.
+ // The tests assume a default parallelism of 1 with a single reader task
+ // in order to check the order of read records.
+
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
false);
tableEnvironment.getConfig().set("table.exec.resource.default-parallelism",
"1");
SqlHelpers.sql(
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index 51f9025b41..564e8139e6 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -48,14 +48,20 @@ import org.apache.iceberg.flink.TestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
+/**
+ * There is an infinite sleep in the test. Add a timeout to the test to avoid
getting stuck in case
+ * anything goes wrong unexpectedly.
+ */
+@Timeout(value = 60)
public class TestIcebergSpeculativeExecutionSupport extends TestBase {
private static final int NUM_TASK_MANAGERS = 1;
private static final int NUM_TASK_SLOTS = 3;
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(NUM_TASK_MANAGERS)
@@ -144,9 +150,9 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
private static class TestingMap extends RichMapFunction<Row, Row> {
@Override
public Row map(Row row) throws Exception {
- // Put the subtasks with the first attempt to sleep to trigger
speculative
- // execution
- if (getRuntimeContext().getAttemptNumber() <= 0) {
+ // Simulate slow subtask 0 with attempt 0
+ if (getRuntimeContext().getIndexOfThisSubtask() == 0
+ && getRuntimeContext().getAttemptNumber() <= 0) {
Thread.sleep(Integer.MAX_VALUE);
}
@@ -169,6 +175,7 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
// Use FLIP-27 source
configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE,
true);
+
configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
false);
// for speculative execution
configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true);
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 351ba54e5c..5718f4b938 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
@@ -37,6 +39,9 @@ import
org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
@@ -74,6 +79,7 @@ import
org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
@@ -97,6 +103,11 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
private final SerializableRecordEmitter<T> emitter;
private final String tableName;
+ // cache the discovered splits by planSplitsForBatch, which can be called
twice. And they come
+ // from two different threads: (1) source/stream construction by main thread
(2) enumerator
+ // creation. Hence need volatile here.
+ private volatile List<IcebergSourceSplit> batchSplits;
+
IcebergSource(
TableLoader tableLoader,
ScanContext scanContext,
@@ -132,16 +143,26 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
return tableName + "-" + UUID.randomUUID();
}
+ /**
+ * Cache the enumerated splits for batch execution to avoid double planning
as there are two code
+ * paths obtaining splits: (1) infer parallelism (2) enumerator creation.
+ */
private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
+ if (batchSplits != null) {
+ return batchSplits;
+ }
+
ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try (TableLoader loader = tableLoader.clone()) {
loader.open();
- List<IcebergSourceSplit> splits =
+ this.batchSplits =
FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(),
scanContext, workerPool);
LOG.info(
- "Discovered {} splits from table {} during job initialization",
splits.size(), tableName);
- return splits;
+ "Discovered {} splits from table {} during job initialization",
+ batchSplits.size(),
+ tableName);
+ return batchSplits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
} finally {
@@ -207,12 +228,35 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
// Only do scan planning if nothing is restored from checkpoint state
List<IcebergSourceSplit> splits =
planSplitsForBatch(planningThreadName());
assigner.onDiscoveredSplits(splits);
+ // clear the cached splits after enumerator creation as they won't be
needed anymore
+ this.batchSplits = null;
}
return new StaticIcebergEnumerator(enumContext, assigner);
}
}
+ private boolean shouldInferParallelism() {
+ return !scanContext.isStreaming();
+ }
+
+ private int inferParallelism(ReadableConfig flinkConf,
StreamExecutionEnvironment env) {
+ int parallelism =
+ SourceUtil.inferParallelism(
+ flinkConf,
+ scanContext.limit(),
+ () -> {
+ List<IcebergSourceSplit> splits =
planSplitsForBatch(planningThreadName());
+ return splits.size();
+ });
+
+ if (env.getMaxParallelism() > 0) {
+ parallelism = Math.min(parallelism, env.getMaxParallelism());
+ }
+
+ return parallelism;
+ }
+
/**
* Create a source builder.
*
@@ -571,6 +615,41 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
emitter);
}
+ /**
+ * Build the {@link IcebergSource} and create a {@link DataStream} from
the source. Watermark
+ * strategy is set to {@link WatermarkStrategy#noWatermarks()}.
+ *
+ * @return data stream from the Iceberg source
+ */
+ public DataStream<T> buildStream(StreamExecutionEnvironment env) {
+ // buildStream should only be called with RowData or Converter paths.
+ Preconditions.checkState(
+ readerFunction == null,
+ "Cannot set reader function when building a data stream from the
source");
+ IcebergSource<T> source = build();
+ TypeInformation<T> outputTypeInfo =
+ outputTypeInfo(converter, table.schema(),
source.scanContext.project());
+ DataStreamSource<T> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
source.name(), outputTypeInfo);
+ if (source.shouldInferParallelism()) {
+ stream = stream.setParallelism(source.inferParallelism(flinkConfig,
env));
+ }
+
+ return stream;
+ }
+
+ private static <T> TypeInformation<T> outputTypeInfo(
+ RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
+ if (converter != null) {
+ return converter.getProducedType();
+ } else {
+ // output type is RowData
+ Schema readSchema = projected != null ? projected : tableSchema;
+ return (TypeInformation<T>)
+
FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
+ }
+ }
+
private ReaderFunction<T> readerFunction(ScanContext context) {
if (table instanceof BaseMetadataTable) {
MetaDataReaderFunction rowDataReaderFunction =
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 610657e8d4..65adce77d9 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -23,11 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -128,26 +125,18 @@ public class IcebergTableSource
.build();
}
- private DataStreamSource<RowData>
createFLIP27Stream(StreamExecutionEnvironment env) {
+ private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment
env) {
SplitAssignerType assignerType =
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
- IcebergSource<RowData> source =
- IcebergSource.forRowData()
- .tableLoader(loader)
- .assignerFactory(assignerType.factory())
- .properties(properties)
- .project(getProjectedSchema())
- .limit(limit)
- .filters(filters)
- .flinkConfig(readableConfig)
- .build();
- DataStreamSource stream =
- env.fromSource(
- source,
- WatermarkStrategy.noWatermarks(),
- source.name(),
- TypeInformation.of(RowData.class));
- return stream;
+ return IcebergSource.forRowData()
+ .tableLoader(loader)
+ .assignerFactory(assignerType.factory())
+ .properties(properties)
+ .project(getProjectedSchema())
+ .limit(limit)
+ .filters(filters)
+ .flinkConfig(readableConfig)
+ .buildStream(env);
}
private TableSchema getProjectedSchema() {
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
index b7447d15c0..db8647f054 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -24,8 +24,6 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -130,11 +128,8 @@ public class TestIcebergSourceBounded extends
TestFlinkScan {
sourceBuilder.properties(options);
DataStream<Row> stream =
- env.fromSource(
- sourceBuilder.build(),
- WatermarkStrategy.noWatermarks(),
- "testBasicRead",
- TypeInformation.of(RowData.class))
+ sourceBuilder
+ .buildStream(env)
.map(
new RowDataToRowMapper(
FlinkSchemaUtil.convert(
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
index 0f41c5af4c..d3713e2960 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
@@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends
TestIcebergSourceBounded {
@BeforeEach
public void before() throws IOException {
Configuration tableConf = getTableEnv().getConfig().getConfiguration();
-
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
true);
+ tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE,
true);
SqlHelpers.sql(
getTableEnv(),
"create catalog iceberg_catalog with ('type'='iceberg',
'catalog-type'='hadoop', 'warehouse'='%s')",
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
new file mode 100644
index 0000000000..2908cb9272
--- /dev/null
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestIcebergSourceInferParallelism {
+ private static final int NUM_TMS = 2;
+ private static final int SLOTS_PER_TM = 2;
+ private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+ private static final int MAX_INFERRED_PARALLELISM = 3;
+
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+ .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .build());
+
+ @RegisterExtension
+ protected static final HadoopCatalogExtension CATALOG_EXTENSION =
+ new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+
+ @TempDir private Path tmpDir;
+
+ private Table table;
+ private GenericAppenderHelper dataAppender;
+
+ @BeforeEach
+ public void before() throws IOException {
+ this.table =
+ CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ this.dataAppender = new GenericAppenderHelper(table, FileFormat.PARQUET,
tmpDir);
+ }
+
+ @AfterEach
+ public void after() {
+ CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+ }
+
+ @Test
+ public void testEmptyTable() throws Exception {
+ // Inferred parallelism should be at least 1 even if table is empty
+ test(1, 0);
+ }
+
+ @Test
+ public void testTableWithFilesLessThanMaxInferredParallelism() throws
Exception {
+ // Append files to the table
+ for (int i = 0; i < 2; ++i) {
+ List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+ dataAppender.appendToTable(batch);
+ }
+
+ // Inferred parallelism should be equal to the number of splits (2)
+ test(2, 2);
+ }
+
+ @Test
+ public void testTableWithFilesMoreThanMaxInferredParallelism() throws
Exception {
+ // Append files to the table
+ for (int i = 0; i < MAX_INFERRED_PARALLELISM + 1; ++i) {
+ List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+ dataAppender.appendToTable(batch);
+ }
+
+ // Inferred parallelism should be capped by the MAX_INFERRED_PARALLELISM
+ test(MAX_INFERRED_PARALLELISM, MAX_INFERRED_PARALLELISM + 1);
+ }
+
+ private void test(int expectedParallelism, int expectedRecords) throws
Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ Configuration config = new Configuration();
+ config.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
true);
+ config.set(
+ FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX,
+ MAX_INFERRED_PARALLELISM);
+
+ DataStream<Row> dataStream =
+ IcebergSource.forRowData()
+ .tableLoader(CATALOG_EXTENSION.tableLoader())
+ .table(table)
+ .flinkConfig(config)
+ // force one file per split
+ .splitSize(1L)
+ .buildStream(env)
+ .map(new
RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema())));
+
+ DataStream.Collector<Row> collector = new DataStream.Collector<>();
+ dataStream.collectAsync(collector);
+ JobClient jobClient = env.executeAsync();
+ try (CloseableIterator<Row> iterator = collector.getOutput()) {
+ List<Row> result = Lists.newArrayList();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+
+ assertThat(result).hasSize(expectedRecords);
+ verifySourceParallelism(
+ expectedParallelism,
miniCluster().getExecutionGraph(jobClient.getJobID()).get());
+ }
+ }
+
+ /**
+ * Borrowed this approach from Flink {@code FileSourceTextLinesITCase} to
get source parallelism
+ * from execution graph.
+ */
+ private static void verifySourceParallelism(
+ int expectedParallelism, AccessExecutionGraph executionGraph) {
+ AccessExecutionJobVertex sourceVertex =
+ executionGraph.getVerticesTopologically().iterator().next();
+ assertThat(sourceVertex.getParallelism()).isEqualTo(expectedParallelism);
+ }
+
+ /**
+ * Use reflection to get {@code InternalMiniClusterExtension} and {@code
MiniCluster} to get
+ * execution graph and source parallelism. Haven't found another way via public
APIs.
+ */
+ private static MiniCluster miniCluster() throws Exception {
+ Field privateField =
+
MiniClusterExtension.class.getDeclaredField("internalMiniClusterExtension");
+ privateField.setAccessible(true);
+ InternalMiniClusterExtension internalExtension =
+ (InternalMiniClusterExtension)
privateField.get(MINI_CLUSTER_EXTENSION);
+ return internalExtension.getMiniCluster();
+ }
+}
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index 75f0a785a8..548940a842 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -53,7 +53,12 @@ public class TestIcebergSourceSql extends TestSqlBase {
public void before() throws IOException {
TableEnvironment tableEnvironment = getTableEnv();
Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
-
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
true);
+ tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE,
true);
+ // Disable inferring parallelism to avoid interfering with watermark tests
+ // that check split assignment is ordered by the watermark column.
+ // The tests assume a default parallelism of 1 with a single reader task
+ // in order to check the order of read records.
+
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
false);
tableEnvironment.getConfig().set("table.exec.resource.default-parallelism",
"1");
SqlHelpers.sql(
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index 41b023b936..05a08c24d8 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.BatchExecutionOptions;
@@ -48,8 +49,14 @@ import org.apache.iceberg.flink.TestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
+/**
+ * There is an infinite sleep in the test. Add a timeout to the test to avoid a
stuck situation in case
+ * anything goes wrong unexpectedly.
+ */
+@Timeout(value = 60)
public class TestIcebergSpeculativeExecutionSupport extends TestBase {
private static final int NUM_TASK_MANAGERS = 1;
private static final int NUM_TASK_SLOTS = 3;
@@ -144,9 +151,9 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
private static class TestingMap extends RichMapFunction<Row, Row> {
@Override
public Row map(Row row) throws Exception {
- // Put the subtasks with the first attempt to sleep to trigger
speculative
- // execution
- if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
+ // Simulate slow subtask 0 with attempt 0
+ TaskInfo taskInfo = getRuntimeContext().getTaskInfo();
+ if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber()
<= 0) {
Thread.sleep(Integer.MAX_VALUE);
}
@@ -169,6 +176,7 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
// Use FLIP-27 source
configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE,
true);
+
configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
false);
// for speculative execution
configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true);