Repository: spark
Updated Branches:
  refs/heads/branch-1.2 380eba5f4 -> c7185f0c0


[SPARK-4516] Avoid allocating Netty PooledByteBufAllocators unnecessarily

It turns out we are allocating a separate allocator pool for every
TransportClient (so the number of pools grows with the number of nodes in the
cluster), when really we should just reuse a single pool for all clients.

This patch, as expected, greatly decreases off-heap memory allocation, and
appears to make allocation proportional only to the number of cores rather
than to the number of nodes.

Author: Aaron Davidson <[email protected]>

Closes #3465 from aarondav/fewer-pools and squashes the following commits:

36c49da [Aaron Davidson] [SPARK-4516] Avoid allocating unnecessarily Netty 
PooledByteBufAllocators

(cherry picked from commit 346bc17a2ec8fc9e6eaff90733aa1e8b6b46883e)
Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7185f0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7185f0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7185f0c

Branch: refs/heads/branch-1.2
Commit: c7185f0c08e2a42e2595466e2d8ac394cbf66f5b
Parents: 380eba5
Author: Aaron Davidson <[email protected]>
Authored: Wed Nov 26 00:32:45 2014 -0500
Committer: Patrick Wendell <[email protected]>
Committed: Wed Nov 26 00:32:55 2014 -0500

----------------------------------------------------------------------
 .../spark/network/client/TransportClientFactory.java    | 12 +++++-------
 .../java/org/apache/spark/network/util/NettyUtils.java  |  6 +++---
 2 files changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c7185f0c/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
 
b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 76bce85..9afd5de 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -19,7 +19,6 @@ package org.apache.spark.network.client;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.List;
@@ -37,7 +36,6 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.util.internal.PlatformDependent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +65,7 @@ public class TransportClientFactory implements Closeable {
 
   private final Class<? extends Channel> socketChannelClass;
   private EventLoopGroup workerGroup;
+  private PooledByteBufAllocator pooledAllocator;
 
   public TransportClientFactory(
       TransportContext context,
@@ -80,6 +79,8 @@ public class TransportClientFactory implements Closeable {
     this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
     // TODO: Make thread pool name configurable.
     this.workerGroup = NettyUtils.createEventLoop(ioMode, 
conf.clientThreads(), "shuffle-client");
+    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
+      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
   }
 
   /**
@@ -115,11 +116,8 @@ public class TransportClientFactory implements Closeable {
        // Disable Nagle's Algorithm since we don't want packets to wait
       .option(ChannelOption.TCP_NODELAY, true)
       .option(ChannelOption.SO_KEEPALIVE, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
conf.connectionTimeoutMs());
-
-    // Use pooled buffers to reduce temporary buffer allocation
-    bootstrap.option(ChannelOption.ALLOCATOR, 
NettyUtils.createPooledByteBufAllocator(
-      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
+      .option(ChannelOption.ALLOCATOR, pooledAllocator);
 
     final AtomicReference<TransportClient> clientRef = new 
AtomicReference<TransportClient>();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c7185f0c/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java 
b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 5c654a6..b3991a6 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -109,9 +109,9 @@ public class NettyUtils {
 
   /**
    * Create a pooled ByteBuf allocator but disables the thread-local cache. 
Thread-local caches
-   * are disabled because the ByteBufs are allocated by the event loop thread, 
but released by the
-   * executor thread rather than the event loop thread. Those thread-local 
caches actually delay
-   * the recycling of buffers, leading to larger memory usage.
+   * are disabled for TransportClients because the ByteBufs are allocated by 
the event loop thread,
+   * but released by the executor thread rather than the event loop thread. 
Those thread-local
+   * caches actually delay the recycling of buffers, leading to larger memory 
usage.
    */
   public static PooledByteBufAllocator createPooledByteBufAllocator(
       boolean allowDirectBufs,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to