This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a9a185c  [SPARK-30623][CORE] Spark external shuffle allow disable of 
separate event loop group
a9a185c is described below

commit a9a185c918678285838599b6d05087212d727c06
Author: Yuanjian Li <xyliyuanj...@gmail.com>
AuthorDate: Thu Mar 26 12:37:48 2020 +0800

    [SPARK-30623][CORE] Spark external shuffle allow disable of separate event 
loop group
    
    ### What changes were proposed in this pull request?
    Fix the regression caused by #22173.
    The original PR changed the handling of `ChunkFetchRequest` from 
async to sync, which caused the shuffle benchmark regression. This PR fixes 
the regression by going back to the async mode, reusing the config 
`spark.shuffle.server.chunkFetchHandlerThreadsPercent`.
    When the user sets the config, `ChunkFetchRequest` will be processed in a 
separate event loop group; otherwise, the code path is exactly the same as 
before.
    
    ### Why are the changes needed?
    Fix the shuffle performance regression described in  
https://github.com/apache/spark/pull/22173#issuecomment-572459561
    
    ### Does this PR introduce any user-facing change?
    Yes, this PR disables the separate event loop for FetchRequest by default.
    
    ### How was this patch tested?
    Existing UT.
    
    Closes #27665 from xuanyuanking/SPARK-24355-follow.
    
    Authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 0fe203e7032a326ff0c78f6660ab1b09409fe09d)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/network/TransportContext.java | 27 +++++++++---------
 .../network/server/ChunkFetchRequestHandler.java   | 33 ++++++++++++++++------
 .../network/server/TransportChannelHandler.java    |  5 +++-
 .../network/server/TransportRequestHandler.java    | 13 +++++++--
 .../apache/spark/network/util/TransportConf.java   | 13 +++++++--
 .../network/ChunkFetchRequestHandlerSuite.java     |  4 +--
 .../network/TransportRequestHandlerSuite.java      |  4 +--
 7 files changed, 66 insertions(+), 33 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 
b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index d99b9bd..a0de9df 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -123,7 +123,7 @@ public class TransportContext implements Closeable {
 
     if (conf.getModuleName() != null &&
         conf.getModuleName().equalsIgnoreCase("shuffle") &&
-        !isClientOnly) {
+        !isClientOnly && conf.separateChunkFetchRequest()) {
       chunkFetchWorkers = NettyUtils.createEventLoop(
           IOMode.valueOf(conf.ioMode()),
           conf.chunkFetchHandlerThreads(),
@@ -187,8 +187,6 @@ public class TransportContext implements Closeable {
       RpcHandler channelRpcHandler) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, 
channelRpcHandler);
-      ChunkFetchRequestHandler chunkFetchHandler =
-        createChunkFetchHandler(channelHandler, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline()
         .addLast("encoder", ENCODER)
         .addLast(TransportFrameDecoder.HANDLER_NAME, 
NettyUtils.createFrameDecoder())
@@ -200,6 +198,9 @@ public class TransportContext implements Closeable {
         .addLast("handler", channelHandler);
       // Use a separate EventLoopGroup to handle ChunkFetchRequest messages 
for shuffle rpcs.
       if (chunkFetchWorkers != null) {
+        ChunkFetchRequestHandler chunkFetchHandler = new 
ChunkFetchRequestHandler(
+          channelHandler.getClient(), rpcHandler.getStreamManager(),
+          conf.maxChunksBeingTransferred(), true /* syncModeEnabled */);
         pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", 
chunkFetchHandler);
       }
       return channelHandler;
@@ -217,19 +218,17 @@ public class TransportContext implements Closeable {
   private TransportChannelHandler createChannelHandler(Channel channel, 
RpcHandler rpcHandler) {
     TransportResponseHandler responseHandler = new 
TransportResponseHandler(channel);
     TransportClient client = new TransportClient(channel, responseHandler);
+    boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
+    ChunkFetchRequestHandler chunkFetchRequestHandler = null;
+    if (!separateChunkFetchRequest) {
+      chunkFetchRequestHandler = new ChunkFetchRequestHandler(
+        client, rpcHandler.getStreamManager(),
+        conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
+    }
     TransportRequestHandler requestHandler = new 
TransportRequestHandler(channel, client,
-      rpcHandler, conf.maxChunksBeingTransferred());
+      rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
     return new TransportChannelHandler(client, responseHandler, requestHandler,
-      conf.connectionTimeoutMs(), closeIdleConnections, this);
-  }
-
-  /**
-   * Creates the dedicated ChannelHandler for ChunkFetchRequest messages.
-   */
-  private ChunkFetchRequestHandler 
createChunkFetchHandler(TransportChannelHandler channelHandler,
-      RpcHandler rpcHandler) {
-    return new ChunkFetchRequestHandler(channelHandler.getClient(),
-      rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred());
+      conf.connectionTimeoutMs(), separateChunkFetchRequest, 
closeIdleConnections, this);
   }
 
   public TransportConf getConf() { return conf; }
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
index 94412c4..82810da 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
@@ -55,14 +55,17 @@ public class ChunkFetchRequestHandler extends 
SimpleChannelInboundHandler<ChunkF
   private final StreamManager streamManager;
   /** The max number of chunks being transferred and not finished yet. */
   private final long maxChunksBeingTransferred;
+  private final boolean syncModeEnabled;
 
   public ChunkFetchRequestHandler(
       TransportClient client,
       StreamManager streamManager,
-      Long maxChunksBeingTransferred) {
+      Long maxChunksBeingTransferred,
+      boolean syncModeEnabled) {
     this.client = client;
     this.streamManager = streamManager;
     this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+    this.syncModeEnabled = syncModeEnabled;
   }
 
   @Override
@@ -76,6 +79,11 @@ public class ChunkFetchRequestHandler extends 
SimpleChannelInboundHandler<ChunkF
       ChannelHandlerContext ctx,
       final ChunkFetchRequest msg) throws Exception {
     Channel channel = ctx.channel();
+    processFetchRequest(channel, msg);
+  }
+
+  public void processFetchRequest(
+      final Channel channel, final ChunkFetchRequest msg) throws Exception {
     if (logger.isTraceEnabled()) {
       logger.trace("Received req from {} to fetch block {}", 
getRemoteAddress(channel),
         msg.streamChunkId);
@@ -112,19 +120,26 @@ public class ChunkFetchRequestHandler extends 
SimpleChannelInboundHandler<ChunkF
    * channel will be handled by the EventLoop the channel is registered to. So 
even
    * though we are processing the ChunkFetchRequest in a separate thread pool, 
the actual I/O,
    * which is the potentially blocking call that could deplete server handler 
threads, is still
-   * being processed by TransportServer's default EventLoopGroup. In order to 
throttle the max
-   * number of threads that channel I/O for sending response to 
ChunkFetchRequest, the thread
-   * calling channel.writeAndFlush will wait for the completion of sending 
response back to
-   * client by invoking await(). This will throttle the rate at which threads 
from
-   * ChunkFetchRequest dedicated EventLoopGroup submit channel I/O requests to 
TransportServer's
-   * default EventLoopGroup, thus making sure that we can reserve some threads 
in
-   * TransportServer's default EventLoopGroup for handling other RPC messages.
+   * being processed by TransportServer's default EventLoopGroup.
+   *
+   * When syncModeEnabled is true, Spark will throttle the max number of 
threads that channel I/O
+   * for sending response to ChunkFetchRequest, the thread calling 
channel.writeAndFlush will wait
+   * for the completion of sending response back to client by invoking 
await(). This will throttle
+   * the rate at which threads from ChunkFetchRequest dedicated EventLoopGroup 
submit channel I/O
+   * requests to TransportServer's default EventLoopGroup, thus making sure 
that we can reserve
+   * some threads in TransportServer's default EventLoopGroup for handling 
other RPC messages.
    */
   private ChannelFuture respond(
       final Channel channel,
       final Encodable result) throws InterruptedException {
     final SocketAddress remoteAddress = channel.remoteAddress();
-    return 
channel.writeAndFlush(result).await().addListener((ChannelFutureListener) 
future -> {
+    ChannelFuture channelFuture;
+    if (syncModeEnabled) {
+      channelFuture = channel.writeAndFlush(result).await();
+    } else {
+      channelFuture = channel.writeAndFlush(result);
+    }
+    return channelFuture.addListener((ChannelFutureListener) future -> {
       if (future.isSuccess()) {
         logger.trace("Sent result {} to client {}", result, remoteAddress);
       } else {
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index 31371f6..e53a0c1 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -58,6 +58,7 @@ public class TransportChannelHandler extends 
SimpleChannelInboundHandler<Message
   private final TransportRequestHandler requestHandler;
   private final long requestTimeoutNs;
   private final boolean closeIdleConnections;
+  private final boolean skipChunkFetchRequest;
   private final TransportContext transportContext;
 
   public TransportChannelHandler(
@@ -65,12 +66,14 @@ public class TransportChannelHandler extends 
SimpleChannelInboundHandler<Message
       TransportResponseHandler responseHandler,
       TransportRequestHandler requestHandler,
       long requestTimeoutMs,
+      boolean skipChunkFetchRequest,
       boolean closeIdleConnections,
       TransportContext transportContext) {
     this.client = client;
     this.responseHandler = responseHandler;
     this.requestHandler = requestHandler;
     this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
+    this.skipChunkFetchRequest = skipChunkFetchRequest;
     this.closeIdleConnections = closeIdleConnections;
     this.transportContext = transportContext;
   }
@@ -124,7 +127,7 @@ public class TransportChannelHandler extends 
SimpleChannelInboundHandler<Message
    */
   @Override
   public boolean acceptInboundMessage(Object msg) throws Exception {
-    if (msg instanceof ChunkFetchRequest) {
+    if (skipChunkFetchRequest && msg instanceof ChunkFetchRequest) {
       return false;
     } else {
       return super.acceptInboundMessage(msg);
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 0792b58..f178928 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -62,16 +62,21 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
   /** The max number of chunks being transferred and not finished yet. */
   private final long maxChunksBeingTransferred;
 
+  /** The dedicated ChannelHandler for ChunkFetchRequest messages. */
+  private final ChunkFetchRequestHandler chunkFetchRequestHandler;
+
   public TransportRequestHandler(
       Channel channel,
       TransportClient reverseClient,
       RpcHandler rpcHandler,
-      Long maxChunksBeingTransferred) {
+      Long maxChunksBeingTransferred,
+      ChunkFetchRequestHandler chunkFetchRequestHandler) {
     this.channel = channel;
     this.reverseClient = reverseClient;
     this.rpcHandler = rpcHandler;
     this.streamManager = rpcHandler.getStreamManager();
     this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+    this.chunkFetchRequestHandler = chunkFetchRequestHandler;
   }
 
   @Override
@@ -97,8 +102,10 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
   }
 
   @Override
-  public void handle(RequestMessage request) {
-    if (request instanceof RpcRequest) {
+  public void handle(RequestMessage request) throws Exception {
+    if (request instanceof ChunkFetchRequest) {
+      chunkFetchRequestHandler.processFetchRequest(channel, 
(ChunkFetchRequest) request);
+    } else if (request instanceof RpcRequest) {
       processRpcRequest((RpcRequest) request);
     } else if (request instanceof OneWayMessage) {
       processOneWayMessage((OneWayMessage) request);
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index cc0f291..6c37f9a 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -316,7 +316,8 @@ public class TransportConf {
 
   /**
    * Percentage of io.serverThreads used by netty to process ChunkFetchRequest.
-   * Shuffle server will use a separate EventLoopGroup to process 
ChunkFetchRequest messages.
+   * When the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent` is 
set,
+   * shuffle server will use a separate EventLoopGroup to process 
ChunkFetchRequest messages.
    * Although when calling the async writeAndFlush on the underlying channel 
to send
    * response back to client, the I/O on the channel is still being handled by
    * {@link org.apache.spark.network.server.TransportServer}'s default 
EventLoopGroup
@@ -339,13 +340,21 @@ public class TransportConf {
       return 0;
     }
     int chunkFetchHandlerThreadsPercent =
-      conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 100);
+      
Integer.parseInt(conf.get("spark.shuffle.server.chunkFetchHandlerThreadsPercent"));
     int threads =
       this.serverThreads() > 0 ? this.serverThreads() : 2 * 
NettyRuntime.availableProcessors();
     return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 
100.0));
   }
 
   /**
+   * Whether to use a separate EventLoopGroup to process ChunkFetchRequest 
messages, it is decided
+   * by the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent` is 
set or not.
+   */
+  public boolean separateChunkFetchRequest() {
+    return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 
0) > 0;
+  }
+
+  /**
    * Whether to use the old protocol while doing the shuffle block fetching.
    * It is only enabled while we need the compatibility in the scenario of new 
spark version
    * job fetching blocks from old version external shuffle service.
diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
index 7e30ed4..addb4ff 100644
--- 
a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import io.netty.channel.Channel;
-import org.apache.spark.network.server.ChunkFetchRequestHandler;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,6 +32,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.protocol.*;
+import org.apache.spark.network.server.ChunkFetchRequestHandler;
 import org.apache.spark.network.server.NoOpRpcHandler;
 import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.server.RpcHandler;
@@ -68,7 +68,7 @@ public class ChunkFetchRequestHandlerSuite {
     long streamId = streamManager.registerStream("test-app", 
managedBuffers.iterator(), channel);
     TransportClient reverseClient = mock(TransportClient.class);
     ChunkFetchRequestHandler requestHandler = new 
ChunkFetchRequestHandler(reverseClient,
-      rpcHandler.getStreamManager(), 2L);
+      rpcHandler.getStreamManager(), 2L, false);
 
     RequestMessage request0 = new ChunkFetchRequest(new 
StreamChunkId(streamId, 0));
     requestHandler.channelRead(context, request0);
diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
index a43a659..0a64471 100644
--- 
a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
@@ -39,7 +39,7 @@ import 
org.apache.spark.network.server.TransportRequestHandler;
 public class TransportRequestHandlerSuite {
 
   @Test
-  public void handleStreamRequest() {
+  public void handleStreamRequest() throws Exception {
     RpcHandler rpcHandler = new NoOpRpcHandler();
     OneForOneStreamManager streamManager = (OneForOneStreamManager) 
(rpcHandler.getStreamManager());
     Channel channel = mock(Channel.class);
@@ -66,7 +66,7 @@ public class TransportRequestHandlerSuite {
 
     TransportClient reverseClient = mock(TransportClient.class);
     TransportRequestHandler requestHandler = new 
TransportRequestHandler(channel, reverseClient,
-      rpcHandler, 2L);
+      rpcHandler, 2L, null);
 
     RequestMessage request0 = new StreamRequest(String.format("%d_%d", 
streamId, 0));
     requestHandler.handle(request0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to