[
https://issues.apache.org/jira/browse/SPARK-24355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Thomas Graves reassigned SPARK-24355:
-------------------------------------
Assignee: Sanket Chintapalli
> Improve Spark shuffle server responsiveness to non-ChunkFetch requests
> ----------------------------------------------------------------------
>
> Key: SPARK-24355
> URL: https://issues.apache.org/jira/browse/SPARK-24355
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle
> Affects Versions: 2.3.0
> Environment: Hadoop-2.7.4
> Spark-2.3.0
> Reporter: Min Shen
> Assignee: Sanket Chintapalli
> Priority: Major
> Fix For: 2.5.0
>
>
> We run Spark on YARN, and deploy the Spark external shuffle service as part of the
> YARN NM aux service.
> One issue we have seen with the Spark external shuffle service is the various timeouts
> clients experience when either registering an executor with the local shuffle server
> or establishing a connection to a remote shuffle server.
> Example of a timeout when establishing a connection with a remote shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>     at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
>     at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
>     at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
>     at org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>     at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
> ....{code}
> Example of a timeout when registering an executor with the local shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>     at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
>     at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>     at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>     at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>     at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>     at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {code}
> While patches such as SPARK-20640 and config parameters such as
> spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when
> spark.authenticate is set to true) can help alleviate this type of problem, they do
> not solve the fundamental issue.
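> For reference, this is roughly how these parameters could be raised in
> spark-defaults.conf. The values below are purely illustrative, and the expected units
> should be checked against the Spark version in use:
> {code}
> spark.authenticate                    true
> # SASL RPC timeout against the shuffle server (only used when spark.authenticate is true)
> spark.shuffle.sasl.timeout            120s
> # Executor registration timeout against the external shuffle service (milliseconds)
> spark.shuffle.registration.timeout    30000
> {code}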
> We have observed that, when the shuffle workload gets very busy during peak hours,
> client requests can time out even after configuring these parameters with very high
> values. Further investigation revealed the following:
> Right now, the default number of server-side Netty handler threads is 2 * # cores, and
> it can be further configured with the parameter spark.shuffle.io.serverThreads.
> Processing a client request requires one available server Netty handler thread.
> However, when the server Netty handler threads start to process ChunkFetchRequests,
> they will be blocked on disk I/O, mostly due to disk contention from the random read
> operations initiated by all the ChunkFetchRequests received from clients.
> As a result, when the shuffle server is serving many concurrent ChunkFetchRequests,
> the server-side Netty handler threads could all be blocked on reading shuffle files,
> leaving no handler thread available to process other types of requests, which would
> otherwise be very quick to process.
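> To illustrate the failure mode, here is a simplified, hypothetical handler (not Spark's
> actual code): any blocking disk read performed inside a Netty inbound handler pins the
> event loop thread that invoked it, so that thread cannot serve any other request until
> the read completes.
> {code:java}
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Paths;
> import java.nio.file.StandardOpenOption;
>
> import io.netty.channel.ChannelHandlerContext;
> import io.netty.channel.ChannelInboundHandlerAdapter;
>
> // Hypothetical handler showing why ChunkFetchRequest processing can starve the pool:
> // the read below runs on one of the 2 * #cores handler threads and blocks it on disk I/O.
> public class BlockingChunkFetchHandler extends ChannelInboundHandlerAdapter {
>   @Override
>   public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
>     try (FileChannel shuffleFile =
>         FileChannel.open(Paths.get("/tmp/shuffle_0_0_0.data"), StandardOpenOption.READ)) {
>       // Random read from a shuffle file; under heavy disk contention this call can
>       // stall for a long time, and the event loop thread stalls with it.
>       ByteBuffer chunk = ByteBuffer.allocate(1024 * 1024);
>       shuffleFile.read(chunk, 0);
>     }
>     ctx.fireChannelRead(msg);
>   }
> }
> {code}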
> This issue could potentially be fixed by limiting the number of Netty handler threads
> that can get blocked while processing ChunkFetchRequests. We have a patch that does
> this by using a separate EventLoopGroup with a dedicated ChannelHandler to process
> ChunkFetchRequests. This enables the shuffle server to reserve Netty handler threads
> for non-ChunkFetchRequest messages, providing consistent processing time for these
> requests, which are fast to process.
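> A minimal sketch of the general technique follows (this is not the actual patch;
> handler names and the thread count are hypothetical): Netty allows a handler to be
> added to the pipeline with its own EventExecutorGroup, so the potentially disk-blocking
> ChunkFetch handler can be confined to a bounded pool while other handlers keep running
> on the regular event loop threads.
> {code:java}
> import io.netty.channel.ChannelInboundHandlerAdapter;
> import io.netty.channel.ChannelInitializer;
> import io.netty.channel.socket.SocketChannel;
> import io.netty.util.concurrent.DefaultEventExecutorGroup;
>
> public class ChunkFetchPipelineSketch extends ChannelInitializer<SocketChannel> {
>
>   // Bounded pool reserved for the disk-bound handler; sizing it below the main
>   // worker pool leaves threads free for fast requests (registration, SASL, RPCs).
>   private final DefaultEventExecutorGroup chunkFetchExecutors =
>       new DefaultEventExecutorGroup(8);
>
>   @Override
>   protected void initChannel(SocketChannel ch) {
>     // Handlers added without an executor group run on the regular event loop
>     // threads, so fast request types are never starved by slow chunk fetches.
>     ch.pipeline().addLast("rpcHandler", new FastRequestHandler());
>
>     // Handlers added with an executor group have their callbacks invoked on that
>     // group's threads; at most 8 threads can ever be blocked on shuffle file reads.
>     ch.pipeline().addLast(chunkFetchExecutors, "chunkFetchHandler", new ChunkFetchHandler());
>   }
>
>   // Hypothetical placeholders; Spark's real handlers live in the network-common module.
>   private static final class FastRequestHandler extends ChannelInboundHandlerAdapter {}
>   private static final class ChunkFetchHandler extends ChannelInboundHandlerAdapter {}
> }
> {code}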
> After deploying the patch in our infrastructure, we no longer see timeout issues with
> either executors registering with the local shuffle server or shuffle clients
> establishing connections with remote shuffle servers.
> We will post the patch soon, and want to gather feedback from the community.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]