Harry Brundage created SPARK-6411:
-------------------------------------

             Summary: PySpark DataFrames can't be created if any datetimes have timezones
                 Key: SPARK-6411
                 URL: https://issues.apache.org/jira/browse/SPARK-6411
             Project: Spark
          Issue Type: Bug
          Components: PySpark, SQL
    Affects Versions: 1.3.0
            Reporter: Harry Brundage


I am unable to create a DataFrame with PySpark if any of the {{datetime}} 
objects that pass through the conversion process have a {{tzinfo}} property 
set. 

This works fine:

{code}
In [9]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10),)]).toDF().collect()
Out[9]: [Row(_1=datetime.datetime(2014, 7, 8, 11, 10))]
{code}

As expected, the tuple's schema is inferred as a single anonymous column holding a datetime field, and the datetime round-trips through the Python-pickle deserialization on the Java side and back into Python land upon {{collect}}. This, however:

{code}
In [5]: from dateutil.tz import tzutc

In [10]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()),)]).toDF().collect()
{code}

explodes with:

{code}
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 12, localhost): net.razorvine.pickle.PickleException: invalid pickle data for datetime; expected 1 or 7 args, got 2
        at net.razorvine.pickle.objects.DateTimeConstructor.createDateTime(DateTimeConstructor.java:69)
        at net.razorvine.pickle.objects.DateTimeConstructor.construct(DateTimeConstructor.java:32)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
        at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:154)
        at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:114)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:114)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:114)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1401)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

From the look of the error, the Java depickler isn't expecting the pickle stream to provide that extra timezone constructor argument.
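
For what it's worth, the extra argument is visible straight from Python's own reduce protocol; this is just an illustration from a plain Python 2 shell, not anything Spark-specific. A naive datetime reduces to a single constructor argument, while a tz-aware one carries the {{tzinfo}} object as a second argument, which lines up with the "expected 1 or 7 args, got 2" message:

{code}
>>> import datetime
>>> from dateutil.tz import tzutc
>>> # naive datetime: the REDUCE call gets exactly one argument (the packed state string)
>>> datetime.datetime(2014, 7, 8, 11, 10).__reduce__()[1]
('\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00',)
>>> # tz-aware datetime: the tzinfo object rides along as a second argument
>>> datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()).__reduce__()[1]
('\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00', tzutc())
{code}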

Here's the disassembled pickle stream for a timezone-less datetime:

{code}
>>> object = datetime.datetime(2014, 7, 8, 11, 10)
>>> stream = pickle.dumps(object)
>>> pickletools.dis(stream)
    0: c    GLOBAL     'datetime datetime'
   19: p    PUT        0
   22: (    MARK
   23: S        STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
   65: p        PUT        1
   68: t        TUPLE      (MARK at 22)
   69: p    PUT        2
   72: R    REDUCE
   73: p    PUT        3
   76: .    STOP
highest protocol among opcodes = 0
{code}

and then for one with a timezone:

{code}
>>> object = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())
>>> stream = pickle.dumps(object)
>>> pickletools.dis(stream)
    0: c    GLOBAL     'datetime datetime'
   19: p    PUT        0
   22: (    MARK
   23: S        STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
   65: p        PUT        1
   68: c        GLOBAL     'copy_reg _reconstructor'
   93: p        PUT        2
   96: (        MARK
   97: c            GLOBAL     'dateutil.tz tzutc'
  116: p            PUT        3
  119: c            GLOBAL     'datetime tzinfo'
  136: p            PUT        4
  139: g            GET        4
  142: (            MARK
  143: t                TUPLE      (MARK at 142)
  144: R            REDUCE
  145: p            PUT        5
  148: t            TUPLE      (MARK at 96)
  149: p        PUT        6
  152: R        REDUCE
  153: p        PUT        7
  156: t        TUPLE      (MARK at 22)
  157: p    PUT        8
  160: R    REDUCE
  161: p    PUT        9
  164: .    STOP
highest protocol among opcodes = 0
{code}

I would bet that the Pyrolite library is missing support for that nested object arriving as the second tuple member when reconstructing the datetime object. Has anyone hit this before? Is there any more information I can provide?
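
In the meantime, a possible workaround (not a fix) is to normalize values to naive UTC datetimes on the Python side before they ever reach the pickler. A minimal sketch, assuming it is acceptable to treat everything as UTC wall-clock time; the {{to_naive_utc}} helper is just an illustrative name, not anything in Spark:

{code}
import datetime
from dateutil.tz import tzutc

def to_naive_utc(dt):
    # Illustrative helper: convert a tz-aware datetime to naive UTC,
    # leave already-naive datetimes untouched.
    if dt.tzinfo is not None:
        dt = dt.astimezone(tzutc()).replace(tzinfo=None)
    return dt

rows = [(datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()),)]
# Strip the tzinfo before the rows are pickled and shipped to the JVM
sc.parallelize([(to_naive_utc(d),) for (d,) in rows]).toDF().collect()
{code}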


