Repository: spark
Updated Branches:
  refs/heads/branch-1.0 2a669a70d -> ac86af8ac


[SPARK-1690] Tolerating empty elements when saving Python RDD to text files

Tolerate empty strings in PythonRDD

Author: Kan Zhang <[email protected]>

Closes #644 from kanzhang/SPARK-1690 and squashes the following commits:

c62ad33 [Kan Zhang] Adding Python doctest
473ec4b [Kan Zhang] [SPARK-1690] Tolerating empty elements when saving Python 
RDD to text files
(cherry picked from commit 6c2691d0a0ed46a8b8093e05a4708706cf187168)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac86af8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac86af8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac86af8a

Branch: refs/heads/branch-1.0
Commit: ac86af8ac7606a373676e9c41641fa7629453134
Parents: 2a669a7
Author: Kan Zhang <[email protected]>
Authored: Sat May 10 14:01:08 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Sat May 10 14:01:25 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala   | 5 +++--
 python/pyspark/rdd.py                                        | 8 ++++++++
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac86af8a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 388b838..2971c27 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -94,6 +94,7 @@ private[spark] class PythonRDD[T: ClassTag](
               val obj = new Array[Byte](length)
               stream.readFully(obj)
               obj
+            case 0 => Array.empty[Byte]
             case SpecialLengths.TIMING_DATA =>
               // Timing data from worker
               val bootTime = stream.readLong()
@@ -123,7 +124,7 @@ private[spark] class PythonRDD[T: ClassTag](
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
               }
-              Array.empty[Byte]
+              null
           }
         } catch {
 
@@ -143,7 +144,7 @@ private[spark] class PythonRDD[T: ClassTag](
 
       var _nextObj = read()
 
-      def hasNext = _nextObj.length != 0
+      def hasNext = _nextObj != null
     }
     new InterruptibleIterator(context, stdoutIterator)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac86af8a/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3a1c56a..4f74824 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -891,6 +891,14 @@ class RDD(object):
         >>> from glob import glob
         >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
         '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
+
+        Empty lines are tolerated when saving to text files.
+
+        >>> tempFile2 = NamedTemporaryFile(delete=True)
+        >>> tempFile2.close()
+        >>> sc.parallelize(['', 'foo', '', 'bar', 
'']).saveAsTextFile(tempFile2.name)
+        >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
+        '\\n\\n\\nbar\\nfoo\\n'
         """
         def func(split, iterator):
             for x in iterator:

Reply via email to