This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6b3ce095b0a Reduce the maximum size of input splits in Flink to better
distribute work (#28045)
6b3ce095b0a is described below
commit 6b3ce095b0a12a8385eacdb1748fb523247850c1
Author: Julien Tournay <[email protected]>
AuthorDate: Fri Sep 1 22:48:25 2023 +0200
Reduce the maximum size of input splits in Flink to better distribute work
(#28045)
---
.../beam/runners/flink/FlinkPipelineOptions.java | 7 +++++++
.../translation/wrappers/SourceInputFormat.java | 22 +++++++++++++++++++++-
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 2f32ab2c2ea..1e01514fe8b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -313,6 +313,13 @@ public interface FlinkPipelineOptions
void setFlinkConfDir(String confDir);
+  /**
+   * Maximum size, in megabytes, of a single input split produced from a {@code FileBasedSource}.
+   * A value of 0 (the default) or any negative value disables the cap.
+   */
+  @Description(
+      "Set the maximum size, in megabytes, of an input split when data is read from a file-based "
+          + "source. 0 or a negative value implies no max size.")
+  @Default.Long(0)
+  Long getFileInputSplitMaxSizeMB();
+
+  void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB);
+
static FlinkPipelineOptions defaults() {
return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 7e1835eac72..a1b8bced7a1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -20,9 +20,11 @@ package org.apache.beam.runners.flink.translation.wrappers;
import java.io.IOException;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -108,12 +110,30 @@ public class SourceInputFormat<T> extends RichInputFormat<WindowedValue<T>, Sour
return null;
}
+  /**
+   * Computes the desired bundle size, in bytes, to pass to {@link BoundedSource#split}.
+   *
+   * <p>The estimated size of {@code initialSource} is divided evenly across {@code numSplits}.
+   * When the source is a {@link FileBasedSource} and {@link
+   * FlinkPipelineOptions#getFileInputSplitMaxSizeMB()} is positive, the result is additionally
+   * capped at that many megabytes.
+   *
+   * @param numSplits the number of splits requested by Flink; values below 1 are treated as 1 so
+   *     the division can never fail
+   * @return the desired size of each split, in bytes
+   * @throws Exception if the source size cannot be estimated
+   */
+  private long getDesiredSizeBytes(int numSplits) throws Exception {
+    long totalSize = initialSource.getEstimatedSizeBytes(options);
+    long defaultSplitSize = totalSize / Math.max(1, numSplits);
+    if (options == null || !(initialSource instanceof FileBasedSource)) {
+      return defaultSplitSize;
+    }
+    // The option is a boxed Long; an explicitly-set null must not be unboxed.
+    Long maxSplitSizeMb = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB();
+    if (maxSplitSizeMb == null || maxSplitSizeMb <= 0) {
+      return defaultSplitSize;
+    }
+    // Most of the time parallelism is < number of files in source.
+    // Each file becomes a unique split, which commonly creates skew.
+    // Capping the size of splits reduces that skew.
+    // Saturate instead of overflowing when converting MB to bytes.
+    long maxSplitSizeBytes =
+        maxSplitSizeMb > (Long.MAX_VALUE >> 20) ? Long.MAX_VALUE : maxSplitSizeMb << 20;
+    return Math.min(defaultSplitSize, maxSplitSizeBytes);
+  }
+
@Override
@SuppressWarnings("unchecked")
public SourceInputSplit<T>[] createInputSplits(int numSplits) throws
IOException {
try {
- long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) /
numSplits;
+ long desiredSizeBytes = getDesiredSizeBytes(numSplits);
List<? extends Source<T>> shards = initialSource.split(desiredSizeBytes,
options);
+
int numShards = shards.size();
SourceInputSplit<T>[] sourceInputSplits = new
SourceInputSplit[numShards];
for (int i = 0; i < numShards; i++) {