Harry Brundage created SPARK-6411:
-------------------------------------
Summary: PySpark DataFrames can't be created if any datetimes have timezones
Key: SPARK-6411
URL: https://issues.apache.org/jira/browse/SPARK-6411
Project: Spark
Issue Type: Bug
Components: PySpark, SQL
Affects Versions: 1.3.0
Reporter: Harry Brundage
I am unable to create a DataFrame with PySpark if any of the {{datetime}}
objects that pass through the conversion process have their {{tzinfo}}
attribute set.
This works fine:
{code}
In [9]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10),)]).toDF().collect()
Out[9]: [Row(_1=datetime.datetime(2014, 7, 8, 11, 10))]
{code}
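Checking the inferred schema confirms the column and type. This is a quick
sketch against the same shell session; the printed output is what I'd expect
from Spark 1.3's type inference, not something I've pasted from the failing run:
{code}
In [11]: df = sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10),)]).toDF()

In [12]: df.printSchema()
root
 |-- _1: timestamp (nullable = true)
{code}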
So, as expected, the tuple's schema is inferred as having one anonymous column
with a datetime field, and the datetime round-trips to the Java side and back
through Python deserialization upon {{collect}}. This, however:
{code}
In [5]: from dateutil.tz import tzutc
In [10]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()),)]).toDF().collect()
{code}
explodes with:
{code}
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 12, localhost): net.razorvine.pickle.PickleException: invalid pickle data for datetime; expected 1 or 7 args, got 2
    at net.razorvine.pickle.objects.DateTimeConstructor.createDateTime(DateTimeConstructor.java:69)
    at net.razorvine.pickle.objects.DateTimeConstructor.construct(DateTimeConstructor.java:32)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:154)
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:114)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:114)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:114)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1401)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}
From the error, it looks like the Java depickler isn't expecting the pickle
stream to provide that extra timezone constructor argument.
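You can see where that extra argument comes from by inspecting what
{{__reduce__}} hands to the pickler. This is a quick sketch of my own, not
anything from the Spark or Pyrolite code:
{code}
>>> import datetime
>>> from dateutil.tz import tzutc
>>> # naive datetime: REDUCE receives a 1-tuple (just the packed byte string)
>>> datetime.datetime(2014, 7, 8, 11, 10).__reduce__()[1]
('\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00',)
>>> # aware datetime: REDUCE receives a 2-tuple -- the byte string plus the
>>> # tzinfo object, which is the "got 2" that DateTimeConstructor rejects
>>> datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()).__reduce__()[1]
('\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00', tzutc())
{code}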
Here's the disassembled pickle stream for a timezone-less datetime:
{code}
>>> import datetime, pickle, pickletools
>>> dt = datetime.datetime(2014, 7, 8, 11, 10)
>>> stream = pickle.dumps(dt)
>>> pickletools.dis(stream)
    0: c    GLOBAL     'datetime datetime'
   19: p    PUT        0
   22: (    MARK
   23: S    STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
   65: p    PUT        1
   68: t    TUPLE      (MARK at 22)
   69: p    PUT        2
   72: R    REDUCE
   73: p    PUT        3
   76: .    STOP
highest protocol among opcodes = 0
{code}
and then for one with a timezone:
{code}
>>> from dateutil.tz import tzutc
>>> dt = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())
>>> stream = pickle.dumps(dt)
>>> pickletools.dis(stream)
    0: c    GLOBAL     'datetime datetime'
   19: p    PUT        0
   22: (    MARK
   23: S    STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
   65: p    PUT        1
   68: c    GLOBAL     'copy_reg _reconstructor'
   93: p    PUT        2
   96: (    MARK
   97: c    GLOBAL     'dateutil.tz tzutc'
  116: p    PUT        3
  119: c    GLOBAL     'datetime tzinfo'
  136: p    PUT        4
  139: g    GET        4
  142: (    MARK
  143: t    TUPLE      (MARK at 142)
  144: R    REDUCE
  145: p    PUT        5
  148: t    TUPLE      (MARK at 96)
  149: p    PUT        6
  152: R    REDUCE
  153: p    PUT        7
  156: t    TUPLE      (MARK at 22)
  157: p    PUT        8
  160: R    REDUCE
  161: p    PUT        9
  164: .    STOP
highest protocol among opcodes = 0
{code}
I would bet that the Pyrolite library is missing support for that nested
tzinfo object as the second tuple member when reconstructing the datetime.
Has anyone hit this before? Is there any more information I can provide?
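A possible workaround (just a sketch on my end, assuming the usual shell
{{sc}}, not a proposed fix) is to normalize everything to UTC and strip the
{{tzinfo}} before rows cross the Py4J/Pyrolite boundary:
{code}
import datetime
from dateutil.tz import tzutc

def to_naive_utc(dt):
    # Convert an aware datetime to its naive UTC equivalent, so that only
    # timezone-free datetimes get pickled across to the JVM.
    if dt.tzinfo is not None:
        return dt.astimezone(tzutc()).replace(tzinfo=None)
    return dt

rows = [(to_naive_utc(datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())),)]
sc.parallelize(rows).toDF().collect()
{code}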