Repository: spark
Updated Branches:
  refs/heads/master 23f5bdf06 -> b9e1c2eb9


[SPARK-4370] [Core] Limit number of Netty cores based on executor size

Author: Aaron Davidson <aa...@databricks.com>

Closes #3155 from aarondav/conf and squashes the following commits:

7045e77 [Aaron Davidson] Add mesos comment
4770f6e [Aaron Davidson] [SPARK-4370] [Core] Limit number of Netty cores based on executor size


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9e1c2eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9e1c2eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9e1c2eb

Branch: refs/heads/master
Commit: b9e1c2eb9b6f7fb609718ef20048a8da452d881b
Parents: 23f5bdf
Author: Aaron Davidson <aa...@databricks.com>
Authored: Wed Nov 12 18:46:37 2014 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Nov 12 18:46:37 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala  | 12 ++++--
 .../worker/StandaloneWorkerShuffleService.scala |  2 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  4 +-
 .../org/apache/spark/executor/Executor.scala    |  3 +-
 .../spark/executor/MesosExecutorBackend.scala   | 17 +++++++--
 .../netty/NettyBlockTransferService.scala       |  4 +-
 .../network/netty/SparkTransportConf.scala      | 19 ++++++++--
 .../spark/scheduler/local/LocalBackend.scala    |  2 +-
 .../org/apache/spark/storage/BlockManager.scala | 12 +++---
 .../spark/ExternalShuffleServiceSuite.scala     |  2 +-
 .../netty/NettyBlockTransferSecuritySuite.scala |  4 +-
 .../storage/BlockManagerReplicationSuite.scala  |  4 +-
 .../spark/storage/BlockManagerSuite.scala       |  5 ++-
 .../network/client/TransportClientFactory.java  | 33 +----------------
 .../spark/network/server/TransportServer.java   |  4 +-
 .../apache/spark/network/util/NettyUtils.java   | 39 ++++++++++++++++++++
 .../streaming/ReceivedBlockHandlerSuite.scala   |  2 +-
 17 files changed, 104 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
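
For readers skimming the diff: the change threads each executor's allotted core count from the backends (coarse-grained, Mesos, local) down to the Netty transport layer, so the shuffle server and client thread pools, and the pooled allocator arenas created in NettyUtils, are sized by the executor's cores rather than by the whole machine. A minimal sketch of the end result, using identifiers from the diff below (the core count of 4 is purely illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.network.netty.SparkTransportConf

    // Sketch: an executor granted 4 cores builds its transport conf with
    // numUsableCores = 4, which seeds spark.shuffle.io.serverThreads and
    // spark.shuffle.io.clientThreads unless they are already set explicitly.
    // Passing 0 means "no restriction".
    val conf = new SparkConf()
    val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 4)
    // transportConf.serverThreads() and transportConf.clientThreads() are now 4.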


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index e7454be..e464b32 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -168,9 +168,11 @@ object SparkEnv extends Logging {
       executorId: String,
       hostname: String,
       port: Int,
+      numCores: Int,
       isLocal: Boolean,
       actorSystem: ActorSystem = null): SparkEnv = {
-    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem,
+      numUsableCores = numCores)
   }
 
   /**
@@ -184,7 +186,8 @@ object SparkEnv extends Logging {
       isDriver: Boolean,
       isLocal: Boolean,
       listenerBus: LiveListenerBus = null,
-      defaultActorSystem: ActorSystem = null): SparkEnv = {
+      defaultActorSystem: ActorSystem = null,
+      numUsableCores: Int = 0): SparkEnv = {
 
     // Listener bus is only used on the driver
     if (isDriver) {
@@ -276,7 +279,7 @@ object SparkEnv extends Logging {
     val blockTransferService =
       conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
-          new NettyBlockTransferService(conf, securityManager)
+          new NettyBlockTransferService(conf, securityManager, numUsableCores)
         case "nio" =>
           new NioBlockTransferService(conf, securityManager)
       }
@@ -287,7 +290,8 @@ object SparkEnv extends Logging {
 
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
+      numUsableCores)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
index d044e1d..b979896 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
@@ -39,7 +39,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
-  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
+  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
   private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = {
     val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 3711824..5f46f3b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -57,9 +57,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def receiveWithLogging = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
-      // Make this host instead of hostPort ?
       val (hostname, _) = Utils.parseHostPort(hostPort)
-      executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
+      executor = new Executor(executorId, hostname, sparkProperties, cores, isLocal = false,
+        actorSystem)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index caf4d76..4c378a2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -43,6 +43,7 @@ private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
+    numCores: Int,
     isLocal: Boolean = false,
     actorSystem: ActorSystem = null)
   extends Logging
@@ -83,7 +84,7 @@ private[spark] class Executor(
     if (!isLocal) {
       val port = conf.getInt("spark.executor.port", 0)
       val _env = SparkEnv.createExecutorEnv(
-        conf, executorId, slaveHostname, port, isLocal, actorSystem)
+        conf, executorId, slaveHostname, port, numCores, isLocal, actorSystem)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)
       _env.blockManager.initialize(conf.getAppId)

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index bca0b15..f15e6bc 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.executor
 
 import java.nio.ByteBuffer
 
+import scala.collection.JavaConversions._
+
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
@@ -50,14 +52,23 @@ private[spark] class MesosExecutorBackend
       executorInfo: ExecutorInfo,
       frameworkInfo: FrameworkInfo,
       slaveInfo: SlaveInfo) {
-    logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
+
+    // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
+    val cpusPerTask = executorInfo.getResourcesList
+      .find(_.getName == "cpus")
+      .map(_.getScalar.getValue.toInt)
+      .getOrElse(0)
+    val executorId = executorInfo.getExecutorId.getValue
+
+    logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
       Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     executor = new Executor(
-      executorInfo.getExecutorId.getValue,
+      executorId,
       slaveInfo.getHostname,
-      properties)
+      properties,
+      cpusPerTask)
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
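
The new scala.collection.JavaConversions._ import is what lets the java.util.List returned by executorInfo.getResourcesList be searched with find and map as if it were a Scala collection. A standalone sketch of the same extraction, using a hand-built (hypothetical) resource list in place of a real ExecutorInfo:

    import scala.collection.JavaConversions._
    import org.apache.mesos.Protos.{Resource, Value}

    // One scalar "cpus" resource worth 4 cores, shaped like the entries in
    // ExecutorInfo.getResourcesList.
    val resources: java.util.List[Resource] = java.util.Arrays.asList(
      Resource.newBuilder()
        .setName("cpus")
        .setType(Value.Type.SCALAR)
        .setScalar(Value.Scalar.newBuilder().setValue(4.0))
        .build())

    // Same pattern as registered() above: 0 if no "cpus" resource is present.
    val cpusPerTask = resources
      .find(_.getName == "cpus")
      .map(_.getScalar.getValue.toInt)
      .getOrElse(0)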

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index f8a7f64..0027cbb 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -35,13 +35,13 @@ import org.apache.spark.util.Utils
 /**
  * A BlockTransferService that uses Netty to fetch a set of blocks at at time.
  */
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
+class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
   extends BlockTransferService {
 
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
   private val serializer = new JavaSerializer(conf)
   private val authEnabled = securityManager.isAuthenticationEnabled()
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)
 
   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index 9fa4fa7..ce4225c 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -20,11 +20,22 @@ package org.apache.spark.network.netty
 import org.apache.spark.SparkConf
 import org.apache.spark.network.util.{TransportConf, ConfigProvider}
 
-/**
- * Utility for creating a [[TransportConf]] from a [[SparkConf]].
- */
 object SparkTransportConf {
-  def fromSparkConf(conf: SparkConf): TransportConf = {
+  /**
+   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
+   *                       use the given number of cores, rather than all of the machine's cores.
+   *                       This restriction will only occur if these properties are not already set.
+   */
+  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+    val conf = _conf.clone
+    if (numUsableCores > 0) {
+      // Only set if serverThreads/clientThreads not already set.
+      conf.set("spark.shuffle.io.serverThreads",
+        conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
+      conf.set("spark.shuffle.io.clientThreads",
+        conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
+    }
     new TransportConf(new ConfigProvider {
       override def get(name: String): String = conf.get(name)
     })
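
Because the seeding above uses conf.get(key, numUsableCores.toString), an explicit user setting always wins and the core count only supplies the default; fromSparkConf also clones the SparkConf, so the caller's conf is not modified. A small sketch of that precedence (the values are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.network.netty.SparkTransportConf

    // The user pinned the server pool to 2 threads; the executor has 8 cores.
    val userConf = new SparkConf().set("spark.shuffle.io.serverThreads", "2")
    val transportConf = SparkTransportConf.fromSparkConf(userConf, numUsableCores = 8)
    // serverThreads stays 2 (the explicit setting wins), clientThreads becomes 8,
    // and userConf itself is untouched because fromSparkConf works on a clone.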

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index c026483..a2f1f14 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -51,7 +51,7 @@ private[spark] class LocalActor(
   private val localExecutorHostname = "localhost"
 
   val executor = new Executor(
-    localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
+    localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)
 
   override def receiveWithLogging = {
     case ReviveOffers =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 39434f4..308c59e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -73,7 +73,8 @@ private[spark] class BlockManager(
     mapOutputTracker: MapOutputTracker,
     shuffleManager: ShuffleManager,
     blockTransferService: BlockTransferService,
-    securityManager: SecurityManager)
+    securityManager: SecurityManager,
+    numUsableCores: Int)
   extends BlockDataManager with Logging {
 
   val diskBlockManager = new DiskBlockManager(this, conf)
@@ -121,8 +122,8 @@ private[spark] class BlockManager(
   // Client to read other executors' shuffle files. This is either an external service, or just the
   // standard BlockTranserService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
-    new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
-      securityManager.isAuthenticationEnabled())
+    val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
+    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
   } else {
     blockTransferService
   }
@@ -174,9 +175,10 @@ private[spark] class BlockManager(
       mapOutputTracker: MapOutputTracker,
       shuffleManager: ShuffleManager,
       blockTransferService: BlockTransferService,
-      securityManager: SecurityManager) = {
+      securityManager: SecurityManager,
+      numUsableCores: Int) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
-      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 9623d66..55799f5 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -38,7 +38,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
   var rpcHandler: ExternalShuffleBlockHandler = _
 
   override def beforeAll() {
-    val transportConf = SparkTransportConf.fromSparkConf(conf)
+    val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
     rpcHandler = new ExternalShuffleBlockHandler(transportConf)
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 530f5d6..94bfa67 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -104,11 +104,11 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with Sh
     when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)
 
     val securityManager0 = new SecurityManager(conf0)
-    val exec0 = new NettyBlockTransferService(conf0, securityManager0)
+    val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1)
     exec0.init(blockManager)
 
     val securityManager1 = new SecurityManager(conf1)
-    val exec1 = new NettyBlockTransferService(conf1, securityManager1)
+    val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1)
     exec1.init(blockManager)
 
     val result = fetchBlock(exec0, exec1, "1", blockId) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index f63e772..c2903c8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
-      mapOutputTracker, shuffleManager, transfer, securityMgr)
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     store.initialize("app-id")
     allStores += store
     store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
     when(failableTransfer.hostName).thenReturn("some-hostname")
     when(failableTransfer.port).thenReturn(1000)
     val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
-      10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
+      10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
     failableStore.initialize("app-id")
     allStores += failableStore // so that this gets stopped after test
     assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9529502..5554efb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
-      mapOutputTracker, shuffleManager, transfer, securityMgr)
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     manager.initialize("app-id")
     manager
   }
@@ -795,7 +795,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // Use Java serializer so we can create an unserializable error.
     val transfer = new NioBlockTransferService(conf, securityMgr)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
-      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
+      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr,
+      0)
 
     // The put should fail since a1 is not serializable.
     class UnserializableClass

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 397d3a8..76bce85 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -118,7 +118,8 @@ public class TransportClientFactory implements Closeable {
       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());
 
     // Use pooled buffers to reduce temporary buffer allocation
-    bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator());
+    bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
+      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));
 
     final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
 
@@ -190,34 +191,4 @@ public class TransportClientFactory implements Closeable {
       workerGroup = null;
     }
   }
-
-  /**
-   * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
-   * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
-   * executor thread rather than the event loop thread. Those thread-local caches actually delay
-   * the recycling of buffers, leading to larger memory usage.
-   */
-  private PooledByteBufAllocator createPooledByteBufAllocator() {
-    return new PooledByteBufAllocator(
-        conf.preferDirectBufs() && PlatformDependent.directBufferPreferred(),
-        getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"),
-        getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"),
-        getPrivateStaticField("DEFAULT_PAGE_SIZE"),
-        getPrivateStaticField("DEFAULT_MAX_ORDER"),
-        0,  // tinyCacheSize
-        0,  // smallCacheSize
-        0   // normalCacheSize
-    );
-  }
-
-  /** Used to get defaults from Netty's private static fields. */
-  private int getPrivateStaticField(String name) {
-    try {
-      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
-      f.setAccessible(true);
-      return f.getInt(null);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 579676c..625c325 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -72,8 +72,8 @@ public class TransportServer implements Closeable {
       NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
     EventLoopGroup workerGroup = bossGroup;
 
-    PooledByteBufAllocator allocator = new PooledByteBufAllocator(
-      conf.preferDirectBufs() && PlatformDependent.directBufferPreferred());
+    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
+      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
 
     bootstrap = new ServerBootstrap()
       .group(bossGroup, workerGroup)

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 2a7664f..5c654a6 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -17,9 +17,11 @@
 
 package org.apache.spark.network.util;
 
+import java.lang.reflect.Field;
 import java.util.concurrent.ThreadFactory;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.ServerChannel;
@@ -32,6 +34,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.internal.PlatformDependent;
 
 /**
  * Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
@@ -103,4 +106,40 @@ public class NettyUtils {
     }
     return "<unknown remote>";
   }
+
+  /**
+   * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
+   * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
+   * executor thread rather than the event loop thread. Those thread-local caches actually delay
+   * the recycling of buffers, leading to larger memory usage.
+   */
+  public static PooledByteBufAllocator createPooledByteBufAllocator(
+      boolean allowDirectBufs,
+      boolean allowCache,
+      int numCores) {
+    if (numCores == 0) {
+      numCores = Runtime.getRuntime().availableProcessors();
+    }
+    return new PooledByteBufAllocator(
+      allowDirectBufs && PlatformDependent.directBufferPreferred(),
+      Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
+      Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
+      getPrivateStaticField("DEFAULT_PAGE_SIZE"),
+      getPrivateStaticField("DEFAULT_MAX_ORDER"),
+      allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
+      allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
+      allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
+    );
+  }
+
+  /** Used to get defaults from Netty's private static fields. */
+  private static int getPrivateStaticField(String name) {
+    try {
+      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
+      f.setAccessible(true);
+      return f.getInt(null);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
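
The helper above caps the allocator's heap and direct arena counts at the given core count, falls back to all available processors when 0 is passed, and only enables Netty's thread-local caches when asked to (the server passes true; the client passes false because, per the comment, its buffers are released on executor threads rather than event-loop threads). A short Scala sketch of calling it directly (the literal arguments here are illustrative, not taken from a real config):

    import org.apache.spark.network.util.NettyUtils

    // Direct buffers preferred, no thread-local caching, at most 4 arenas
    // (or fewer if Netty's own defaults are smaller), i.e. roughly what the
    // TransportClientFactory change above does with conf.clientThreads() = 4.
    val alloc = NettyUtils.createPooledByteBufAllocator(true, false, 4)
    println(alloc)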

http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 9efe15d..3661e16 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -73,7 +73,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
 
     blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
       blockManagerSize, conf, mapOutputTracker, shuffleManager,
-      new NioBlockTransferService(conf, securityMgr), securityMgr)
+      new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
     blockManager.initialize("app-id")
 
     tempDirectory = Files.createTempDir()

