[
https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16935552#comment-16935552
]
Min Shen commented on SPARK-29206:
----------------------------------
[~redsanket], [~tgraves],
Since you worked on committing the original patch, would appreciate your
comments here.
> Number of shuffle Netty server threads should be a multiple of number of
> chunk fetch handler threads
> ----------------------------------------------------------------------------------------------------
>
> Key: SPARK-29206
> URL: https://issues.apache.org/jira/browse/SPARK-29206
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle
> Affects Versions: 3.0.0
> Reporter: Min Shen
> Priority: Major
>
> In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool
> to handle the slow-to-process chunk fetch requests in order to improve the
> responsiveness of shuffle service for RPC requests.
> Initially, we thought by making the number of Netty server threads larger
> than the number of chunk fetch handler threads, it would reserve some threads
> for RPC requests thus resolving the various RPC request timeout issues we
> experienced previously. The solution worked in our cluster initially.
> However, as the number of Spark applications in our cluster continues to
> increase, we saw the RPC request (SASL authentication specifically) timeout
> issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>     at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>     at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>     at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>     at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>     at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>     at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {noformat}
> After further investigation, we realized that as the number of concurrent
> clients connecting to a shuffle service increases, it becomes _VERY_
> important to configure the number of Netty server threads and number of chunk
> fetch handler threads correctly. Specifically, the number of Netty server
> threads needs to be a multiple of the number of chunk fetch handler threads.
> The reason is explained in detail below:
> When a channel is established on the Netty server, it is registered with both
> the Netty server default EventLoopGroup and the chunk fetch handler
> EventLoopGroup. Once registered, this channel sticks with a given thread in
> both EventLoopGroups, i.e. all requests from this channel are going to be
> handled by the same thread. Right now, the Spark shuffle Netty server uses
> the default Netty strategy to select a thread from an EventLoopGroup to be
> associated with a new channel, which is simply round-robin (Netty's
> DefaultEventExecutorChooserFactory).
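The round-robin assignment can be sketched as follows. This is a minimal stand-in for illustration, not Netty's actual DefaultEventExecutorChooserFactory code (the real chooser also special-cases power-of-two pool sizes):

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of round-robin executor selection, in the spirit of
// Netty's DefaultEventExecutorChooserFactory (not the actual Netty code).
public class RoundRobinChooser {
    private final AtomicLong counter = new AtomicLong();
    private final int numThreads;

    public RoundRobinChooser(int numThreads) {
        this.numThreads = numThreads;
    }

    // Each newly registered channel gets the next thread index in order,
    // and then sticks with that thread for its lifetime.
    public int nextThreadIndex() {
        return (int) (counter.getAndIncrement() % numThreads);
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser(4);
        for (int channel = 0; channel < 8; channel++) {
            System.out.println("channel " + channel + " -> thread " + chooser.nextThreadIndex());
        }
        // channels 0 and 4 share a thread, as do 1 and 5, and so on
    }
}
```

Because the channel sticks with its chosen thread, channel N effectively maps to thread N modulo the pool size, which is what the analysis below relies on.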
> In SPARK-24355, with the introduced chunk fetch handler thread pool, all
> chunk fetch requests from a given channel will be first added to the task
> queue of the chunk fetch handler thread associated with that channel. When
> the requests get processed, the chunk fetch request handler thread will
> submit a task to the task queue of the Netty server thread that's also
> associated with this channel. If the number of Netty server threads is not a
> multiple of the number of chunk fetch handler threads, it would become a
> problem when the server has a large number of concurrent connections.
> Assume we configure the number of Netty server threads as 40 and the
> percentage of chunk fetch handler threads as 87, which leads to 35 chunk
> fetch handler threads. Then according to the round-robin policy, channel 0,
> 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty
> server thread in the default EventLoopGroup. However, since the chunk fetch
> handler thread pool only has 35 threads, out of these 8 channels, only
> channels 0 and 280 will be associated with the same chunk fetch handler
> thread. Thus, channels 0, 40, 80, 120, 160, 200, and 240 will all be
> associated with different chunk fetch handler threads but with the same
> Netty server thread. This means the 7 different chunk fetch handler threads
> associated with these channels could potentially submit tasks to the task
> queue of the same Netty server thread at the same time, leading to 7
> slow-to-process requests sitting in that task queue. An RPC request placed
> in the task queue behind these 7 requests is very likely to time out.
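The arithmetic above can be reproduced with a short sketch that models round-robin assignment as a simple modulo (a simplification that assumes channels register in order):

```java
import java.util.ArrayList;
import java.util.List;

// Reproduces the 40-server-thread / 35-fetch-handler-thread example:
// how many distinct chunk fetch handler threads can all enqueue work
// into the task queue of Netty server thread 0 at once?
public class ThreadCollisionDemo {
    public static void main(String[] args) {
        int serverThreads = 40;  // Netty server EventLoopGroup size
        int fetchThreads = 35;   // 87% of 40, rounded up

        // Channels that round-robin onto server thread 0.
        List<Integer> channels = new ArrayList<>();
        for (int ch = 0; ch <= 280; ch += serverThreads) {
            channels.add(ch);
        }

        // Count the distinct chunk fetch handler threads behind them.
        long distinctFetchThreads = channels.stream()
            .map(ch -> ch % fetchThreads)
            .distinct()
            .count();

        System.out.println("channels on server thread 0: " + channels);
        // 7 distinct fetch handler threads can all target server thread 0.
        System.out.println("distinct fetch handler threads: " + distinctFetchThreads);
    }
}
```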
> In our cluster, the number of concurrent active connections to a shuffle
> service could go as high as 6K+ during peak. If these thread pools are not
> sized correctly, our Spark applications are guaranteed to see SASL timeout
> issues when a shuffle service is dealing with a lot of incoming chunk fetch
> requests from many distinct clients, which leads to stage failures and
> lengthy retries.
> To resolve this issue, the number of Netty server threads needs to be a
> multiple of the number of chunk fetch handler threads. This way, the
> round-robin policy will guarantee that channels associated with different
> chunk fetch handler threads will also be associated with different Netty
> server threads, thus eliminating this potential burst of placing multiple
> slow-to-process requests in one Netty server thread task queue.
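The guarantee follows from modular arithmetic: if two channels agree modulo the server pool size S, and S = k * F for the fetch pool size F, they also agree modulo F. A brute-force check of this claim, again modeling round-robin assignment as modulo:

```java
// Exhaustively checks that when the Netty server pool size is a multiple of
// the chunk fetch handler pool size, any two channels that round-robin onto
// the same server thread also share the same fetch handler thread.
public class MultipleGuaranteeCheck {
    public static void main(String[] args) {
        int serverThreads = 40;
        int fetchThreads = 20;  // 40 = 2 x 20, so the guarantee should hold
        int violations = 0;
        for (int a = 0; a < 1000; a++) {
            for (int b = 0; b < 1000; b++) {
                boolean sameServerThread = (a % serverThreads) == (b % serverThreads);
                boolean sameFetchThread = (a % fetchThreads) == (b % fetchThreads);
                if (sameServerThread && !sameFetchThread) {
                    violations++;  // would be a burst scenario like the 40/35 case
                }
            }
        }
        System.out.println("violations: " + violations);  // 0
    }
}
```

With fetchThreads set to 35 instead of 20, the same loop finds many violations, which is exactly the 40/35 burst scenario described above.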
> Since the current patch that's merged in Spark uses
> `spark.shuffle.server.chunkFetchHandlerThreadsPercent` to configure the
> number of chunk fetch handler threads, and it rounds up the computed number,
> it is very tricky to get these thread pools sized correctly. In addition,
> people who are not aware of this issue are very likely to fall into this
> trap and start seeing RPC request timeouts during shuffle fetch once the
> Spark workloads in their environment reach a certain scale.
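To illustrate the trap with the numbers from this report (the ceil here matches the "rounds up" behavior described above; the exact formula in the merged patch may differ slightly):

```java
// Illustrates why the percent-based config is hard to get right: the
// rounded-up handler thread count is rarely an exact divisor of the
// server pool size.
public class PercentConfigDemo {
    public static void main(String[] args) {
        int serverThreads = 40;
        int percent = 87;  // spark.shuffle.server.chunkFetchHandlerThreadsPercent
        int fetchThreads = (int) Math.ceil(serverThreads * percent / 100.0);
        System.out.println("fetch handler threads: " + fetchThreads);  // 35
        // 40 % 35 != 0, so the multiple property does not hold.
        System.out.println("divides evenly: " + (serverThreads % fetchThreads == 0));
        // Only percents that round to a divisor of 40 (40, 20, 10, 8, 5, ...)
        // avoid the burst problem, which is easy to get wrong by accident.
    }
}
```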
> For these reasons, we propose to change the configurations of the number of
> threads for both thread pools, such that if people choose to use the
> dedicated chunk fetch handler, the number of Netty server threads would
> always be a multiple of the number of chunk fetch handler threads.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)