Jeffrey Chan created SPARK-9362:
-----------------------------------
Summary: Exception when using DataFrame groupby().sum on Decimal type in Python
Key: SPARK-9362
URL: https://issues.apache.org/jira/browse/SPARK-9362
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 1.4.0
Reporter: Jeffrey Chan
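Please see the session below. groupBy("key").sum("value") works when the value column holds integers, but fails with a java.lang.ClassCastException when the value column holds Python Decimal values.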
bash-4.1# pyspark
Python 2.6.6 (r266:84292, Jan 22 2014, 09:42:36)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/
Using Python version 2.6.6 (r266:84292, Jan 22 2014 09:42:36)
SparkContext available as sc, HiveContext available as sqlContext.
>>> from decimal import *
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> l1 = [('a', 1), ('b', 2), ('a', 3)]
>>> rdd1 = sc.parallelize(l1)
>>> df1 = sqlContext.createDataFrame(rdd1, ['key', 'value'])
>>> df1.collect()
[Row(key=u'a', value=1), Row(key=u'b', value=2), Row(key=u'a', value=3)]
>>> df1.groupBy("key").sum("value").collect()
[Row(key=u'a', SUM(value)=4), Row(key=u'b', SUM(value)=2)]
>>> l2 = [('a', Decimal('1.1')), ('b', Decimal('2.3')), ('a', Decimal('3.4'))]
>>> rdd2 = sc.parallelize(l2)
>>> df2 = sqlContext.createDataFrame(rdd2, ['key', 'value'])
>>> df2.collect()
[Row(key=u'a', value=Decimal('1.1')), Row(key=u'b', value=Decimal('2.3')), Row(key=u'a', value=Decimal('3.4'))]
>>> df2.groupBy("key").sum("value").collect()
15/07/26 14:58:50 ERROR Executor: Exception in task 1.0 in stage 12.0 (TID 215)
java.lang.ClassCastException: java.math.BigDecimal cannot be cast to
org.apache.spark.sql.types.Decimal
at
org.apache.spark.sql.types.Decimal$DecimalIsFractional$.plus(Decimal.scala:330)
at
org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:125)
at
org.apache.spark.sql.catalyst.expressions.Coalesce.eval(nullFunctions.scala:51)
at
org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:91)
at
org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:625)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$7.apply(Aggregate.scala:165)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$7.apply(Aggregate.scala:149)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
15/07/26 14:58:50 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 214)
java.lang.ClassCastException: java.math.BigDecimal cannot be cast to
org.apache.spark.sql.types.Decimal
at
org.apache.spark.sql.types.Decimal$DecimalIsFractional$.plus(Decimal.scala:330)
at
org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:125)
at
org.apache.spark.sql.catalyst.expressions.Coalesce.eval(nullFunctions.scala:51)
at
org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:91)
at
org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:625)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$7.apply(Aggregate.scala:165)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$7.apply(Aggregate.scala:149)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
15/07/26 14:58:50 ERROR TaskSetManager: Task 1 in stage 12.0 failed 1 times; aborting job
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 314, in collect
port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in
stage 12.0 failed 1 times, most recent failure: Lost task 1.0 in stage 12.0
(TID 215, localhost): java.lang.ClassCastException: java.math.BigDecimal cannot
be cast to org.apache.spark.sql.types.Decimal
at
org.apache.spark.sql.types.Decimal$DecimalIsFractional$.plus(Decimal.scala:330)
at
org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:125)
at
org.apache.spark.sql.catalyst.expressions.Coalesce.eval(nullFunctions.scala:51)
at
org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:91)
at
org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:625)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$7.apply(Aggregate.scala:165)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$7.apply(Aggregate.scala:149)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]