Repository: spark
Updated Branches:
  refs/heads/master 23f5bdf06 -> b9e1c2eb9


[SPARK-4370] [Core] Limit number of Netty cores based on executor size

Author: Aaron Davidson <aa...@databricks.com>

Closes #3155 from aarondav/conf and squashes the following commits:

7045e77 [Aaron Davidson] Add mesos comment
4770f6e [Aaron Davidson] [SPARK-4370] [Core] Limit number of Netty cores based on executor size


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9e1c2eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9e1c2eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9e1c2eb

Branch: refs/heads/master
Commit: b9e1c2eb9b6f7fb609718ef20048a8da452d881b
Parents: 23f5bdf
Author: Aaron Davidson <aa...@databricks.com>
Authored: Wed Nov 12 18:46:37 2014 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Nov 12 18:46:37 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala  | 12 ++++--
 .../worker/StandaloneWorkerShuffleService.scala |  2 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  4 +-
 .../org/apache/spark/executor/Executor.scala    |  3 +-
 .../spark/executor/MesosExecutorBackend.scala   | 17 +++++++--
 .../netty/NettyBlockTransferService.scala       |  4 +-
 .../network/netty/SparkTransportConf.scala      | 19 ++++++++--
 .../spark/scheduler/local/LocalBackend.scala    |  2 +-
 .../org/apache/spark/storage/BlockManager.scala | 12 +++---
 .../spark/ExternalShuffleServiceSuite.scala     |  2 +-
 .../netty/NettyBlockTransferSecuritySuite.scala |  4 +-
 .../storage/BlockManagerReplicationSuite.scala  |  4 +-
 .../spark/storage/BlockManagerSuite.scala       |  5 ++-
 .../network/client/TransportClientFactory.java  | 33 +----------------
 .../spark/network/server/TransportServer.java   |  4 +-
 .../apache/spark/network/util/NettyUtils.java   | 39 ++++++++++++++++++++
 .../streaming/ReceivedBlockHandlerSuite.scala   |  2 +-
 17 files changed, 104 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
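Context for the diff below: before this change, the Netty transport sized its server/client thread pools and its pooled buffer arenas from the machine's processor count, even when the executor owned only a fraction of those cores. The patch threads the executor's granted core count (numUsableCores) from each backend through SparkEnv, BlockManager, and NettyBlockTransferService into SparkTransportConf and NettyUtils, where it becomes a default cap; zero means "size unknown, use the whole machine". A minimal, self-contained Scala sketch of that policy (ThreadCapSketch and effectiveIoThreads are illustrative names, not Spark API):

object ThreadCapSketch {
  // Effective IO thread count: an explicit setting wins; otherwise fall back to the
  // executor's granted cores; 0 granted cores means "unknown", so use the machine.
  def effectiveIoThreads(explicitSetting: Option[Int], numUsableCores: Int): Int =
    explicitSetting.getOrElse(
      if (numUsableCores > 0) numUsableCores
      else Runtime.getRuntime.availableProcessors())

  def main(args: Array[String]): Unit = {
    println(effectiveIoThreads(None, 2))    // 2: capped to the executor's two cores
    println(effectiveIoThreads(Some(8), 2)) // 8: an explicit conf value is respected
    println(effectiveIoThreads(None, 0))    // machine core count: no cap requested
  }
}

An explicitly configured spark.shuffle.io.serverThreads or spark.shuffle.io.clientThreads still wins over the cap, as the SparkTransportConf hunk below makes explicit.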
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index e7454be..e464b32 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -168,9 +168,11 @@ object SparkEnv extends Logging {
       executorId: String,
       hostname: String,
       port: Int,
+      numCores: Int,
       isLocal: Boolean,
       actorSystem: ActorSystem = null): SparkEnv = {
-    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem,
+      numUsableCores = numCores)
   }
 
   /**
@@ -184,7 +186,8 @@ object SparkEnv extends Logging {
       isDriver: Boolean,
       isLocal: Boolean,
       listenerBus: LiveListenerBus = null,
-      defaultActorSystem: ActorSystem = null): SparkEnv = {
+      defaultActorSystem: ActorSystem = null,
+      numUsableCores: Int = 0): SparkEnv = {
 
     // Listener bus is only used on the driver
     if (isDriver) {
@@ -276,7 +279,7 @@ object SparkEnv extends Logging {
     val blockTransferService =
       conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
-          new NettyBlockTransferService(conf, securityManager)
+          new NettyBlockTransferService(conf, securityManager, numUsableCores)
         case "nio" =>
           new NioBlockTransferService(conf, securityManager)
       }
@@ -287,7 +290,8 @@ object SparkEnv extends Logging {
 
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
+      numUsableCores)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
index d044e1d..b979896 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
@@ -39,7 +39,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
-  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
+  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
   private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = {
     val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 3711824..5f46f3b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -57,9 +57,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def receiveWithLogging = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
-      // Make this host instead of hostPort ?
       val (hostname, _) = Utils.parseHostPort(hostPort)
-      executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
+      executor = new Executor(executorId, hostname, sparkProperties, cores, isLocal = false,
+        actorSystem)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index caf4d76..4c378a2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -43,6 +43,7 @@ private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
+    numCores: Int,
     isLocal: Boolean = false,
     actorSystem: ActorSystem = null)
   extends Logging
@@ -83,7 +84,7 @@ private[spark] class Executor(
   if (!isLocal) {
     val port = conf.getInt("spark.executor.port", 0)
     val _env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveHostname, port, isLocal, actorSystem)
+      conf, executorId, slaveHostname, port, numCores, isLocal, actorSystem)
     SparkEnv.set(_env)
     _env.metricsSystem.registerSource(executorSource)
     _env.blockManager.initialize(conf.getAppId)


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index bca0b15..f15e6bc 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.executor
 
 import java.nio.ByteBuffer
 
+import scala.collection.JavaConversions._
+
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
@@ -50,14 +52,23 @@ private[spark] class MesosExecutorBackend
       executorInfo: ExecutorInfo,
       frameworkInfo: FrameworkInfo,
       slaveInfo: SlaveInfo) {
-    logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
+
+    // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
+    val cpusPerTask = executorInfo.getResourcesList
+      .find(_.getName == "cpus")
+      .map(_.getScalar.getValue.toInt)
+      .getOrElse(0)
+    val executorId = executorInfo.getExecutorId.getValue
+
+    logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
       Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     executor = new Executor(
-      executorInfo.getExecutorId.getValue,
+      executorId,
       slaveInfo.getHostname,
-      properties)
+      properties,
+      cpusPerTask)
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
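The registered() hunk above pulls the task's core budget out of the Mesos ExecutorInfo resource list (traversed via scala.collection.JavaConversions). The same lookup logic, sketched against a stand-in Resource type rather than the real org.apache.mesos.Protos.Resource:

object CpusFromExecutorInfoSketch {
  // Stand-in for org.apache.mesos.Protos.Resource: a named scalar resource.
  case class Resource(name: String, scalarValue: Double)

  // Mirrors the lookup in registered(): first "cpus" resource, else 0 ("no cap").
  def cpusPerTask(resources: Seq[Resource]): Int =
    resources.find(_.name == "cpus").map(_.scalarValue.toInt).getOrElse(0)

  def main(args: Array[String]): Unit = {
    println(cpusPerTask(Seq(Resource("cpus", 4.0), Resource("mem", 2048.0)))) // 4
    println(cpusPerTask(Seq(Resource("mem", 2048.0))))                        // 0
  }
}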
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index f8a7f64..0027cbb 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -35,13 +35,13 @@ import org.apache.spark.util.Utils
 /**
  * A BlockTransferService that uses Netty to fetch a set of blocks at at time.
  */
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
+class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
   extends BlockTransferService {
 
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
   private val serializer = new JavaSerializer(conf)
   private val authEnabled = securityManager.isAuthenticationEnabled()
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)
 
   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index 9fa4fa7..ce4225c 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -20,11 +20,22 @@ package org.apache.spark.network.netty
 import org.apache.spark.SparkConf
 import org.apache.spark.network.util.{TransportConf, ConfigProvider}
 
-/**
- * Utility for creating a [[TransportConf]] from a [[SparkConf]].
- */
 object SparkTransportConf {
-  def fromSparkConf(conf: SparkConf): TransportConf = {
+  /**
+   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
+   *                       use the given number of cores, rather than all of the machine's cores.
+   *                       This restriction will only occur if these properties are not already set.
+   */
+  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+    val conf = _conf.clone
+    if (numUsableCores > 0) {
+      // Only set if serverThreads/clientThreads not already set.
+      conf.set("spark.shuffle.io.serverThreads",
+        conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
+      conf.set("spark.shuffle.io.clientThreads",
+        conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
+    }
     new TransportConf(new ConfigProvider {
       override def get(name: String): String = conf.get(name)
     })
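The fromSparkConf change above is the heart of the patch: it clones the SparkConf and installs numUsableCores as the default for the two shuffle IO thread settings, so any value the user set explicitly always survives. A self-contained sketch of that merge rule, with a plain Map standing in for SparkConf (names here are illustrative, not Spark API):

object TransportConfSketch {
  // Apply numUsableCores as the *default* for the two thread settings on a copy of
  // the conf; keys the user already set are left untouched.
  def withThreadDefaults(conf: Map[String, String], numUsableCores: Int): Map[String, String] =
    if (numUsableCores <= 0) conf
    else Seq("spark.shuffle.io.serverThreads", "spark.shuffle.io.clientThreads")
      .foldLeft(conf) { (c, key) => c.updated(key, c.getOrElse(key, numUsableCores.toString)) }

  def main(args: Array[String]): Unit = {
    // No explicit setting: both keys default to the executor's core count.
    println(withThreadDefaults(Map.empty, 4))
    // Explicit setting survives: serverThreads stays "16"; only clientThreads becomes "4".
    println(withThreadDefaults(Map("spark.shuffle.io.serverThreads" -> "16"), 4))
  }
}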
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index c026483..a2f1f14 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -51,7 +51,7 @@ private[spark] class LocalActor(
   private val localExecutorHostname = "localhost"
 
   val executor = new Executor(
-    localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
+    localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)
 
   override def receiveWithLogging = {
     case ReviveOffers =>


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 39434f4..308c59e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -73,7 +73,8 @@ private[spark] class BlockManager(
     mapOutputTracker: MapOutputTracker,
     shuffleManager: ShuffleManager,
     blockTransferService: BlockTransferService,
-    securityManager: SecurityManager)
+    securityManager: SecurityManager,
+    numUsableCores: Int)
   extends BlockDataManager with Logging {
 
   val diskBlockManager = new DiskBlockManager(this, conf)
@@ -121,8 +122,8 @@ private[spark] class BlockManager(
   // Client to read other executors' shuffle files. This is either an external service, or just the
   // standard BlockTranserService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
-    new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
-      securityManager.isAuthenticationEnabled())
+    val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
+    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
   } else {
     blockTransferService
   }
@@ -174,9 +175,10 @@ private[spark] class BlockManager(
       mapOutputTracker: MapOutputTracker,
       shuffleManager: ShuffleManager,
       blockTransferService: BlockTransferService,
-      securityManager: SecurityManager) = {
+      securityManager: SecurityManager,
+      numUsableCores: Int) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
-      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
   }
 
   /**


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 9623d66..55799f5 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -38,7 +38,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
   var rpcHandler: ExternalShuffleBlockHandler = _
 
   override def beforeAll() {
-    val transportConf = SparkTransportConf.fromSparkConf(conf)
+    val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
     rpcHandler = new ExternalShuffleBlockHandler(transportConf)
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 530f5d6..94bfa67 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -104,11 +104,11 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with Sh
     when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)
 
     val securityManager0 = new SecurityManager(conf0)
-    val exec0 = new NettyBlockTransferService(conf0, securityManager0)
+    val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1)
     exec0.init(blockManager)
 
     val securityManager1 = new SecurityManager(conf1)
-    val exec1 = new NettyBlockTransferService(conf1, securityManager1)
+    val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1)
     exec1.init(blockManager)
 
     val result = fetchBlock(exec0, exec1, "1", blockId) match {


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index f63e772..c2903c8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
-      mapOutputTracker, shuffleManager, transfer, securityMgr)
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     store.initialize("app-id")
     allStores += store
     store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
     when(failableTransfer.hostName).thenReturn("some-hostname")
     when(failableTransfer.port).thenReturn(1000)
     val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
-      10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
+      10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
     failableStore.initialize("app-id")
     allStores += failableStore // so that this gets stopped after test
     assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9529502..5554efb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
-      mapOutputTracker, shuffleManager, transfer, securityMgr)
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     manager.initialize("app-id")
     manager
   }
@@ -795,7 +795,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // Use Java serializer so we can create an unserializable error.
     val transfer = new NioBlockTransferService(conf, securityMgr)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
-      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
+      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr,
+      0)
 
     // The put should fail since a1 is not serializable.
    class UnserializableClass
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 397d3a8..76bce85 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -118,7 +118,8 @@ public class TransportClientFactory implements Closeable {
       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());
 
     // Use pooled buffers to reduce temporary buffer allocation
-    bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator());
+    bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
+      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));
 
     final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
 
@@ -190,34 +191,4 @@ public class TransportClientFactory implements Closeable {
       workerGroup = null;
     }
   }
-
-  /**
-   * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
-   * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
-   * executor thread rather than the event loop thread. Those thread-local caches actually delay
-   * the recycling of buffers, leading to larger memory usage.
-   */
-  private PooledByteBufAllocator createPooledByteBufAllocator() {
-    return new PooledByteBufAllocator(
-      conf.preferDirectBufs() && PlatformDependent.directBufferPreferred(),
-      getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"),
-      getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"),
-      getPrivateStaticField("DEFAULT_PAGE_SIZE"),
-      getPrivateStaticField("DEFAULT_MAX_ORDER"),
-      0, // tinyCacheSize
-      0, // smallCacheSize
-      0  // normalCacheSize
-    );
-  }
-
-  /** Used to get defaults from Netty's private static fields. */
-  private int getPrivateStaticField(String name) {
-    try {
-      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
-      f.setAccessible(true);
-      return f.getInt(null);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 579676c..625c325 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -72,8 +72,8 @@ public class TransportServer implements Closeable {
       NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
     EventLoopGroup workerGroup = bossGroup;
 
-    PooledByteBufAllocator allocator = new PooledByteBufAllocator(
-      conf.preferDirectBufs() && PlatformDependent.directBufferPreferred());
+    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
+      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
 
     bootstrap = new ServerBootstrap()
       .group(bossGroup, workerGroup)


http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 2a7664f..5c654a6 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -17,9 +17,11 @@
 
 package org.apache.spark.network.util;
 
+import java.lang.reflect.Field;
 import java.util.concurrent.ThreadFactory;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.ServerChannel;
@@ -32,6 +34,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.internal.PlatformDependent;
 
 /**
  * Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
@@ -103,4 +106,40 @@ public class NettyUtils {
     }
     return "<unknown remote>";
   }
+
+  /**
+   * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
+   * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
+   * executor thread rather than the event loop thread. Those thread-local caches actually delay
+   * the recycling of buffers, leading to larger memory usage.
+   */
+  public static PooledByteBufAllocator createPooledByteBufAllocator(
+      boolean allowDirectBufs,
+      boolean allowCache,
+      int numCores) {
+    if (numCores == 0) {
+      numCores = Runtime.getRuntime().availableProcessors();
+    }
+    return new PooledByteBufAllocator(
+      allowDirectBufs && PlatformDependent.directBufferPreferred(),
+      Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
+      Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
+      getPrivateStaticField("DEFAULT_PAGE_SIZE"),
+      getPrivateStaticField("DEFAULT_MAX_ORDER"),
+      allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
+      allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
+      allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
+    );
+  }
+
+  /** Used to get defaults from Netty's private static fields. */
+  private static int getPrivateStaticField(String name) {
+    try {
+      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
+      f.setAccessible(true);
+      return f.getInt(null);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
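createPooledByteBufAllocator above generalizes the helper that previously lived in TransportClientFactory: it caps Netty's heap and direct arena counts at numCores (0 meaning all available processors), drops direct arenas to zero when direct buffers are disallowed, and zeroes the thread-local cache sizes when allowCache is false. A Scala sketch of just the arena arithmetic, with Netty's reflectively-read defaults replaced by stand-in constants:

object ArenaCapSketch {
  // Stand-ins for Netty's private DEFAULT_NUM_HEAP_ARENA / DEFAULT_NUM_DIRECT_ARENA,
  // which the real code reads reflectively from PooledByteBufAllocator.
  val DefaultHeapArenas = 16
  val DefaultDirectArenas = 16

  // Returns (heapArenas, directArenas): capped at the core budget; 0 cores means
  // "all cores"; direct arenas drop to 0 when direct buffers are disallowed.
  def arenas(numCores: Int, allowDirectBufs: Boolean): (Int, Int) = {
    val cores = if (numCores == 0) Runtime.getRuntime.availableProcessors() else numCores
    (math.min(DefaultHeapArenas, cores),
      math.min(DefaultDirectArenas, if (allowDirectBufs) cores else 0))
  }

  def main(args: Array[String]): Unit = {
    println(arenas(2, allowDirectBufs = true))  // (2,2): a small executor gets few arenas
    println(arenas(8, allowDirectBufs = false)) // (8,0): no direct-buffer arenas at all
  }
}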
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 9efe15d..3661e16 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -73,7 +73,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
 
     blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
       blockManagerSize, conf, mapOutputTracker, shuffleManager,
-      new NioBlockTransferService(conf, securityMgr), securityMgr)
+      new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
     blockManager.initialize("app-id")
 
     tempDirectory = Files.createTempDir()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org