[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r213882447
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    bufferSize: Int,
-    reuseWorker: Boolean,
     evalType: Int,
     argOffsets: Array[Array[Int]])
   extends Logging {
 
   require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
 
+  private val conf = SparkEnv.get.conf
+  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  // each python worker gets an equal part of the allocation. the worker pool will grow to the
+  // number of concurrent tasks, which is determined by the number of cores in this executor.
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    .map(_ / conf.getInt("spark.executor.cores", 1))
--- End diff --

Oh, it's fine. I meant to fix them together if there are more changes to 
push. Not a big deal.
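
For context, a minimal hedged sketch of the per-worker split described in the comment inside the diff above: the executor-wide PySpark allocation is divided evenly across the executor's cores, which bound the number of concurrent Python workers. The values and the environment-variable name below are illustrative assumptions, not the merged implementation.

    import org.apache.spark.SparkConf

    // Illustrative configuration: 2048 MiB for PySpark, 4 cores per executor.
    val conf = new SparkConf()
      .set("spark.executor.pyspark.memory", "2048")
      .set("spark.executor.cores", "4")

    // Each concurrent Python worker gets an equal share of the allocation.
    val pysparkMemoryMb: Option[Long] =
      conf.getOption("spark.executor.pyspark.memory").map(_.toLong)
    val workerMemoryMb: Option[Long] =
      pysparkMemoryMb.map(_ / conf.getInt("spark.executor.cores", 1))   // 2048 / 4 = 512 MiB

    // The share could then be handed to each worker, e.g. via an environment
    // variable it reads at startup (the variable name here is an assumption).
    workerMemoryMb.foreach(mb => println(s"PYSPARK_EXECUTOR_MEMORY_MB=$mb"))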


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-29 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r213777162
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    bufferSize: Int,
-    reuseWorker: Boolean,
     evalType: Int,
     argOffsets: Array[Array[Int]])
   extends Logging {
 
   require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
 
+  private val conf = SparkEnv.get.conf
+  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  // each python worker gets an equal part of the allocation. the worker pool will grow to the
+  // number of concurrent tasks, which is determined by the number of cores in this executor.
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    .map(_ / conf.getInt("spark.executor.cores", 1))
--- End diff --

@HyukjinKwon, sorry but it looks like this was merged before I could push a 
commit to update it.


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21977


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r213407352
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    bufferSize: Int,
-    reuseWorker: Boolean,
     evalType: Int,
     argOffsets: Array[Array[Int]])
   extends Logging {
 
   require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
 
+  private val conf = SparkEnv.get.conf
+  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  // each python worker gets an equal part of the allocation. the worker pool will grow to the
+  // number of concurrent tasks, which is determined by the number of cores in this executor.
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    .map(_ / conf.getInt("spark.executor.cores", 1))
--- End diff --

Sure, thanks for taking the time to clarify it.


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-28 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r213192352
  
--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
     }
     extraJars.foreach(launcher.addJar)
 
+    if (outFile.isDefined) {
--- End diff --

To me, either way is fine.


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-28 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r213192210
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    bufferSize: Int,
-    reuseWorker: Boolean,
     evalType: Int,
     argOffsets: Array[Array[Int]])
   extends Logging {
 
   require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
 
+  private val conf = SparkEnv.get.conf
+  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  // each python worker gets an equal part of the allocation. the worker pool will grow to the
+  // number of concurrent tasks, which is determined by the number of cores in this executor.
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    .map(_ / conf.getInt("spark.executor.cores", 1))
--- End diff --

@rdblue, I fixed the site to refer to Databricks' guide. Mind fixing this one 
if there are more changes to be pushed?


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-27 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r213187429
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -91,6 +91,13 @@ private[spark] class Client(
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
 
+  private val isPython = sparkConf.get(IS_PYTHON_APP)
--- End diff --

Sure, one of them is https://github.com/sparklingpandas/sparklingml


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-27 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r213186832
  
--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
     }
     extraJars.foreach(launcher.addJar)
 
+    if (outFile.isDefined) {
--- End diff --

I think the pattern match would be better than the get.


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r213122284
  
--- Diff: docs/configuration.md ---
@@ -179,6 +179,15 @@ of the most common options to set are:
 (e.g. 2g, 8g).
   
 
+
+ spark.executor.pyspark.memory
+  Not set
+  
+The amount of memory to be allocated to PySpark in each executor, in MiB
+unless otherwise specified. If set, PySpark memory for an executor will be
+limited to this amount. If not set, Spark will not limit Python's memory use.
--- End diff --

I've added "and it is up to the application to avoid exceeding the overhead 
memory space shared with other non-JVM processes."
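
To make the unset case concrete, a small hedged budgeting sketch (the numbers are illustrative examples, not required settings):

    // spark.executor.pyspark.memory is NOT set: Python workers share the
    // executor's overhead allocation with every other non-JVM consumer.
    val executorMemoryMb = 4096                                        // spark.executor.memory = 4g
    val overheadMb = math.max((0.10 * executorMemoryMb).toLong, 384L)  // overhead rule of thumb
    // Python workers + pipes + native libraries must jointly stay under ~overheadMb
    // (409 MiB here), which is what the added sentence asks the application to guarantee.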


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r213121178
  
--- Diff: docs/configuration.md ---
@@ -179,6 +179,15 @@ of the most common options to set are:
 (e.g. 2g, 8g).
   
 
+
+ spark.executor.pyspark.memory
+  Not set
+  
+The amount of memory to be allocated to PySpark in each executor, in MiB
--- End diff --

I've added "When PySpark is run in YARN, this memory is added to executor 
resource requests."
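
For illustration, the YARN sizing that sentence describes, as a hedged sketch (variable names are made up; the 10%/384 MiB overhead rule mirrors the Client.scala diff quoted elsewhere in this thread):

    val executorMemoryMb = 4096                                        // spark.executor.memory = 4g
    val overheadMb = math.max((0.10 * executorMemoryMb).toLong, 384L)  // executor memory overhead
    val pysparkMemoryMb = 2048                                         // spark.executor.pyspark.memory = 2g

    // With the PySpark memory set, the per-executor container request grows to
    // cover it explicitly instead of borrowing from the overhead space:
    val containerRequestMb = executorMemoryMb + overheadMb + pysparkMemoryMb   // 4096 + 409 + 2048 = 6553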


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r213035238
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -91,6 +91,13 @@ private[spark] class Client(
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
 
+  private val isPython = sparkConf.get(IS_PYTHON_APP)
--- End diff --

@holdenk, can you point me to that repo? I'd love to have a look at how you 
do mixed pipelines.


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-24 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r212782657
  
--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
     }
     extraJars.foreach(launcher.addJar)
 
+    if (outFile.isDefined) {
--- End diff --

Like I said, I think `foreach` is a bad practice with options, so I'd 
rather not change to use it. I'd be happy to change this to a pattern match if 
you think it is really desirable to get rid of the `.get`.
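
For reference, the three shapes being weighed in this exchange, sketched with a hypothetical outFile and a placeholder redirect helper standing in for the launcher calls:

    import java.io.File

    val outFile: Option[File] = Some(new File("/tmp/yarn-test.out"))
    def redirect(f: File): Unit = println(s"redirecting output to $f")   // stand-in for the launcher call

    // 1) isDefined + .get -- what the diff does; the .get is the part under discussion
    if (outFile.isDefined) {
      redirect(outFile.get)
    }

    // 2) foreach -- no .get, though some find it hides the conditional intent
    outFile.foreach(redirect)

    // 3) pattern match -- explicit about both cases, also avoids .get
    outFile match {
      case Some(f) => redirect(f)
      case None    => // nothing to redirect
    }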


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-24 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r212757824
  
--- Diff: docs/configuration.md ---
@@ -179,6 +179,15 @@ of the most common options to set are:
 (e.g. 2g, 8g).
   
 
+
+ spark.executor.pyspark.memory
+  Not set
+  
+The amount of memory to be allocated to PySpark in each executor, in MiB
--- End diff --

We should probably mention that this is added to the executor memory 
request in Yarn mode.


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-24 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r212760057
  
--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
     }
     extraJars.foreach(launcher.addJar)
 
+    if (outFile.isDefined) {
--- End diff --

If you do a foreach then the `.get` goes away and the code could be a 
little cleaner, but it's pretty minor.


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-24 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r212757958
  
--- Diff: docs/configuration.md ---
@@ -179,6 +179,15 @@ of the most common options to set are:
 (e.g. 2g, 8g).
   
 
+
+ spark.executor.pyspark.memory
+  Not set
+  
+The amount of memory to be allocated to PySpark in each executor, in MiB
+unless otherwise specified. If set, PySpark memory for an executor will be
+limited to this amount. If not set, Spark will not limit Python's memory use.
--- End diff --

Maybe mention that in this case (unset) it's up to the user to keep Python 
+ system processes in the overhead %.


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-24 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r212759411
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -91,6 +91,13 @@ private[spark] class Client(
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
 
+  private val isPython = sparkConf.get(IS_PYTHON_APP)
--- End diff --

Interesting, I'll add this to my example mixed pipeline repo so folks can 
see this hack.
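
A heavily hedged sketch of what that hack could look like from a JVM-launched pipeline that still spins up Python workers; the string key behind IS_PYTHON_APP is assumed here and should be checked against the YARN config definitions before relying on it:

    import org.apache.spark.SparkConf

    // Assumption: IS_PYTHON_APP corresponds to the internal key "spark.yarn.isPython".
    // Flagging the app as Python would opt it into the PySpark memory accounting above.
    val conf = new SparkConf()
      .set("spark.yarn.isPython", "true")             // assumed key, illustration only
      .set("spark.executor.pyspark.memory", "1g")     // budget for the Python workers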


---




[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...

2018-08-24 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r212714476
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -114,6 +114,10 @@ package object config {
     .checkValue(_ >= 0, "The off-heap memory size must not be negative")
     .createWithDefault(0)
 
+  private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
--- End diff --

Yes, it should. I'll fix it.
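
For context, a hedged sketch of what the finished config entry might look like; only the key name comes from the diff, the builder chain below is an assumption:

    import org.apache.spark.internal.config.ConfigBuilder
    import org.apache.spark.network.util.ByteUnit

    // Sketch only (would live inside the spark `package object config`): a byte-sized,
    // optional entry so "2g"-style values parse to MiB and "not set" stays representable.
    private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
      .doc("The amount of memory to be allocated to PySpark in each executor, in MiB.")
      .bytesConf(ByteUnit.MiB)
      .createOptional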


---
