[ https://issues.apache.org/jira/browse/SPARK-1687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14077136#comment-14077136 ]

Apache Spark commented on SPARK-1687:
-------------------------------------

User 'davies' has created a pull request for this issue:
https://github.com/apache/spark/pull/1623
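
For context on the failure quoted below: pickle serializes a namedtuple instance by reference to its class (e.g. __main__.Person), so a worker process that never executed the interactive definition cannot resolve that attribute, which is exactly the AttributeError in the trace. Note that collect() succeeds because deserialization there happens on the driver, where Person is defined; count() deserializes on the workers and fails. A minimal sketch of the usual workaround, with a hypothetical module name person_model.py (not from this ticket): define the class in a real module and ship it to the workers.

{code}
# person_model.py -- hypothetical module; any module importable on the
# workers would do.
from collections import namedtuple

Person = namedtuple('Person', 'id firstName lastName')
{code}

{code}
# Driver side: ship the module so workers can resolve person_model.Person
# by name, then import the class instead of defining it interactively.
sc.addPyFile("person_model.py")

from person_model import Person

theDoes = sc.parallelize([Person(1, "Jon", "Doe"), Person(2, "Jane", "Doe")])
print(theDoes.count())  # no longer fails: workers can import the class
{code}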

> Support NamedTuples in RDDs
> ---------------------------
>
>                 Key: SPARK-1687
>                 URL: https://issues.apache.org/jira/browse/SPARK-1687
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 1.0.0
>         Environment: Spark version 1.0.0-SNAPSHOT
> Python 2.7.5
>            Reporter: Pat McDonough
>            Assignee: Davies Liu
>
> Add support for NamedTuples in RDDs. Sample code is below, followed by the error it currently produces.
> Based on a quick conversation with [~ahirreddy], [Dill|https://github.com/uqfoundation/dill] might be a good solution here.
> {code}
> In [26]: from collections import namedtuple
> ...
> In [33]: Person = namedtuple('Person', 'id firstName lastName')
> In [34]: jon = Person(1, "Jon", "Doe")
> In [35]: jane = Person(2, "Jane", "Doe")
> In [36]: theDoes = sc.parallelize((jon, jane))
> In [37]: theDoes.collect()
> Out[37]: 
> [Person(id=1, firstName='Jon', lastName='Doe'),
>  Person(id=2, firstName='Jane', lastName='Doe')]
> In [38]: theDoes.count()
> PySpark worker failed with exception:
> Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> 14/04/30 14:43:53 ERROR Executor: Exception in task ID 23
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
>       at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
>       at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
>       at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>       at org.apache.spark.scheduler.Task.run(Task.scala:51)
>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
>       at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:744)
> 14/04/30 14:43:53 ERROR Executor: Exception in task ID 21
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
>       at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
>       at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
>       at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>       at org.apache.spark.scheduler.Task.run(Task.scala:51)
>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
>       at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:744)
> 14/04/30 14:43:53 ERROR TaskSetManager: Task 5.0:3 failed 1 times; aborting job
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-38-8689b264fa46> in <module>()
> ----> 1 theDoes.count()
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in count(self)
>     706         3
>     707         """
> --> 708         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>     709 
>     710     def stats(self):
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in sum(self)
>     697         6.0
>     698         """
> --> 699         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>     700 
>     701     def count(self):
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in reduce(self, f)
>     617             if acc is not None:
>     618                 yield acc
> --> 619         vals = self.mapPartitions(func).collect()
>     620         return reduce(f, vals)
>     621 
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in collect(self)
>     581         """
>     582         with _JavaStackTrace(self.context) as st:
> --> 583           bytesInJava = self._jrdd.collect().iterator()
>     584         return list(self._collect_iterator_through_file(bytesInJava))
>     585 
> /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     535         answer = self.gateway_client.send_command(command)
>     536         return_value = get_return_value(answer, self.gateway_client,
> --> 537                 self.target_id, self.name)
>     538 
>     539         for temp_arg in temp_args:
> /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o53.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.0:3 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
>         org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
>         org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
>         org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>         org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
>         org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
>       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>       at scala.Option.foreach(Option.scala:236)
>       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>       at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
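
Dill (suggested above) sidesteps the module lookup by serializing the class definition itself rather than a reference to it. A rough standard-library sketch of the same idea (Python 3 shown; helper names are made up, and in Spark the rebuild helper would itself have to live in a module importable on both driver and workers):

{code}
import copyreg
import pickle
from collections import namedtuple

def _rebuild_namedtuple(name, fields, values):
    # Recreate the namedtuple class on the fly, then the instance,
    # so unpickling never looks the class up as a module attribute.
    return namedtuple(name, fields)(*values)

def _reduce_namedtuple(nt):
    # Reduce an instance to (factory, args): class name, field names,
    # and field values.
    return _rebuild_namedtuple, (type(nt).__name__, nt._fields, tuple(nt))

Person = namedtuple('Person', 'id firstName lastName')
copyreg.pickle(Person, _reduce_namedtuple)  # register per class

blob = pickle.dumps(Person(1, "Jon", "Doe"))
print(pickle.loads(blob))  # Person(id=1, firstName='Jon', lastName='Doe')
{code}

Since copyreg registers the reducer per class, every namedtuple type would need registering by hand; a serializer-level fix (or Dill) can apply this automatically.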



--
This message was sent by Atlassian JIRA
(v6.2#6252)
