[
https://issues.apache.org/jira/browse/FLINK-21201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280903#comment-17280903
]
Zhilong Hong commented on FLINK-21201:
--------------------------------------
Sorry for the confusion. I meant moving the construction of result
partitions to the Task's thread.
I tried to implement a POC, but I ran into a new problem: it significantly
impacts the performance of the Task threads. I think the bottleneck here
is the throughput of the disks in the cluster.
Since the throughput is fixed, when we put the IO operations into the main
thread, they block the main thread; when we put them into the task threads,
they block all the task threads and make them wait for the IO operations to
complete.
Let's compare disk IO to a one-way road. In the past there was only one
fleet, the main thread. Now there are multiple fleets, the task threads.
Since the throughput has not improved, all the tasks need to wait in line,
and it takes much longer to complete them (with 10 threads, about 10 times
as long). So I think moving the construction of result partitions to
the task threads may not be a good idea.
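The road analogy can be put into numbers with a rough model. The sketch below is only an illustration, not Flink code: the class and method names are hypothetical, the rate of 8000 files in about 28.12 seconds is read from the log timestamps in the issue description, and it assumes the disk delivers the same file-creation rate no matter how many threads share it.

```java
// Back-of-the-envelope model: a disk with a fixed file-creation rate that is
// shared fairly by all threads creating subpartition files at the same time.
// All names here are hypothetical and exist only for this illustration.
final class SharedDiskModel {

    /**
     * Seconds until every task has created its files, assuming the disk
     * sustains filesPerSecond in total regardless of the thread count.
     */
    static double secondsToFinish(int concurrentTasks, int filesPerTask, double filesPerSecond) {
        return concurrentTasks * (double) filesPerTask / filesPerSecond;
    }

    public static void main(String[] args) {
        // Assumption taken from the log in the issue: 8000 files in ~28.12 s.
        double filesPerSecond = 8000 / 28.12;

        double oneThread = secondsToFinish(1, 8000, filesPerSecond);
        double tenThreads = secondsToFinish(10, 8000, filesPerSecond);

        System.out.printf("1 thread:   %.2f s%n", oneThread);
        System.out.printf("10 threads: %.2f s%n", tenThreads);
        System.out.printf("slowdown:   %.1fx%n", tenThreads / oneThread);
    }
}
```

Under this assumption the total work grows with the number of threads while the rate stays the same, so each task thread waits about as long as all of them together would have taken in sequence.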
Furthermore, I think moving the construction of result partitions to the
IOExecutor would introduce concurrency issues and complicate the threading
model.
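To show where the extra complexity would come from, here is a minimal sketch of offloading file creation to a separate executor using only the JDK. It is not the Flink implementation: the class name, method name, and file naming are hypothetical, and a plain single-threaded ExecutorService stands in for the IOExecutor.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: create the backing files on an IO executor instead of
// the calling thread. Names are illustrative and not taken from Flink.
final class AsyncPartitionFilesSketch {

    /** Creates count empty files in dir on the given executor. */
    static CompletableFuture<List<Path>> createFilesAsync(
            Path dir, int count, ExecutorService ioExecutor) {
        return CompletableFuture.supplyAsync(() -> {
            List<Path> files = new ArrayList<>(count);
            try {
                for (int i = 0; i < count; i++) {
                    files.add(Files.createTempFile(dir, "subpartition-" + i + "-", ".data"));
                }
            } catch (IOException e) {
                // Surface the failure through the future.
                throw new UncheckedIOException(e);
            }
            return files;
        }, ioExecutor);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        Path dir = Files.createTempDirectory("partition-sketch");
        try {
            CompletableFuture<List<Path>> pending = createFilesAsync(dir, 4, ioExecutor);
            // The calling thread is free here, but the files may not exist yet.
            // Every consumer has to wait on the future before touching them.
            List<Path> files = pending.get();
            System.out.println("created " + files.size() + " files in " + dir);
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

The calling thread returns immediately, but the partition is now only usable once the future completes, so anything that touches it (starting the task, releasing it, handling failures) has to be coordinated with that future.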
> Creating BoundedBlockingSubpartition blocks TaskManager’s main thread
> ---------------------------------------------------------------------
>
> Key: FLINK-21201
> URL: https://issues.apache.org/jira/browse/FLINK-21201
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Affects Versions: 1.12.1
> Reporter: Zhilong Hong
> Priority: Major
> Attachments: jobmanager.log.tar.gz, taskmanager.log.tar.gz
>
>
> When we run batch jobs with 8k parallelism, it takes a long
> time to deploy the vertices. After investigating, we found that creating
> BoundedBlockingSubpartition blocks TaskManager’s main thread during the
> procedure of {{submitTask}}.
> When JobMaster invokes {{submitTask}} and sends an RPC call to the
> TaskManager, the TaskManager will receive the RPC call and execute the
> {{submitTask}} method in its main thread. In the {{submitTask}} method, the
> TaskExecutor will create a Task instance and try to start it. During the
> creation, the TaskExecutor will create the ResultPartition and its
> ResultSubpartitions.
> For batch jobs, the type of the ResultSubpartitions is
> BoundedBlockingSubpartition with FileChannelBoundedData. Each
> BoundedBlockingSubpartition creates a file on the local disk, which is an
> IO operation and can take a long time.
> In our test, it took up to 30+ seconds to create 8k
> BoundedBlockingSubpartitions. This blocks the main thread of the
> TaskManager and can lead to heartbeat timeouts and slow task deployment. In
> my opinion, the IO operation should be executed by the IOExecutor rather
> than the main thread.
> I added several log statements to show what TaskExecutor is doing during
> {{submitTask}}.
> {code:java}
> 2021-01-29 14:44:37,557 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Start to
> submit task #898 (c9aefd1d30c2b133ba04ad495cd894fd)
> 2021-01-29 14:44:37,557 INFO
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate
> slot 38cafd5b456cc8ff873bbe18e4bf708a.
> 2021-01-29 14:44:37,932 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Start to
> init Task #898 (c9aefd1d30c2b133ba04ad495cd894fd) instance.
> 2021-01-29 14:44:37,932 INFO
> org.apache.flink.runtime.io.network.NettyShuffleEnvironment [] - Start to
> create 1 result partition(s).
> 2021-01-29 14:44:37,932 INFO
> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory [] -
> Initializing BoundedBlockingResultPartitions
> 2021-01-29 14:44:37,932 INFO
> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory [] -
> Start to create 8000 FILE BoundedBlockingSubpartitions.
> 2021-01-29 14:44:37,932 INFO
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition []
> - FileChannel #0 created.
> ... ...
> 2021-01-29 14:45:06,052 INFO
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition []
> - FileChannel #7999 created.
> 2021-01-29 14:45:06,052 INFO
> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory [] -
> Finish creating 8000 FILE BoundedBlockingSubpartitions.
> 2021-01-29 14:45:06,052 INFO
> org.apache.flink.runtime.io.network.NettyShuffleEnvironment [] - Finish
> creating 1 result partition(s).
> 2021-01-29 14:45:06,052 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Finish
> initializing task #898 (c9aefd1d30c2b133ba04ad495cd894fd) instance.
> 2021-01-29 14:45:06,052 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received
> task Source: Source 0 (898/8000)#0 (c9aefd1d30c2b133ba04ad495cd894fd), deploy
> into slot with allocation id 38cafd5b456cc8ff873bbe18e4bf708a.
> 2021-01-29 14:45:06,053 INFO org.apache.flink.runtime.taskmanager.Task
> [] - Source: Source 0 (898/8000)#0
> (c9aefd1d30c2b133ba04ad495cd894fd) switched from CREATED to DEPLOYING.
> {code}
> We can see that it takes nearly 29 seconds to create 8k
> BoundedBlockingSubpartitions, and this blocks the main thread in the
> TaskExecutor.
> The logs of the JobManager and TaskManager are attached. The most typical
> task is Source 0: #898.