Min Shen created SPARK-29206:
--------------------------------
Summary: Number of shuffle Netty server threads should be a
multiple of number of chunk fetch handler threads
Key: SPARK-29206
URL: https://issues.apache.org/jira/browse/SPARK-29206
Project: Spark
Issue Type: Improvement
Components: Shuffle
Affects Versions: 3.0.0
Reporter: Min Shen
In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool
to handle the slow-to-process chunk fetch requests in order to improve the
responsiveness of shuffle service for RPC requests.
Initially, we thought by making the number of Netty server threads larger than
the number of chunk fetch handler threads, it would reserve some threads for
RPC requests thus resolving the various RPC request timeout issues we
experienced previously. The solution worked in our cluster initially. However,
as the number of Spark applications in our cluster continued to increase, we
saw the RPC request (SASL authentication specifically) timeout issue again:
{noformat}
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout
waiting for task.
at
org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
at
org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
at
org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at
org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
at
org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
at
org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
{noformat}
After further investigation, we realized that as the number of concurrent
clients connecting to a shuffle service increases, it becomes _VERY_ important
to configure the number of Netty server threads and number of chunk fetch
handler threads correctly. Specifically, the number of Netty server threads
needs to be a multiple of the number of chunk fetch handler threads. The reason
is explained in detail below:
When a channel is established on the Netty server, it is registered with both
the Netty server default EventLoopGroup and the chunk fetch handler
EventLoopGroup. Once registered, this channel sticks with a given thread in
both EventLoopGroups, i.e. all requests from this channel are handled by the
same thread. Right now, the Spark shuffle Netty server uses Netty's default
strategy to select a thread from an EventLoopGroup to associate with a new
channel, which is simply round-robin (Netty's
DefaultEventExecutorChooserFactory).
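The round-robin selection described above can be sketched as follows. This is an illustrative stand-in for Netty's DefaultEventExecutorChooserFactory, not actual Spark or Netty code; the class and method names are ours:

```java
// Hypothetical sketch of the round-robin thread selection Netty applies when
// registering a new channel with an EventLoopGroup. Not actual Netty code.
public class RoundRobinChooser {
    private final int poolSize;   // number of threads in the EventLoopGroup
    private int nextIndex = 0;    // increments once per channel registration

    public RoundRobinChooser(int poolSize) {
        this.poolSize = poolSize;
    }

    // The i-th registered channel sticks with thread i % poolSize for its
    // entire lifetime; all of its requests run on that one thread.
    public int next() {
        return nextIndex++ % poolSize;
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser(40);
        for (int channel = 0; channel < 81; channel++) {
            int thread = chooser.next();
            if (thread == 0) {
                // With a pool of 40, channels 0, 40, 80, ... all land on thread 0.
                System.out.println("channel " + channel + " -> server thread 0");
            }
        }
    }
}
```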
In SPARK-24355, with the introduced chunk fetch handler thread pool, all chunk
fetch requests from a given channel will be first added to the task queue of
the chunk fetch handler thread associated with that channel. When the requests
get processed, the chunk fetch request handler thread will submit a task to the
task queue of the Netty server thread that's also associated with this channel.
If the number of Netty server threads is not a multiple of the number of chunk
fetch handler threads, it would become a problem when the server has a large
number of concurrent connections.
Assume we configure the number of Netty server threads as 40 and the percentage
of chunk fetch handler threads as 87, which leads to 35 chunk fetch handler
threads. Then according to the round-robin policy, channel 0, 40, 80, 120, 160,
200, 240, and 280 will all be associated with the 1st Netty server thread in
the default EventLoopGroup. However, since the chunk fetch handler thread pool
only has 35 threads, out of these 8 channels, only channels 0 and 280 will be
associated with the same chunk fetch handler thread. Thus, channels 0, 40, 80,
120, 160, 200, and 240 will all be associated with different chunk fetch handler
threads but associated with the same Netty server thread. This means, the 7
different chunk fetch handler threads associated with these channels could
potentially submit tasks to the task queue of the same Netty server thread at
the same time. This would lead to 7 slow-to-process requests sitting in the
task queue. If an RPC request is put in the task queue after these 7 requests,
it is very likely to timeout.
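The collision arithmetic in the example above can be checked directly. Under round-robin, channel c lands on server thread c % 40 and on fetch handler thread c % 35; the class below is purely illustrative:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative check of the 40-server-thread / 35-fetch-handler-thread example:
// channels sharing one Netty server thread spread across many fetch threads.
public class ThreadCollisionDemo {
    public static int serverThread(int channel) { return channel % 40; }
    public static int fetchThread(int channel)  { return channel % 35; }

    public static void main(String[] args) {
        Set<Integer> fetchThreads = new HashSet<>();
        int sharedChannels = 0;
        // Among channels 0..299, those on server thread 0: 0, 40, 80, ..., 280.
        for (int channel = 0; channel < 300; channel++) {
            if (serverThread(channel) == 0) {
                sharedChannels++;
                fetchThreads.add(fetchThread(channel));
            }
        }
        // 8 channels share server thread 0 but span 7 distinct fetch handler
        // threads; only channels 0 and 280 (280 % 35 == 0) share fetch thread 0.
        System.out.println(sharedChannels + " channels, "
                + fetchThreads.size() + " distinct fetch threads");
    }
}
```

Those 7 distinct fetch handler threads are exactly the ones that can simultaneously enqueue slow chunk fetch work onto server thread 0's task queue.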
In our cluster, the number of concurrent active connections to a shuffle
service could go as high as 6K+ during peak. If the sizes of these thread
pools are not configured correctly, our Spark applications are guaranteed to
see SASL timeout issues when a shuffle service is dealing with a lot of
incoming chunk fetch requests from many distinct clients, which leads to stage
failures and lengthy retries.
To resolve this issue, the number of Netty server threads needs to be a
multiple of the number of chunk fetch handler threads. This way, the
round-robin policy will guarantee that channels associated with different chunk
fetch handler threads will also be associated with different Netty server
threads, thus eliminating this potential burst of placing multiple
slow-to-process requests in one Netty server thread task queue.
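A quick arithmetic check of why the multiple-of rule works (class and method names are illustrative): when the server pool size is a multiple of the fetch pool size, c % serverThreads fully determines c % fetchThreads, so every channel on a given server thread shares one fetch handler thread:

```java
// Illustrative check: with a divisor-aligned configuration (e.g. 40 server
// threads, 20 fetch threads), two channels on the same server thread can never
// be on different fetch handler threads; with 40/35 they can.
public class MultipleRuleDemo {
    public static boolean aligned(int serverThreads, int fetchThreads, int maxChannel) {
        for (int a = 0; a < maxChannel; a++) {
            for (int b = a + 1; b < maxChannel; b++) {
                if (a % serverThreads == b % serverThreads
                        && a % fetchThreads != b % fetchThreads) {
                    // Same server thread, different fetch threads: the burst
                    // scenario described above is possible.
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("40 server / 20 fetch aligned: " + aligned(40, 20, 400));
        System.out.println("40 server / 35 fetch aligned: " + aligned(40, 35, 400));
    }
}
```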
Since the current patch that's merged in Spark uses
`spark.shuffle.server.chunkFetchHandlerThreadsPercent` to configure the number
of chunk fetch handler threads and it rounds the number up, it is very tricky
to get the sizes of these thread pools configured right. In addition, people
who are not aware of this issue will very likely fall into this trap and start
seeing the RPC request timeout issue during shuffle fetch when the Spark
workloads in their environment get to a certain scale. For these
reasons, we propose to change the configurations of the number of threads for
both thread pools, such that if people choose to use the dedicated chunk fetch
handler, the number of Netty server threads would always be a multiple of the
number of chunk fetch handler threads.
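The rounding pitfall can be illustrated with the numbers from the example earlier (87% of 40 server threads rounding up to 35). The formula below is our reconstruction of that behavior, not the actual Spark code, and the method name is ours:

```java
// Illustrative reconstruction of turning a handler-threads percentage into a
// thread count by rounding up, matching the 87% -> 35-of-40 example above.
public class FetchThreadPercentDemo {
    public static int fetchThreads(int serverThreads, int percent) {
        return (int) Math.ceil(serverThreads * percent / 100.0);
    }

    public static void main(String[] args) {
        int server = 40;
        int fetch87 = fetchThreads(server, 87);  // 40 * 0.87 = 34.8 -> 35
        int fetch50 = fetchThreads(server, 50);  // 40 * 0.50 = 20.0 -> 20
        // 35 does not divide 40, so 40 is not a multiple of the fetch pool
        // size and the collision scenario above applies; 20 divides 40 evenly.
        System.out.println("87% -> " + fetch87 + ", divisor: " + (server % fetch87 == 0));
        System.out.println("50% -> " + fetch50 + ", divisor: " + (server % fetch50 == 0));
    }
}
```

With a percentage-based knob, only a handful of percentages happen to produce a divisor of the server pool size, which is why the proposal is to express the configuration so the multiple relationship holds by construction.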
--
This message was sent by Atlassian Jira
(v8.3.4#803005)