ddebowczyk92 opened a new issue, #28589:
URL: https://github.com/apache/beam/issues/28589
### What would you like to happen?
When executing complex job graphs with high parallelism on Flink, we
sometimes encounter the following error that causes jobs to fail:
```java
java.lang.RuntimeException: Could not retrieve the next input split.
    at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:393)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:70)
    at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:391)
    ... 6 more
Caused by: java.util.concurrent.TimeoutException
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
    ... 7 more
```
This issue is particularly likely to occur when the job reads simultaneously from multiple partitions that consist of a large number of files. It appears that the Flink TaskManagers overwhelm the JobManager with input split requests. While investigating the hotspots affecting the JobManager's response time (see the attached VisualVM snapshot), we noticed that a significant amount of CPU time is spent in `org.apache.beam.sdk.io.hadoop.SerializableConfiguration.writeExternal()`. This method serializes the underlying Hadoop `Configuration` object each time `ExecutionGraphHandler.requestNextInputSplit()` is called; a simplified sketch of that hot path is shown below.
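
Roughly, the current behavior looks like the following. This is a simplified illustration, not the exact Beam source; the class name `UncachedSerializableConfiguration` is only used here to make the sketch self-contained:

```java
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.hadoop.conf.Configuration;

// Simplified sketch of today's behavior (illustrative name, not the Beam class itself):
// every serialization re-walks the full Hadoop Configuration.
public class UncachedSerializableConfiguration implements Externalizable {

  private transient Configuration conf;

  public UncachedSerializableConfiguration() {}

  public UncachedSerializableConfiguration(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    // Invoked for every requestNextInputSplit() response that carries this object,
    // so the whole Configuration is serialized again and again on the JobManager.
    conf.write(out);
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException {
    conf = new Configuration(false);
    conf.readFields(in);
  }
}
```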

Since `SerializableConfiguration` and the underlying `Configuration` object remain the same for each `InputSplit`, we propose the following solution: `SerializableConfiguration.writeExternal()` serializes the `Configuration` data to a byte array once, and every subsequent call of `writeExternal()` writes the cached bytes instead of re-serializing the `Configuration`. This approach has been confirmed to alleviate the pressure on the JobManager, resulting in fewer occurrences of the `org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed` error. A sketch of the idea follows below.
Tagging @kristoffSC, as he is the initial author of this idea.
[CPU_snapshot.zip](https://github.com/apache/beam/files/12686278/CPU_snapshot-1693398299669.zip)
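
One possible shape of the proposed caching, as a rough sketch only. The class name `CachedSerializableConfiguration`, the field names, and the exact wire format are illustrative assumptions, not the actual patch:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.hadoop.conf.Configuration;

// Hypothetical sketch of the proposed caching (illustrative name, not the actual patch).
public class CachedSerializableConfiguration implements Externalizable {

  private transient Configuration conf;
  // Serialized form of `conf`, computed once and reused on every writeExternal() call.
  private transient byte[] serializedConf;

  public CachedSerializableConfiguration() {}

  public CachedSerializableConfiguration(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    if (serializedConf == null) {
      // Serialize the Hadoop Configuration only once.
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      try (DataOutputStream dataOut = new DataOutputStream(buffer)) {
        conf.write(dataOut);
      }
      serializedConf = buffer.toByteArray();
    }
    // Subsequent calls just copy the cached bytes instead of re-serializing.
    out.writeInt(serializedConf.length);
    out.write(serializedConf);
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException {
    int length = in.readInt();
    byte[] bytes = new byte[length];
    in.readFully(bytes);
    conf = new Configuration(false);
    conf.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
  }
}
```

With this shape, only the first `writeExternal()` call pays the cost of walking the `Configuration`; every later call on the JobManager side reduces to a byte-array copy.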
### Issue Priority
Priority: 2 (default / most feature requests should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [X] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner