Davies Liu created SPARK-4866:
---------------------------------

             Summary: Support StructType as key in MapType
                 Key: SPARK-4866
                 URL: https://issues.apache.org/jira/browse/SPARK-4866
             Project: Spark
          Issue Type: Bug
          Components: PySpark, SQL
            Reporter: Davies Liu


http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Applying-schema-to-a-dictionary-with-a-Tuple-as-key-td20716.html

Hi Guys, 

I'm running a Spark cluster on AWS EC2 with Spark 1.1.0.

I am trying to convert an RDD of tuples of the form

(u'string', int, {(int, int): int, (int, int): int})

to a SchemaRDD using the following schema:

{code}
fields = [StructField('field1', StringType(), True),
          StructField('field2', IntegerType(), True),
          StructField('field3',
                      MapType(StructType([StructField('field31', IntegerType(), True),
                                          StructField('field32', IntegerType(), True)]),
                              IntegerType(), True),
                      True)]

schema = StructType(fields)
# generate the SchemaRDD with the defined schema
schemaRDD = sqc.applySchema(RDD, schema)
{code}
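
For reference, a minimal reproduction along these lines triggers the failure as soon as rows are pulled back to Python. This is only a sketch: the names {{sc}} and {{sqc}} are assumed from a standard PySpark 1.1 shell session, and the sample values are invented for illustration.

{code}
# Hypothetical minimal reproduction; assumes `sc` is an existing SparkContext
# and `schema` is the StructType defined above.
from pyspark.sql import SQLContext

sqc = SQLContext(sc)
# One row matching (u'string', int, {(int, int): int, ...})
data = sc.parallelize([(u'a', 1, {(1, 2): 3, (4, 5): 6})])
schemaRDD = sqc.applySchema(data, schema)
schemaRDD.take(1)  # fails while pickling rows whose map keys are structs
{code}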

But when I add "field3" to the schema, it throws an exception: 

{code}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/rdd.py", line 1153, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/root/spark/python/pyspark/context.py", line 770, in runJob
    it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 4 times, most recent failure: Lost task 0.3 in stage 28.0 (TID 710, ip-172-31-29-120.ec2.internal): net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments
        net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
        net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
        net.razorvine.pickle.Pickler.save(Pickler.java:125)
        net.razorvine.pickle.Pickler.put_map(Pickler.java:321)
        net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
        net.razorvine.pickle.Pickler.save(Pickler.java:125)
        net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:412)
        net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
        net.razorvine.pickle.Pickler.save(Pickler.java:125)
        net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:412)
        net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
        net.razorvine.pickle.Pickler.save(Pickler.java:125)
        net.razorvine.pickle.Pickler.dump(Pickler.java:95)
        net.razorvine.pickle.Pickler.dumps(Pickler.java:80)
        org.apache.spark.sql.SchemaRDD$$anonfun$javaToPython$1$$anonfun$apply$2.apply(SchemaRDD.scala:417)
        org.apache.spark.sql.SchemaRDD$$anonfun$javaToPython$1$$anonfun$apply$2.apply(SchemaRDD.scala:417)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:331)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
        org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
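
Until StructType keys are supported, one possible workaround (a sketch added here, not part of the original report) is to flatten each tuple key into a scalar type that MapType already handles, e.g. a string key:

{code}
# Workaround sketch; `RDD` and `sqc` are assumed from the snippet above,
# and the '%d_%d' key encoding is an arbitrary illustrative choice.
from pyspark.sql import StructType, StructField, StringType, IntegerType, MapType

def stringify_keys(record):
    field1, field2, mapping = record
    # collapse each (int, int) key into a string key, e.g. (1, 2) -> '1_2'
    return (field1, field2,
            dict(('%d_%d' % key, value) for key, value in mapping.items()))

flat_schema = StructType([
    StructField('field1', StringType(), True),
    StructField('field2', IntegerType(), True),
    StructField('field3', MapType(StringType(), IntegerType(), True), True),
])
schemaRDD = sqc.applySchema(RDD.map(stringify_keys), flat_schema)
{code}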



