Repository: spark
Updated Branches:
  refs/heads/master 570647267 -> a019e6efb


[SPARK-14542][CORE] PipedRDD should allow configurable buffer size for the stdin writer

## What changes were proposed in this pull request?

Currently PipedRDD internally uses a PrintWriter to write data to the stdin of 
the piped process, which by default uses a BufferedWriter with a buffer size of 
8k. In our experiments, we have seen that an 8k buffer is too small and the job 
spends a significant amount of CPU time in system calls to copy the data. We 
should have a way to configure the buffer size for the writer.

## How was this patch tested?
Ran PipedRDDSuite tests.

Author: Sital Kedia <[email protected]>

Closes #12309 from sitalkedia/bufferedPipedRDD.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a019e6ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a019e6ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a019e6ef

Branch: refs/heads/master
Commit: a019e6efb71e4dce51ca91e41c3d293cf3a6ccb8
Parents: 5706472
Author: Sital Kedia <[email protected]>
Authored: Tue May 10 15:28:35 2016 +0100
Committer: Sean Owen <[email protected]>
Committed: Tue May 10 15:28:35 2016 +0100

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 58 +++++++++++++-------
 .../scala/org/apache/spark/rdd/PipedRDD.scala   | 10 +++-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  7 ++-
 .../org/apache/spark/rdd/PipedRDDSuite.scala    |  2 +-
 project/MimaExcludes.scala                      |  4 ++
 5 files changed, 54 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a019e6ef/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6f3b8fa..c17ca12 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java
 
 import java.{lang => jl}
 import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList, Map => 
JMap}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -80,7 +80,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    * This should ''not'' be called by users directly, but is available for 
implementors of custom
    * subclasses of RDD.
    */
-  def iterator(split: Partition, taskContext: TaskContext): 
java.util.Iterator[T] =
+  def iterator(split: Partition, taskContext: TaskContext): JIterator[T] =
     rdd.iterator(split, taskContext).asJava
 
   // Transformations (return a new RDD)
@@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    * of the original partition.
    */
   def mapPartitionsWithIndex[R](
-      f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
+      f: JFunction2[jl.Integer, JIterator[T], JIterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] =
     new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, 
b.asJava).asScala,
         preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -147,7 +147,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): 
JavaRDD[U] = {
+  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U]): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
@@ -157,7 +157,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
+  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U],
       preservesPartitioning: Boolean): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -169,7 +169,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): 
JavaDoubleRDD = {
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]]): 
JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
@@ -179,7 +179,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToPair[K2, V2](f: 
PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, 
V2]):
   JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -190,7 +190,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]],
       preservesPartitioning: Boolean): JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -202,7 +202,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToPair[K2, V2](f: 
PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2],
       preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   /**
    * Applies a function f to each partition of this RDD.
    */
-  def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
+  def foreachPartition(f: VoidFunction[JIterator[T]]): Unit = {
     rdd.foreachPartition(x => f.call(x.asJava))
   }
 
@@ -256,19 +256,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] 
extends Serializable {
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
+  def pipe(command: String): JavaRDD[String] = {
+    rdd.pipe(command)
+  }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: JList[String]): JavaRDD[String] =
+  def pipe(command: JList[String]): JavaRDD[String] = {
     rdd.pipe(command.asScala)
+  }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: JList[String], env: java.util.Map[String, String]): 
JavaRDD[String] =
+  def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] 
= {
     rdd.pipe(command.asScala, env.asScala)
+  }
+
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
+  def pipe(command: JList[String],
+           env: JMap[String, String],
+           separateWorkingDir: Boolean,
+           bufferSize: Int): JavaRDD[String] = {
+    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, 
bufferSize)
+  }
 
   /**
    * Zips this RDD with another one, returning key-value pairs with the first 
element in each RDD,
@@ -288,7 +302,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    */
   def zipPartitions[U, V](
       other: JavaRDDLike[U, _],
-      f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): 
JavaRDD[V] = {
+      f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
     def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
       (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
     }
@@ -446,8 +460,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    * Return the count of each unique value in this RDD as a map of (value, 
count) pairs. The final
    * combine step happens locally on the master, equivalent to running a 
single reduce task.
    */
-  def countByValue(): java.util.Map[T, jl.Long] =
-    mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[java.util.Map[T, 
jl.Long]]
+  def countByValue(): JMap[T, jl.Long] =
+    mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]]
 
   /**
    * (Experimental) Approximate version of countByValue().
@@ -455,13 +469,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] 
extends Serializable {
   def countByValueApprox(
     timeout: Long,
     confidence: Double
-    ): PartialResult[java.util.Map[T, BoundedDouble]] =
+    ): PartialResult[JMap[T, BoundedDouble]] =
     rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)
 
   /**
    * (Experimental) Approximate version of countByValue().
    */
-  def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, 
BoundedDouble]] =
+  def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] 
=
     rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)
 
   /**
@@ -596,9 +610,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   /**
    * Returns the maximum element from this RDD as defined by the specified
    * Comparator[T].
+   *
    * @param comp the comparator that defines ordering
    * @return the maximum of the RDD
-   * */
+   */
   def max(comp: Comparator[T]): T = {
     rdd.max()(Ordering.comparatorToOrdering(comp))
   }
@@ -606,9 +621,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   /**
    * Returns the minimum element from this RDD as defined by the specified
    * Comparator[T].
+   *
    * @param comp the comparator that defines ordering
    * @return the minimum of the RDD
-   * */
+   */
   def min(comp: Comparator[T]): T = {
     rdd.min()(Ordering.comparatorToOrdering(comp))
   }
@@ -684,7 +700,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    * The asynchronous version of the `foreachPartition` action, which
    * applies a function f to each partition of this RDD.
    */
-  def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): 
JavaFutureAction[Void] = {
+  def foreachPartitionAsync(f: VoidFunction[JIterator[T]]): 
JavaFutureAction[Void] = {
     new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => 
f.call(x.asJava)),
       { x => null.asInstanceOf[Void] })
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a019e6ef/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index dd8e46b..4561685 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.rdd
 
+import java.io.BufferedWriter
 import java.io.File
 import java.io.FilenameFilter
 import java.io.IOException
+import java.io.OutputStreamWriter
 import java.io.PrintWriter
 import java.util.StringTokenizer
 import java.util.concurrent.atomic.AtomicReference
@@ -45,7 +47,8 @@ private[spark] class PipedRDD[T: ClassTag](
     envVars: Map[String, String],
     printPipeContext: (String => Unit) => Unit,
     printRDDElement: (T, String => Unit) => Unit,
-    separateWorkingDir: Boolean)
+    separateWorkingDir: Boolean,
+    bufferSize: Int)
   extends RDD[String](prev) {
 
   // Similar to Runtime.exec(), if we are given a single string, split it into 
words
@@ -58,7 +61,7 @@ private[spark] class PipedRDD[T: ClassTag](
       printRDDElement: (T, String => Unit) => Unit = null,
       separateWorkingDir: Boolean = false) =
     this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, 
printRDDElement,
-      separateWorkingDir)
+      separateWorkingDir, 8192)
 
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
@@ -144,7 +147,8 @@ private[spark] class PipedRDD[T: ClassTag](
     new Thread(s"stdin writer for $command") {
       override def run(): Unit = {
         TaskContext.setTaskContext(context)
-        val out = new PrintWriter(proc.getOutputStream)
+        val out = new PrintWriter(new BufferedWriter(
+          new OutputStreamWriter(proc.getOutputStream), bufferSize))
         try {
           // scalastyle:off println
           // input the pipe context firstly

http://git-wip-us.apache.org/repos/asf/spark/blob/a019e6ef/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 499a8b9..d85d0ff 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -724,6 +724,7 @@ abstract class RDD[T: ClassTag](
    *                        def printRDDElement(record:(String, Seq[String]), 
f:String=&gt;Unit) =
    *                          for (e &lt;- record._2) {f(e)}
    * @param separateWorkingDir Use separate working directories for each task.
+   * @param bufferSize Buffer size for the stdin writer for the piped process.
    * @return the result RDD
    */
   def pipe(
@@ -731,11 +732,13 @@ abstract class RDD[T: ClassTag](
       env: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
       printRDDElement: (T, String => Unit) => Unit = null,
-      separateWorkingDir: Boolean = false): RDD[String] = withScope {
+      separateWorkingDir: Boolean = false,
+      bufferSize: Int = 8192): RDD[String] = withScope {
     new PipedRDD(this, command, env,
       if (printPipeContext ne null) sc.clean(printPipeContext) else null,
       if (printRDDElement ne null) sc.clean(printRDDElement) else null,
-      separateWorkingDir)
+      separateWorkingDir,
+      bufferSize)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a019e6ef/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index e9cc819..fe2058d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -171,7 +171,7 @@ class PipedRDDSuite extends SparkFunSuite with 
SharedSparkContext {
       val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
       val collectPwd = pipedPwd.collect()
       assert(collectPwd(0).contains("tasks/"))
-      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
+      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize 
= 16384).collect()
       // make sure symlinks were created
       assert(pipedLs.length > 0)
       // clean up top level tasks directory

http://git-wip-us.apache.org/repos/asf/spark/blob/a019e6ef/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a5d57e1..b0d862d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -686,6 +686,10 @@ object MimaExcludes {
         ProblemFilters.exclude[IncompatibleMethTypeProblem](
           "org.apache.spark.sql.DataFrameReader.this")
       ) ++ Seq(
+        // SPARK-14542 configurable buffer size for pipe RDD
+        
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe"),
+        
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")
+      ) ++ Seq(
         // [SPARK-4452][Core]Shuffle data structures can starve others on the 
same thread for memory
         
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
       ) ++ Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to