This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 34544d6  [SPARK-32609] Incorrect exchange reuse with DataSourceV2
34544d6 is described below

commit 34544d6c1d9cb53574d4d3f3b3993a7d42ebd81f
Author: mingjial <[email protected]>
AuthorDate: Mon Aug 17 15:23:35 2020 +0000

    [SPARK-32609] Incorrect exchange reuse with DataSourceV2
    
    ### What changes were proposed in this pull request?
    Compare pushedFilters in DataSourceV2ScanExec's equals function.
    
    ### Why are the changes needed?
    Scans with different filters were considered equal, thus causing incorrect 
exchange reuse. This change fix the issue.
    
    ### Does this PR introduce _any_ user-facing change?
    no.
    
    ### How was this patch tested?
    unit test coverage and ad hoc verification. Without my change, the unit 
test will fail.
    
    Closes #29430 from mingjialiu/branch-2.4.
    
    Authored-by: mingjial <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../datasources/v2/DataSourceV2ScanExec.scala      |  3 ++-
 .../spark/sql/sources/v2/DataSourceV2Suite.scala   | 22 ++++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index c8494f9..9b70eec 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -47,7 +47,8 @@ case class DataSourceV2ScanExec(
   // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
   override def equals(other: Any): Boolean = other match {
     case other: DataSourceV2ScanExec =>
-      output == other.output && reader.getClass == other.reader.getClass && 
options == other.options
+      (output == other.output && reader.getClass == other.reader.getClass
+        && options == other.options && pushedFilters == other.pushedFilters)
     case _ => false
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 2367bdd..ef0a8bd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -371,6 +371,28 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-32609: DataSourceV2 with different pushedfilters should be 
different") {
+    def getScanExec(query: DataFrame): DataSourceV2ScanExec = {
+      query.queryExecution.executedPlan.collect {
+        case d: DataSourceV2ScanExec => d
+      }.head
+    }
+
+    Seq(classOf[AdvancedDataSourceV2], 
classOf[JavaAdvancedDataSourceV2]).foreach { cls =>
+      withClue(cls.getName) {
+        val df = spark.read.format(cls.getName).load()
+        val q1 = df.select('i).filter('i > 6)
+        val q2 = df.select('i).filter('i > 5)
+        val q3 = df.select('i).filter('i > 5)
+        val scan1 = getScanExec(q1)
+        val scan2 = getScanExec(q2)
+        val scan3 = getScanExec(q3)
+        assert(!scan1.equals(scan2))
+        assert(scan2.equals(scan3))
+      }
+    }
+  }
 }
 
 class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to