[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524734#comment-15524734
 ] 

Russell Spitzer edited comment on SPARK-17673 at 9/27/16 1:39 AM:
------------------------------------------------------------------

Well in this case they are equal correct? We are using the same Dataframe with 
two different aggregation steps.

The Parquet example doesn't end up using the reusedExchange. It does the same 
plan as the parallelized one

{code}
PDF
== Physical Plan ==
Union
:- *HashAggregate(keys=[id#112], functions=[min(col1#113)])
:  +- Exchange hashpartitioning(id#112, 200)
:     +- *HashAggregate(keys=[id#112], functions=[partial_min(col1#113)])
:        +- *BatchedScan parquet [id#112,col1#113] Format: ParquetFormat, 
InputPaths: 
file:/Users/russellspitzer/repos/spark-cassandra-connector/ꟾ뫳㼎麡䰨틖㇗ཨᎪ贬, 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,col1:int>
+- *HashAggregate(keys=[id#112], functions=[min(col2#114)])
   +- Exchange hashpartitioning(id#112, 200)
      +- *HashAggregate(keys=[id#112], functions=[partial_min(col2#114)])
         +- *BatchedScan parquet [id#112,col2#114] Format: ParquetFormat, 
InputPaths: 
file:/Users/russellspitzer/repos/spark-cassandra-connector/ꟾ뫳㼎麡䰨틖㇗ཨᎪ贬, 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,col2:int>
{code}


was (Author: rspitzer):
Well in this case they are equal correct?

The Parquet example doesn't end up using the reusedExchange

> Reused Exchange Aggregations Produce Incorrect Results
> ------------------------------------------------------
>
>                 Key: SPARK-17673
>                 URL: https://issues.apache.org/jira/browse/SPARK-17673
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Russell Spitzer
>            Priority: Critical
>              Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
>     val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
>     frame.createCassandraTable(
>       keySpaceName,
>       table,
>       partitionKeyColumns = Some(Seq("id")))
>     frame
>       .write
>       .format("org.apache.spark.sql.cassandra")
>       .mode(SaveMode.Append)
>       .options(Map("table" -> table, "keyspace" -> keySpaceName))
>       .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
>     /* prints:
>       +---+---+
>       | id|min|
>       +---+---+
>       |  A|  1|
>       |  A|  1|
>       +---+---+
>      Should be 
>       | A| 1|
>       | A| 7|
>      */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> :     +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :        +- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>    +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
>     frame.write.parquet("garbagetest")
>     val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
>     println("PDF")
>     parquetmin1.union(parquetmin2).explain()
>     parquetmin1.union(parquetmin2).show()
>     /* prints:
>       +---+---+
>       | id|min|
>       +---+---+
>       |  A|  1|
>       |  A|  1|
>       +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to