Repository: spark
Updated Branches:
  refs/heads/branch-1.0 5f48721ec -> 919ed3108


[SPARK-1674] fix interrupted system call error in pyspark's RDD.pipe

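The doctest for `RDD.pipe` throws an "Interrupted system call" exception on Mac.
It can be fixed by wrapping `pipe.stdout.readline` in an iterator, i.e.
`iter(pipe.stdout.readline, '')`, instead of iterating over `pipe.stdout` directly.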
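
The pattern can be tried outside Spark with a minimal standalone sketch. The
names `pipe_through` and `feed` are hypothetical and not part of PySpark, and
`universal_newlines=True` is an assumption added so that the pipes carry text
on Python 3 (the patched code targets Python 2). The two-argument form of
`iter` calls `readline` repeatedly until it returns the sentinel `''`, which
happens only at end of file; a blank line is read as `'\n'` and so does not
end the iteration.

```python
import shlex
from subprocess import Popen, PIPE
from threading import Thread


def pipe_through(command, items):
    """Feed items to an external command, one per line; return its output lines."""
    proc = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE,
                 universal_newlines=True)  # assumption: text-mode pipes

    def feed(out):
        # Write each element on its own line, then close stdin to signal EOF.
        for obj in items:
            out.write(str(obj).rstrip('\n') + '\n')
        out.close()

    writer = Thread(target=feed, args=[proc.stdin])
    writer.start()
    # iter(callable, sentinel): call readline() until it returns '' (EOF).
    lines = [line.rstrip('\n') for line in iter(proc.stdout.readline, '')]
    writer.join()
    proc.stdout.close()
    proc.wait()
    return lines


if __name__ == "__main__":
    print(pipe_through("cat", ["1", "2", "", "3"]))  # ['1', '2', '', '3']
```

The input mirrors the updated doctest: it includes an empty element to check
that a blank line passes through unchanged.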

Author: Xiangrui Meng <[email protected]>

Closes #594 from mengxr/pyspark-pipe and squashes the following commits:

cc32ac9 [Xiangrui Meng] fix interrupted system call error in pyspark's RDD.pipe

(cherry picked from commit d33df1c151f8e982edd7324edc06d8cd3024dd34)
Signed-off-by: Matei Zaharia <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/919ed310
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/919ed310
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/919ed310

Branch: refs/heads/branch-1.0
Commit: 919ed3108966d6f2c5bd558efe458bd37173433c
Parents: 5f48721
Author: Xiangrui Meng <[email protected]>
Authored: Tue Apr 29 18:06:45 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Tue Apr 29 18:06:53 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/919ed310/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a59778c..3a1c56a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -537,8 +537,8 @@ class RDD(object):
         """
         Return an RDD created by piping elements to a forked external process.
 
-        >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
-        ['1', '2', '3']
+        >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
+        ['1', '2', '', '3']
         """
         def func(iterator):
             pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
@@ -547,7 +547,7 @@ class RDD(object):
                     out.write(str(obj).rstrip('\n') + '\n')
                 out.close()
             Thread(target=pipe_objs, args=[pipe.stdin]).start()
-            return (x.rstrip('\n') for x in pipe.stdout)
+            return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
         return self.mapPartitions(func)
 
     def foreach(self, f):
