This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 11631cb5956 [FLINK-34145][connector/filesystem] support dynamic source parallelism inference in batch jobs
11631cb5956 is described below
commit 11631cb59568df60d40933fb13c8433062ed9290
Author: sunxia <[email protected]>
AuthorDate: Wed Jan 24 14:26:03 2024 +0800
    [FLINK-34145][connector/filesystem] support dynamic source parallelism inference in batch jobs
This closes #24186.
---
.../connector/file/src/AbstractFileSource.java | 6 ++-
.../flink/connector/file/src/FileSource.java | 25 +++++++++++-
.../file/src/FileSourceTextLinesITCase.java | 46 +++++++++++++++++++++-
3 files changed, 73 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
index 6dbed75747b..f4fb463e10e 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
@@ -69,7 +69,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
private static final long serialVersionUID = 1L;
- private final Path[] inputPaths;
+ final Path[] inputPaths;
private final FileEnumerator.Provider enumeratorFactory;
@@ -100,6 +100,10 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
// Getters
// ------------------------------------------------------------------------
+ FileEnumerator.Provider getEnumeratorFactory() {
+ return enumeratorFactory;
+ }
+
public FileSplitAssigner.Provider getAssignerFactory() {
return assignerFactory;
}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
index da76f790627..7d3f545fc02 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.file.src;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.DynamicParallelismInference;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
 import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
 import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
@@ -32,10 +33,13 @@ import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.time.Duration;
+import java.util.Collection;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -93,7 +97,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* @param <T> The type of the events/records produced by this source.
*/
@PublicEvolving
-public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit> {
+public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit>
+        implements DynamicParallelismInference {
private static final long serialVersionUID = 1L;
@@ -141,6 +146,24 @@ public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit>
return FileSourceSplitSerializer.INSTANCE;
}
+    @Override
+    public int inferParallelism(Context dynamicParallelismContext) {
+        FileEnumerator fileEnumerator = getEnumeratorFactory().create();
+
+        Collection<FileSourceSplit> splits;
+        try {
+            splits =
+                    fileEnumerator.enumerateSplits(
+                            inputPaths,
+                            dynamicParallelismContext.getParallelismInferenceUpperBound());
+        } catch (IOException e) {
+            throw new FlinkRuntimeException("Could not enumerate file splits", e);
+        }
+
+        return Math.min(
+                splits.size(), dynamicParallelismContext.getParallelismInferenceUpperBound());
+    }
+
// ------------------------------------------------------------------------
// Entry-point Factory Methods
// ------------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 01cbd8aa9c2..08d53f21426 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -19,10 +19,15 @@
package org.apache.flink.connector.file.src;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -66,6 +71,8 @@ class FileSourceTextLinesITCase {
private static final int PARALLELISM = 4;
+ private static final int SOURCE_PARALLELISM_UPPER_BOUND = 8;
+
@TempDir private static java.nio.file.Path tmpDir;
@RegisterExtension
@@ -108,9 +115,25 @@ class FileSourceTextLinesITCase {
                 miniCluster -> testBoundedTextFileSource(tmpTestDir, FailoverType.JM, miniCluster));
}
+    @Test
+    void testBoundedTextFileSourceWithDynamicParallelismInference(
+            @TempDir java.nio.file.Path tmpTestDir, @InjectMiniCluster MiniCluster miniCluster)
+            throws Exception {
+        testBoundedTextFileSource(tmpTestDir, FailoverType.NONE, miniCluster, true);
+    }
+
     private void testBoundedTextFileSource(
             java.nio.file.Path tmpTestDir, FailoverType failoverType, MiniCluster miniCluster)
             throws Exception {
+        testBoundedTextFileSource(tmpTestDir, failoverType, miniCluster, false);
+    }
+
+    private void testBoundedTextFileSource(
+            java.nio.file.Path tmpTestDir,
+            FailoverType failoverType,
+            MiniCluster miniCluster,
+            boolean batchMode)
+            throws Exception {
final File testDir = tmpTestDir.toFile();
// our main test data
@@ -126,11 +149,16 @@ class FileSourceTextLinesITCase {
.build();
         final StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(PARALLELISM);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.setParallelism(PARALLELISM);
+
+        if (batchMode) {
+            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        }
         final DataStream<String> stream =
-                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
+                        .setMaxParallelism(PARALLELISM * 2);
         final DataStream<String> streamFailingInTheMiddleOfReading =
                 RecordCounterToFail.wrapWithFailureAfter(stream, LINES.length / 2);
@@ -149,6 +177,9 @@ class FileSourceTextLinesITCase {
}
verifyResult(result);
+        if (batchMode) {
+            verifySourceParallelism(miniCluster.getExecutionGraph(jobId).get());
+        }
}
/**
@@ -253,11 +284,16 @@ class FileSourceTextLinesITCase {
}
     private static MiniClusterResourceConfiguration createMiniClusterConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
+                SOURCE_PARALLELISM_UPPER_BOUND);
return new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
+ .setConfiguration(configuration)
.build();
}
@@ -320,6 +356,12 @@ class FileSourceTextLinesITCase {
assertThat(actual).isEqualTo(expected);
}
+    private static void verifySourceParallelism(AccessExecutionGraph executionGraph) {
+        AccessExecutionJobVertex sourceVertex =
+                executionGraph.getVerticesTopologically().iterator().next();
+        assertThat(sourceVertex.getParallelism()).isEqualTo(FILE_PATHS.length);
+    }
+
// ------------------------------------------------------------------------
// test data
// ------------------------------------------------------------------------