Repository: spark
Updated Branches:
  refs/heads/branch-2.0 de56ea9bf -> 8ef31fbd7


[SPARK-15826][CORE] PipedRDD to allow configurable char encoding

## What changes were proposed in this pull request?

Link to the JIRA issue which describes the problem:
https://issues.apache.org/jira/browse/SPARK-15826

The fix in this PR is to allow users to specify the char encoding in the pipe()
operation. For backward compatibility, the default value remains the system
default encoding.
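
A minimal usage sketch of the new parameter (assuming a running `SparkContext` named `sc` and a Unix `cat` on the PATH; the signature matches the diff below):

```scala
val nums = sc.parallelize(Seq("héllo", "wörld"))

// Old behavior, unchanged: the platform-default encoding is used.
val defaultPiped = nums.pipe("cat")

// New: pass an explicit char encoding for the piped process's
// stdin, stdout and stderr.
val utf8Piped = nums.pipe(command = Seq("cat"), encoding = "UTF-8")
```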

## How was this patch tested?

Ran existing unit tests

Author: Tejas Patil <tej...@fb.com>

Closes #13563 from tejasapatil/pipedrdd_utf8.

(cherry picked from commit 279bd4aa5fddbabdb0383a3f6f0fc8d91780e092)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ef31fbd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ef31fbd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ef31fbd

Branch: refs/heads/branch-2.0
Commit: 8ef31fbd78bd4accd3b76ce1b5770a63625aa42f
Parents: de56ea9
Author: Tejas Patil <tej...@fb.com>
Authored: Wed Jun 15 12:03:00 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Jun 15 12:03:07 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 11 ++++++++++
 .../scala/org/apache/spark/rdd/PipedRDD.scala   | 22 +++++---------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 13 ++++++++----
 .../org/apache/spark/rdd/PipedRDDSuite.scala    | 12 ++++++++++-
 4 files changed, 36 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ef31fbd/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index e4ccd9f..a37c52c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -285,6 +285,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
+  def pipe(command: JList[String],
+           env: JMap[String, String],
+           separateWorkingDir: Boolean,
+           bufferSize: Int,
+           encoding: String): JavaRDD[String] = {
+    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding)
+  }
+
+  /**
   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
   * partitions* and the *same number of elements in each partition* (e.g. one was made through
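
For the Java API, a hypothetical usage sketch of the new overload above (not part of this commit; assumes a `JavaSparkContext` named `jsc`):

```scala
import scala.collection.JavaConverters._

val jrdd = jsc.parallelize(Seq("a", "b").asJava)
val piped = jrdd.pipe(
  Seq("cat").asJava,                // command
  Map.empty[String, String].asJava, // env
  false,                            // separateWorkingDir
  8192,                             // bufferSize
  "UTF-8")                          // encoding
```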

http://git-wip-us.apache.org/repos/asf/spark/blob/8ef31fbd/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 49625b7..02b28b7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -47,22 +47,10 @@ private[spark] class PipedRDD[T: ClassTag](
     printPipeContext: (String => Unit) => Unit,
     printRDDElement: (T, String => Unit) => Unit,
     separateWorkingDir: Boolean,
-    bufferSize: Int)
+    bufferSize: Int,
+    encoding: String)
   extends RDD[String](prev) {
 
-  // Similar to Runtime.exec(), if we are given a single string, split it into words
-  // using a standard StringTokenizer (i.e. by spaces)
-  def this(
-      prev: RDD[T],
-      command: String,
-      envVars: Map[String, String] = Map(),
-      printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null,
-      separateWorkingDir: Boolean = false) =
-    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
-      separateWorkingDir, 8192)
-
-
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
   /**
@@ -129,7 +117,7 @@ private[spark] class PipedRDD[T: ClassTag](
       override def run(): Unit = {
         val err = proc.getErrorStream
         try {
-          for (line <- Source.fromInputStream(err).getLines) {
+          for (line <- Source.fromInputStream(err)(encoding).getLines) {
             // scalastyle:off println
             System.err.println(line)
             // scalastyle:on println
@@ -147,7 +135,7 @@ private[spark] class PipedRDD[T: ClassTag](
       override def run(): Unit = {
         TaskContext.setTaskContext(context)
         val out = new PrintWriter(new BufferedWriter(
-          new OutputStreamWriter(proc.getOutputStream), bufferSize))
+          new OutputStreamWriter(proc.getOutputStream, encoding), bufferSize))
         try {
           // scalastyle:off println
           // input the pipe context firstly
@@ -171,7 +159,7 @@ private[spark] class PipedRDD[T: ClassTag](
     }.start()
 
     // Return an iterator that read lines from the process's stdout
-    val lines = Source.fromInputStream(proc.getInputStream).getLines()
+    val lines = Source.fromInputStream(proc.getInputStream)(encoding).getLines
     new Iterator[String] {
       def next(): String = {
         if (!hasNext()) {
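
The explicit second argument list above compiles because `Source.fromInputStream` takes an implicit `scala.io.Codec`, and the `Codec` companion object converts a `String` charset name via `Codec.string2codec`. A self-contained sketch:

```scala
import java.io.ByteArrayInputStream
import scala.io.Source

// The String "UTF-8" is adapted to a Codec implicitly.
val in = new ByteArrayInputStream("héllo\n".getBytes("UTF-8"))
Source.fromInputStream(in)("UTF-8").getLines().foreach(println) // prints: héllo
```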

http://git-wip-us.apache.org/repos/asf/spark/blob/8ef31fbd/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e251421..b7a5b22 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -21,6 +21,7 @@ import java.util.Random
 
 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
+import scala.io.Codec
 import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
 
@@ -698,14 +699,14 @@ abstract class RDD[T: ClassTag](
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: String): RDD[String] = withScope {
-    new PipedRDD(this, command)
+    pipe(PipedRDD.tokenize(command))
   }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
-    new PipedRDD(this, command, env)
+    pipe(PipedRDD.tokenize(command), env)
   }
 
   /**
@@ -726,6 +727,8 @@ abstract class RDD[T: ClassTag](
   *                          for (e <- record._2) {f(e)}
    * @param separateWorkingDir Use separate working directories for each task.
    * @param bufferSize Buffer size for the stdin writer for the piped process.
+   * @param encoding Char encoding used for interacting (via stdin, stdout and stderr) with
+   *                 the piped process
    * @return the result RDD
    */
   def pipe(
@@ -734,12 +737,14 @@ abstract class RDD[T: ClassTag](
       printPipeContext: (String => Unit) => Unit = null,
       printRDDElement: (T, String => Unit) => Unit = null,
       separateWorkingDir: Boolean = false,
-      bufferSize: Int = 8192): RDD[String] = withScope {
+      bufferSize: Int = 8192,
+      encoding: String = Codec.defaultCharsetCodec.name): RDD[String] = withScope {
     new PipedRDD(this, command, env,
       if (printPipeContext ne null) sc.clean(printPipeContext) else null,
       if (printRDDElement ne null) sc.clean(printRDDElement) else null,
       separateWorkingDir,
-      bufferSize)
+      bufferSize,
+      encoding)
   }
 
   /**
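
Callers that omit `encoding` keep the previous behavior: `Codec.defaultCharsetCodec` wraps the JVM's default charset. A quick check (sketch):

```scala
import java.nio.charset.Charset
import scala.io.Codec

// Both print the JVM default charset name, e.g. "UTF-8" on most Linux setups.
println(Codec.defaultCharsetCodec.name)
println(Charset.defaultCharset().name())
```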

http://git-wip-us.apache.org/repos/asf/spark/blob/8ef31fbd/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index fe2058d..27cfdc7 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import java.io.File
 
 import scala.collection.Map
+import scala.io.Codec
 import scala.language.postfixOps
 import scala.sys.process._
 import scala.util.Try
@@ -207,7 +208,16 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
         }
       }
       val hadoopPart1 = generateFakeHadoopPartition()
-      val pipedRdd = new PipedRDD(nums, "printenv " + varName)
+      val pipedRdd =
+        new PipedRDD(
+          nums,
+          PipedRDD.tokenize("printenv " + varName),
+          Map(),
+          null,
+          null,
+          false,
+          4092,
+          Codec.defaultCharsetCodec.name)
       val tContext = TaskContext.empty()
       val rddIter = pipedRdd.compute(hadoopPart1, tContext)
       val arr = rddIter.toArray
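
A hypothetical follow-up test (not in this commit) exercising an explicit encoding end-to-end through the public API; assumes a Unix `cat` is available:

```scala
test("pipe with explicit encoding") {
  val nums = sc.parallelize(Seq("héllo", "wörld"), 2)
  // Round-trip through `cat` with UTF-8 made explicit on both sides.
  val piped = nums.pipe(command = Seq("cat"), encoding = "UTF-8")
  assert(piped.collect().toSet === Set("héllo", "wörld"))
}
```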

