Wenbo Zhao created SPARK-24373:
----------------------------------

             Summary: Spark Dataset groupby.agg/count doesn't respect cache
                 Key: SPARK-24373
                 URL: https://issues.apache.org/jira/browse/SPARK-24373
             Project: Spark
          Issue Type: Bug
          Components: Input/Output
    Affects Versions: 2.3.0
            Reporter: Wenbo Zhao


Here is the code to reproduce in local mode:
{code:java}
scala> val df = sc.range(1, 2).toDF
scala> val myudf = udf({x: Long => println("xxxx"); x + 1})
scala> val df1 = df.withColumn("value1", myudf(col("value")))
scala> df1.cache()
res0: df1.type = [value: bigint, value1: bigint]
scala> df1.count
res1: Long = 1
scala> df1.count
res2: Long = 1
scala> df1.count
res3: Long = 1
{code}
 

In Spark 2.2, you could see it print "xxxx".

In the above example, when you run explain, you can see InMemoryTableScan in the physical plan:
{code:java}
scala> df1.explain(true)
== Parsed Logical Plan ==
'Project [value#2L, UDF('value) AS value1#5]
+- AnalysisBarrier
      +- SerializeFromObject [input[0, bigint, false] AS value#2L]
         +- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
value: bigint, value1: bigint
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
   +- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
      +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
         +- Scan ExternalRDDScan[obj#1L]

== Physical Plan ==
*(1) InMemoryTableScan [value#2L, value1#5L]
   +- InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
            +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
               +- Scan ExternalRDDScan[obj#1L]

{code}
But InMemoryTableScan is missing from the following explain() output:
{code:java}
scala> df1.groupBy().count().explain(true)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L])
      +- *(1) Project
         +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
            +- Scan ExternalRDDScan[obj#1L]
{code}
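
The difference between the two explain outputs above can also be checked programmatically rather than by reading the plans. The following is only a sketch for spark-shell: usesCache is a hypothetical helper (not part of Spark), and it assumes that looking for an InMemoryRelation node in queryExecution.withCachedData is an adequate stand-in for spotting InMemoryTableScan in the physical plan.
{code:java}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical helper (not part of Spark): true if the logical plan, after
// cached-data substitution, contains at least one InMemoryRelation node.
def usesCache(df: DataFrame): Boolean =
  df.queryExecution.withCachedData.collect { case r: InMemoryRelation => r }.nonEmpty

// Same setup as the reproduction above; sc is the SparkContext provided by spark-shell.
val df = sc.range(1, 2).toDF
val myudf = udf({x: Long => println("xxxx"); x + 1})
val df1 = df.withColumn("value1", myudf(col("value")))
df1.cache()

// Compare the cached Dataset with the aggregate built on top of it.
println(s"df1 uses cache: ${usesCache(df1)}")
println(s"df1.groupBy().count() uses cache: ${usesCache(df1.groupBy().count())}")
{code}
Going by the plans shown above, the first check would be expected to find the cached relation on 2.3.0 and the second would not; the sketch has not been run against that version.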