Davies Liu created SPARK-9627:
---------------------------------
Summary: SQL job fails if the DataFrame is cached
Key: SPARK-9627
URL: https://issues.apache.org/jira/browse/SPARK-9627
Project: Spark
Issue Type: Bug
Affects Versions: 1.5.0
Reporter: Davies Liu
Priority: Critical
The following PySpark script (Python 2, run in a shell where {{sqlContext}} and {{sc}} are defined) reproduces the failure; the commented-out lines generate the 'sales4' Parquet data on the first run:
{code}
import decimal
import random
from datetime import date, timedelta

from pyspark.sql.functions import sum
from pyspark.sql.types import (StructType, DateType, StringType, ShortType,
                               DecimalType)

r = random.Random()

def gen(i):
    # One random sales row: (date, category, count, price).
    d = date.today() - timedelta(r.randint(0, 5000))
    cat = str(r.randint(0, 20)) * 5
    c = r.randint(0, 1000)
    price = decimal.Decimal(r.randint(0, 100000)) / 100
    return (d, cat, c, price)

schema = StructType().add('date', DateType()).add('cat', StringType()) \
                     .add('count', ShortType()).add('price', DecimalType(5, 2))

# Uncomment on the first run to generate the 'sales4' Parquet data:
#df = sqlContext.createDataFrame(sc.range(1 << 24).map(gen), schema)
#df.show()
#df.write.parquet('sales4')

df = sqlContext.read.parquet('sales4')
df.cache()
df.count()
df.show()
print df.schema
raw_input()  # pause before running the failing query

r = df.groupBy(df.date, df.cat).agg(sum(df['count'] * df.price))
print r.explain(True)
r.show()
{code}
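For comparison (a hypothetical control case inferred from the summary, not re-verified here), the same aggregation over an uncached DataFrame is expected to complete, since it does not go through the in-memory columnar scan that fails below:
{code}
# Hypothetical control case, inferred from the issue summary: without
# df.cache(), the read bypasses InMemoryColumnarTableScan and the
# aggregation should complete.
df2 = sqlContext.read.parquet('sales4')
r2 = df2.groupBy(df2.date, df2.cat).agg(sum(df2['count'] * df2.price))
r2.show()
{code}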
{code}
StructType(List(StructField(date,DateType,true),StructField(cat,StringType,true),StructField(count,ShortType,true),StructField(price,DecimalType(5,2),true)))
== Parsed Logical Plan ==
'Aggregate [date#0,cat#1], [date#0,cat#1,sum((count#2 * price#3)) AS sum((count * price))#70]
 Relation[date#0,cat#1,count#2,price#3] org.apache.spark.sql.parquet.ParquetRelation@5ec8f315
== Analyzed Logical Plan ==
date: date, cat: string, sum((count * price)): decimal(21,2)
Aggregate [date#0,cat#1], [date#0,cat#1,sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2))))) AS sum((count * price))#70]
 Relation[date#0,cat#1,count#2,price#3] org.apache.spark.sql.parquet.ParquetRelation@5ec8f315
== Optimized Logical Plan ==
Aggregate [date#0,cat#1], [date#0,cat#1,sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2))))) AS sum((count * price))#70]
 InMemoryRelation [date#0,cat#1,count#2,price#3], true, 10000, StorageLevel(true, true, false, true, 1), (PhysicalRDD [date#0,cat#1,count#2,price#3], MapPartitionsRDD[3] at), None
== Physical Plan ==
NewAggregate with SortBasedAggregationIterator List(date#0, cat#1) ArrayBuffer((sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2)))))2,mode=Final,isDistinct=false))
 TungstenSort [date#0 ASC,cat#1 ASC], false, 0
  ConvertToUnsafe
   Exchange hashpartitioning(date#0,cat#1)
    NewAggregate with SortBasedAggregationIterator List(date#0, cat#1) ArrayBuffer((sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2)))))2,mode=Partial,isDistinct=false))
     TungstenSort [date#0 ASC,cat#1 ASC], false, 0
      ConvertToUnsafe
       InMemoryColumnarTableScan [date#0,cat#1,count#2,price#3], (InMemoryRelation [date#0,cat#1,count#2,price#3], true, 10000, StorageLevel(true, true, false, true, 1), (PhysicalRDD [date#0,cat#1,count#2,price#3], MapPartitionsRDD[3] at), None)
Code Generation: true
== RDD ==
None
15/08/04 23:21:53 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "t.py", line 34, in <module>
    r.show()
  File "/Users/davies/work/spark/python/pyspark/sql/dataframe.py", line 258, in show
    print(self._jdf.showString(n, truncate))
  File "/Users/davies/work/spark/python/lib/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/Users/davies/work/spark/python/pyspark/sql/utils.py", line 36, in deco
    return f(*a, **kw)
  File "/Users/davies/work/spark/python/lib/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o36.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 10, localhost): java.lang.UnsupportedOperationException: tail of empty list
    at scala.collection.immutable.Nil$.tail(List.scala:339)
    at scala.collection.immutable.Nil$.tail(List.scala:334)
    at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)
    at scala.reflect.internal.Symbols$Symbol.typeParams(Symbols.scala:1491)
    at scala.reflect.internal.Types$NoArgsTypeRef.typeParams(Types.scala:2144)
    at scala.reflect.internal.Types$TypeRef.initializedTypeParams(Types.scala:2408)
    at scala.reflect.internal.Types$TypeRef.typeParamsMatchArgs(Types.scala:2409)
    at scala.reflect.internal.Types$AliasTypeRef$class.dealias(Types.scala:2232)
    at scala.reflect.internal.Types$TypeRef$$anon$3.dealias(Types.scala:2539)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1256)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:202)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:65)
    at org.apache.spark.sql.columnar.compression.DictionaryEncoding$Decoder.<init>(compressionSchemes.scala:277)
    at org.apache.spark.sql.columnar.compression.DictionaryEncoding$.decoder(compressionSchemes.scala:185)
    at org.apache.spark.sql.columnar.compression.DictionaryEncoding$.decoder(compressionSchemes.scala:177)
    at org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.initialize(CompressibleColumnAccessor.scala:31)
    at org.apache.spark.sql.columnar.NativeColumnAccessor.initialize(ColumnAccessor.scala:64)
    at org.apache.spark.sql.columnar.ColumnAccessor$class.$init$(ColumnAccessor.scala:33)
    at org.apache.spark.sql.columnar.BasicColumnAccessor.<init>(ColumnAccessor.scala:44)
    at org.apache.spark.sql.columnar.NativeColumnAccessor.<init>(ColumnAccessor.scala:64)
    at org.apache.spark.sql.columnar.StringColumnAccessor.<init>(ColumnAccessor.scala:92)
    at org.apache.spark.sql.columnar.ColumnAccessor$.apply(ColumnAccessor.scala:130)
    at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anonfun$15.apply(InMemoryColumnarTableScan.scala:300)
    at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anonfun$15.apply(InMemoryColumnarTableScan.scala:299)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14.apply(InMemoryColumnarTableScan.scala:299)
    at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14.apply(InMemoryColumnarTableScan.scala:297)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:173)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
{code}
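The deepest application frames are in {{DictionaryEncoding$Decoder.<init>}}, which goes through Scala runtime reflection ({{JavaMirrors.runtimeClass}}); Scala 2.10 runtime reflection is known not to be thread-safe, which would be consistent with an intermittent "tail of empty list" out of {{SymbolTable.popPhase}} when several tasks initialize decoders concurrently. Under that assumption, one untested workaround sketch is to keep the cached scan off the compression decoder entirely by disabling in-memory columnar compression before caching:
{code}
# Untested workaround sketch (assumption: the crash is specific to the
# compression decoder path): disable compression for the in-memory columnar
# cache so DictionaryEncoding's decoder is never constructed.
# Must be set before df.cache().
sqlContext.setConf('spark.sql.inMemoryColumnarStorage.compressed', 'false')
{code}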