[ https://issues.apache.org/jira/browse/SPARK-24355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484650#comment-16484650 ]
Min Shen commented on SPARK-24355:
----------------------------------

[~felixcheung] [~jinxing6...@126.com] [~cloud_fan] Could you please take a look at this PR? https://github.com/apache/spark/pull/21402

> Improve Spark shuffle server responsiveness to non-ChunkFetch requests
> ----------------------------------------------------------------------
>
>                 Key: SPARK-24355
>                 URL: https://issues.apache.org/jira/browse/SPARK-24355
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 2.3.0
>         Environment: Hadoop-2.7.4
>                      Spark-2.3.0
>            Reporter: Min Shen
>            Priority: Major
>
> We run Spark on YARN and deploy the Spark external shuffle service as part of the YARN NM aux services.
> One issue we have seen with the Spark external shuffle service is the various timeouts experienced by clients, either when registering an executor with the local shuffle server or when establishing a connection to a remote shuffle server.
> Example of a timeout while establishing a connection with a remote shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>     at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
>     at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
>     at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
>     at org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>     at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
>     ...
> {code}
> Example of a timeout while registering an executor with the local shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>     at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>     at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>     at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>     at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>     at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>     at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {code}
> While patches such as SPARK-20640 and config parameters such as spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when spark.authenticate is set to true) can help alleviate this type of problem, they do not address the fundamental issue.
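> For reference, a minimal sketch of how these knobs could be raised, assuming they are set through an application-side SparkConf. The class name ShuffleTimeoutConfSketch and all values are illustrative only, and spark.shuffle.io.serverThreads takes effect on the side hosting the shuffle server (e.g. the YARN NM shuffle service configuration), not in the application:
> {code:java}
> import org.apache.spark.SparkConf;
>
> public class ShuffleTimeoutConfSketch {
>   public static SparkConf build() {
>     // Illustrative values; even very high timeouts only mask the
>     // handler-thread starvation described below.
>     return new SparkConf()
>         .set("spark.shuffle.registration.timeout", "2m") // executor registration with the shuffle server
>         .set("spark.shuffle.sasl.timeout", "2m")         // SASL bootstrap, when spark.authenticate=true
>         .set("spark.shuffle.io.serverThreads", "128");   // server-side Netty handler threads (default 2 * # cores)
>   }
> }
> {code}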
> We have observed that when the shuffle workload gets very busy during peak hours, client requests can time out even with these parameters configured to very high values. Further investigation revealed the following:
> Right now, the number of server-side Netty handler threads defaults to 2 * # cores and can be further configured with the parameter spark.shuffle.io.serverThreads. Processing a client request requires one available server Netty handler thread.
> However, when the server Netty handler threads start to process ChunkFetchRequests, they become blocked on disk I/O, mostly due to disk contention from the random read operations initiated by all the ChunkFetchRequests received from clients.
> As a result, when the shuffle server is serving many concurrent ChunkFetchRequests, the server-side Netty handler threads can all be blocked reading shuffle files, leaving no handler threads available to process other types of requests, which should all be very quick to process.
> This issue can be fixed by limiting the number of Netty handler threads that can get blocked while processing ChunkFetchRequests. We have a patch that does this by using a separate EventLoopGroup with a dedicated ChannelHandler to process ChunkFetchRequests (a rough sketch of the idea follows at the end of this description). This lets the shuffle server reserve Netty handler threads for non-ChunkFetchRequest messages, giving consistent processing time for these requests, which are fast to process.
> After deploying the patch in our infrastructure, we no longer see timeout issues with either executors registering with the local shuffle server or shuffle clients establishing connections with remote shuffle servers.
> Will post the patch soon, and want to gather feedback from the community.
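> To illustrate the idea (this is not the actual patch; see the PR linked above for that), here is a minimal Netty sketch. The class ChunkFetchOffloadSketch, the handler names, and the pool size of 8 are all hypothetical: the main handler stays on the event loop, serving fast request types inline and forwarding ChunkFetchRequests downstream to a handler bound to a separate bounded executor group, so at most the pool size worth of threads can ever be blocked on shuffle-file reads.
> {code:java}
> import io.netty.channel.ChannelHandlerContext;
> import io.netty.channel.ChannelInboundHandlerAdapter;
> import io.netty.channel.SimpleChannelInboundHandler;
> import io.netty.channel.socket.SocketChannel;
> import io.netty.util.concurrent.DefaultEventExecutorGroup;
> import io.netty.util.concurrent.EventExecutorGroup;
> import org.apache.spark.network.protocol.ChunkFetchRequest;
>
> public class ChunkFetchOffloadSketch {
>
>   // Bounded pool dedicated to ChunkFetchRequests; its size caps how many
>   // threads can be blocked on shuffle-file disk reads at any one time.
>   // The size of 8 is illustrative, not a value from the actual patch.
>   private static final EventExecutorGroup chunkFetchWorkers =
>       new DefaultEventExecutorGroup(8);
>
>   // Stays on the Netty event loop: serves fast request types (RPCs,
>   // executor registrations) inline and passes ChunkFetchRequests along
>   // the pipeline to the dedicated handler below.
>   public static class MainHandler extends ChannelInboundHandlerAdapter {
>     @Override
>     public void channelRead(ChannelHandlerContext ctx, Object msg) {
>       if (msg instanceof ChunkFetchRequest) {
>         ctx.fireChannelRead(msg);
>       } else {
>         // fast, non-blocking handling of all other request types goes here
>       }
>     }
>   }
>
>   // Runs on chunkFetchWorkers, so the blocking disk reads needed to
>   // serve a chunk never tie up the event loop threads.
>   public static class ChunkFetchHandler
>       extends SimpleChannelInboundHandler<ChunkFetchRequest> {
>     @Override
>     protected void channelRead0(ChannelHandlerContext ctx, ChunkFetchRequest req) {
>       // locate the requested chunk and stream it back (blocking disk I/O)
>     }
>   }
>
>   public static void initPipeline(SocketChannel ch) {
>     ch.pipeline()
>         .addLast("mainHandler", new MainHandler())
>         .addLast(chunkFetchWorkers, "chunkFetchHandler", new ChunkFetchHandler());
>   }
> }
> {code}
> The key design point is the second addLast overload, which binds a handler to its own EventExecutorGroup instead of the channel's event loop, so non-ChunkFetch requests keep their fast, consistent processing time regardless of disk contention.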