Davies Liu created SPARK-9627:
---------------------------------

             Summary: SQL job failed if the dataframe is cached
                 Key: SPARK-9627
                 URL: https://issues.apache.org/jira/browse/SPARK-9627
             Project: Spark
          Issue Type: Bug
    Affects Versions: 1.5.0
            Reporter: Davies Liu
            Priority: Critical


{code}
# Shared random source for gen(); unseeded, so every run produces different rows.
r = random.Random()
def gen(i):
    """Return one random ``(date, cat, count, price)`` sales row.

    ``i`` is accepted so the function can be passed to ``map`` but is not
    used; all values are drawn from the module-level generator ``r``.
    """
    # Draw order (days, category, count, price) is kept fixed so a seeded
    # ``r`` yields the same row as before.
    sale_date = date.today() - timedelta(r.randint(0, 5000))
    category = 5 * str(r.randint(0, 20))
    quantity = r.randint(0, 1000)
    cents = r.randint(0, 100000)
    return (sale_date, category, quantity, decimal.Decimal(cents) / 100)

# Schema for the rows built by gen(): a date, a category string, a short
# count and a DecimalType(5, 2) price.
schema = StructType().add('date', DateType()).add('cat', 
StringType()).add('count', ShortType()).add('price', DecimalType(5, 2))

# Data generation, left commented out: it maps gen() over sc.range(1<<24) and
# writes the result to the 'sales4' Parquet path that is read back below.
#df = sqlContext.createDataFrame(sc.range(1<<24).map(gen), schema)
#df.show()
#df.write.parquet('sales4')


df = sqlContext.read.parquet('sales4')
# cache() is the call the issue summary ties the failure to.
df.cache()
df.count()
df.show()
print df.schema
# Block until Enter is pressed before running the aggregation (Python 2 raw_input).
raw_input()
# NOTE: this rebinds r, which until here named the random.Random instance.
# sum is presumably pyspark.sql.functions.sum (imports are not shown) -- confirm.
r = df.groupBy(df.date, df.cat).agg(sum(df['count'] * df.price))
# The plan listing in the log that follows the script comes from this call.
print r.explain(True)
# The Python traceback in the log that follows the script points at this call.
r.show()
{code}

{code}
StructType(List(StructField(date,DateType,true),StructField(cat,StringType,true),StructField(count,ShortType,true),StructField(price,DecimalType(5,2),true)))


== Parsed Logical Plan ==
'Aggregate [date#0,cat#1], [date#0,cat#1,sum((count#2 * price#3)) AS sum((count 
* price))#70]
 Relation[date#0,cat#1,count#2,price#3] 
org.apache.spark.sql.parquet.ParquetRelation@5ec8f315

== Analyzed Logical Plan ==
date: date, cat: string, sum((count * price)): decimal(21,2)
Aggregate [date#0,cat#1], 
[date#0,cat#1,sum((change_decimal_precision(CAST(CAST(count#2, 
DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, 
DecimalType(11,2))))) AS sum((count * price))#70]
 Relation[date#0,cat#1,count#2,price#3] 
org.apache.spark.sql.parquet.ParquetRelation@5ec8f315

== Optimized Logical Plan ==
Aggregate [date#0,cat#1], 
[date#0,cat#1,sum((change_decimal_precision(CAST(CAST(count#2, 
DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, 
DecimalType(11,2))))) AS sum((count * price))#70]
 InMemoryRelation [date#0,cat#1,count#2,price#3], true, 10000, 
StorageLevel(true, true, false, true, 1), (PhysicalRDD 
[date#0,cat#1,count#2,price#3], MapPartitionsRDD[3] at), None

== Physical Plan ==
NewAggregate with SortBasedAggregationIterator List(date#0, cat#1) 
ArrayBuffer((sum((change_decimal_precision(CAST(CAST(count#2, 
DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, 
DecimalType(11,2)))))2,mode=Final,isDistinct=false))
 TungstenSort [date#0 ASC,cat#1 ASC], false, 0
  ConvertToUnsafe
   Exchange hashpartitioning(date#0,cat#1)
    NewAggregate with SortBasedAggregationIterator List(date#0, cat#1) 
ArrayBuffer((sum((change_decimal_precision(CAST(CAST(count#2, 
DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, 
DecimalType(11,2)))))2,mode=Partial,isDistinct=false))
     TungstenSort [date#0 ASC,cat#1 ASC], false, 0
      ConvertToUnsafe
       InMemoryColumnarTableScan [date#0,cat#1,count#2,price#3], 
(InMemoryRelation [date#0,cat#1,count#2,price#3], true, 10000, 
StorageLevel(true, true, false, true, 1), (PhysicalRDD 
[date#0,cat#1,count#2,price#3], MapPartitionsRDD[3] at), None)

Code Generation: true
== RDD ==
None

15/08/04 23:21:53 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; 
aborting job
Traceback (most recent call last):
  File "t.py", line 34, in <module>
    r.show()
  File "/Users/davies/work/spark/python/pyspark/sql/dataframe.py", line 258, in 
show
    print(self._jdf.showString(n, truncate))
  File "/Users/davies/work/spark/python/lib/py4j/java_gateway.py", line 538, in 
__call__
    self.target_id, self.name)
  File "/Users/davies/work/spark/python/pyspark/sql/utils.py", line 36, in deco
    return f(*a, **kw)
  File "/Users/davies/work/spark/python/lib/py4j/protocol.py", line 300, in 
get_return_value
    format(target_id, '.', name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o36.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 
10, localhost): java.lang.UnsupportedOperationException: tail of empty list
        at scala.collection.immutable.Nil$.tail(List.scala:339)
        at scala.collection.immutable.Nil$.tail(List.scala:334)
        at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)
        at scala.reflect.internal.Symbols$Symbol.typeParams(Symbols.scala:1491)
        at 
scala.reflect.internal.Types$NoArgsTypeRef.typeParams(Types.scala:2144)
        at 
scala.reflect.internal.Types$TypeRef.initializedTypeParams(Types.scala:2408)
        at 
scala.reflect.internal.Types$TypeRef.typeParamsMatchArgs(Types.scala:2409)
        at 
scala.reflect.internal.Types$AliasTypeRef$class.dealias(Types.scala:2232)
        at 
scala.reflect.internal.Types$TypeRef$$anon$3.dealias(Types.scala:2539)
        at 
scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1256)
        at 
scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:202)
        at 
scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:65)
        at 
org.apache.spark.sql.columnar.compression.DictionaryEncoding$Decoder.<init>(compressionSchemes.scala:277)
        at 
org.apache.spark.sql.columnar.compression.DictionaryEncoding$.decoder(compressionSchemes.scala:185)
        at 
org.apache.spark.sql.columnar.compression.DictionaryEncoding$.decoder(compressionSchemes.scala:177)
        at 
org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.initialize(CompressibleColumnAccessor.scala:31)
        at 
org.apache.spark.sql.columnar.NativeColumnAccessor.initialize(ColumnAccessor.scala:64)
        at 
org.apache.spark.sql.columnar.ColumnAccessor$class.$init$(ColumnAccessor.scala:33)
        at 
org.apache.spark.sql.columnar.BasicColumnAccessor.<init>(ColumnAccessor.scala:44)
        at 
org.apache.spark.sql.columnar.NativeColumnAccessor.<init>(ColumnAccessor.scala:64)
        at 
org.apache.spark.sql.columnar.StringColumnAccessor.<init>(ColumnAccessor.scala:92)
        at 
org.apache.spark.sql.columnar.ColumnAccessor$.apply(ColumnAccessor.scala:130)
        at 
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anonfun$15.apply(InMemoryColumnarTableScan.scala:300)
        at 
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anonfun$15.apply(InMemoryColumnarTableScan.scala:299)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at 
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14.apply(InMemoryColumnarTableScan.scala:299)
        at 
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14.apply(InMemoryColumnarTableScan.scala:297)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:173)
        at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
        at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to