ddebowczyk92 opened a new issue, #28589:
URL: https://github.com/apache/beam/issues/28589

   ### What would you like to happen?
   
   When executing complex job graphs with high parallelism on Flink, we 
sometimes encounter the following error that causes jobs to fail:
   
   ```java
   java.lang.RuntimeException: Could not retrieve the next input split.
        at 
org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:393)
        at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting 
the next input split failed.
        at 
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:70)
        at 
org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:391)
        ... 6 more
   Caused by: java.util.concurrent.TimeoutException
        at 
java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
        at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
        at 
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
        ... 7 more
   ```
   This issue is particularly likely to occur when the job reads simultaneously 
from multiple partitions, each consisting of a large number of files. It appears 
that Flink TaskManagers overwhelm the JobManager with input split requests. During 
our investigation into the hotspots affecting the JobManager's response time, 
as shown in the attached VisualVM screenshot, we noticed that a significant 
amount of CPU time is consumed by 
`org.apache.beam.sdk.io.hadoop.SerializableConfiguration.writeExternal()`. This 
method serializes the underlying Hadoop `Configuration` object each time 
`ExecutionGraphHandler.requestNextInputSplit()` is called.
   
![visualvm](https://github.com/apache/beam/assets/8567235/a83e4ac6-09f3-4c47-92ca-6df636eaad31)
   
   Since the `SerializableConfiguration` and the underlying `Configuration` object 
remain the same for every `InputSplit`, we propose caching the serialized form: 
on the first call, `SerializableConfiguration.writeExternal()` serializes the 
`Configuration` data to a byte array, and every consecutive call of 
`writeExternal()` writes the cached bytes instead of re-serializing. This 
approach has been confirmed to relieve the pressure on the JobManager, resulting 
in fewer occurrences of the 
`org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
Requesting the next input split failed` error.
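
   The proposed caching can be sketched as below. This is a minimal illustration, 
not the actual patch: a plain `Map` stands in for the Hadoop `Configuration` so 
the snippet is self-contained, and the class and member names 
(`CachedConfigWriter`, `writeTo`, `serializations`) are hypothetical. The point 
is the pattern: the expensive serialization runs once, and every consecutive 
call replays the cached byte array.
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectOutputStream;
   import java.io.OutputStream;
   import java.util.Map;
   import java.util.TreeMap;
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Sketch of the proposed fix. The Map stands in for the (stable, reused)
   // org.apache.hadoop.conf.Configuration wrapped by SerializableConfiguration.
   public class CachedConfigWriter {
       private final Map<String, String> config; // stand-in for the Hadoop Configuration
       private byte[] cachedBytes;               // built lazily on the first write
       final AtomicInteger serializations = new AtomicInteger();
   
       public CachedConfigWriter(Map<String, String> config) {
           this.config = config;
       }
   
       // Analogous role to SerializableConfiguration.writeExternal(ObjectOutput out).
       public void writeTo(OutputStream out) throws IOException {
           if (cachedBytes == null) {
               ByteArrayOutputStream buffer = new ByteArrayOutputStream();
               try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
                   oos.writeObject(new TreeMap<>(config)); // the costly step, done once
               }
               cachedBytes = buffer.toByteArray();
               serializations.incrementAndGet();
           }
           out.write(cachedBytes); // consecutive calls just copy the cached bytes
       }
   
       public static void main(String[] args) throws IOException {
           CachedConfigWriter writer =
               new CachedConfigWriter(Map.of("fs.defaultFS", "hdfs://nn:8020"));
           ByteArrayOutputStream a = new ByteArrayOutputStream();
           ByteArrayOutputStream b = new ByteArrayOutputStream();
           writer.writeTo(a);
           writer.writeTo(b); // second call hits the cache
           System.out.println("serializations=" + writer.serializations.get());
           System.out.println("identicalBytes="
               + java.util.Arrays.equals(a.toByteArray(), b.toByteArray()));
       }
   }
   ```
   
   Since the cached bytes never change for a given `Configuration`, every 
`requestNextInputSplit()` round trip would only pay for a memory copy rather 
than a full serialization pass.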
   
   Tagging @kristoffSC, who is the initial author of this idea.
   
   
   
[CPU_snapshot.zip](https://github.com/apache/beam/files/12686278/CPU_snapshot-1693398299669.zip)
   
   
   
   ### Issue Priority
   
   Priority: 2 (default / most feature requests should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.