Repository: spark
Updated Branches:
  refs/heads/master b5fb1410c -> f5f2d2738

[SPARK-4516] Cap default number of Netty threads at 8

In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each
core that we use will have an initial overhead of roughly 32 MB of off-heap memory,
which comes at a premium.

Thus, this value should still retain maximum throughput and reduce wasted off-heap
memory allocation. It can be overridden by setting the number of serverThreads and
clientThreads manually in Spark's configuration.

Author: Aaron Davidson <aa...@databricks.com>

Closes #3469 from aarondav/fewer-pools2 and squashes the following commits:

087c59f [Aaron Davidson] [SPARK-4516] Cap default number of Netty threads at 8


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5f2d273
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5f2d273
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5f2d273

Branch: refs/heads/master
Commit: f5f2d27385c243959f03a9d78a149d5f405b2f50
Parents: b5fb141
Author: Aaron Davidson <aa...@databricks.com>
Authored: Tue Nov 25 23:57:04 2014 -0500
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Tue Nov 25 23:57:04 2014 -0500

----------------------------------------------------------------------
 .../network/netty/SparkTransportConf.scala | 44 ++++++++++++++++----
 1 file changed, 37 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5f2d273/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index ce4225c..cef2030 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -20,8 +20,25 @@ package org.apache.spark.network.netty
 import org.apache.spark.SparkConf
 import org.apache.spark.network.util.{TransportConf, ConfigProvider}
 
+/**
+ * Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
+ * Driver, or a standalone shuffle service) into a TransportConf with details on our environment
+ * like the number of cores that are allocated to this JVM.
+ */
 object SparkTransportConf {
   /**
+   * Specifies an upper bound on the number of Netty threads that Spark requires by default.
+   * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
+   * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
+   * at a premium.
+   *
+   * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
+   * allocation. It can be overridden by setting the number of serverThreads and clientThreads
+   * manually in Spark's configuration.
+   */
+  private val MAX_DEFAULT_NETTY_THREADS = 8
+
+  /**
    * Utility for creating a [[TransportConf]] from a [[SparkConf]].
    * @param numUsableCores if nonzero, this will restrict the server and client threads to only
    *                       use the given number of cores, rather than all of the machine's cores.
@@ -29,15 +46,28 @@ object SparkTransportConf {
    */
   def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
     val conf = _conf.clone
-    if (numUsableCores > 0) {
-      // Only set if serverThreads/clientThreads not already set.
- conf.set("spark.shuffle.io.serverThreads", - conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString)) - conf.set("spark.shuffle.io.clientThreads", - conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString)) - } + + // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily + // assuming we have all the machine's cores). + // NB: Only set if serverThreads/clientThreads not already set. + val numThreads = defaultNumThreads(numUsableCores) + conf.set("spark.shuffle.io.serverThreads", + conf.get("spark.shuffle.io.serverThreads", numThreads.toString)) + conf.set("spark.shuffle.io.clientThreads", + conf.get("spark.shuffle.io.clientThreads", numThreads.toString)) + new TransportConf(new ConfigProvider { override def get(name: String): String = conf.get(name) }) } + + /** + * Returns the default number of threads for both the Netty client and server thread pools. + * If numUsableCores is 0, we will use Runtime get an approximate number of available cores. + */ + private def defaultNumThreads(numUsableCores: Int): Int = { + val availableCores = + if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors() + math.min(availableCores, MAX_DEFAULT_NETTY_THREADS) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org