This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 34544d6 [SPARK-32609] Incorrect exchange reuse with DataSourceV2
34544d6 is described below
commit 34544d6c1d9cb53574d4d3f3b3993a7d42ebd81f
Author: mingjial <[email protected]>
AuthorDate: Mon Aug 17 15:23:35 2020 +0000
[SPARK-32609] Incorrect exchange reuse with DataSourceV2
### What changes were proposed in this pull request?
Compare pushedFilters in DataSourceV2ScanExec's equals function.
### Why are the changes needed?
Scans with different filters were considered equal, thus causing incorrect
exchange reuse. This change fixes the issue.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test coverage and ad hoc verification. Without my change, the unit
test will fail.
Closes #29430 from mingjialiu/branch-2.4.
Authored-by: mingjial <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../datasources/v2/DataSourceV2ScanExec.scala | 3 ++-
.../spark/sql/sources/v2/DataSourceV2Suite.scala | 22 ++++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index c8494f9..9b70eec 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -47,7 +47,8 @@ case class DataSourceV2ScanExec(
// TODO: unify the equal/hashCode implementation for all data source v2
query plans.
override def equals(other: Any): Boolean = other match {
case other: DataSourceV2ScanExec =>
- output == other.output && reader.getClass == other.reader.getClass &&
options == other.options
+ (output == other.output && reader.getClass == other.reader.getClass
+ && options == other.options && pushedFilters == other.pushedFilters)
case _ => false
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 2367bdd..ef0a8bd 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -371,6 +371,28 @@ class DataSourceV2Suite extends QueryTest with
SharedSQLContext {
}
}
}
+
+ test("SPARK-32609: DataSourceV2 with different pushedfilters should be
different") {
+ def getScanExec(query: DataFrame): DataSourceV2ScanExec = {
+ query.queryExecution.executedPlan.collect {
+ case d: DataSourceV2ScanExec => d
+ }.head
+ }
+
+ Seq(classOf[AdvancedDataSourceV2],
classOf[JavaAdvancedDataSourceV2]).foreach { cls =>
+ withClue(cls.getName) {
+ val df = spark.read.format(cls.getName).load()
+ val q1 = df.select('i).filter('i > 6)
+ val q2 = df.select('i).filter('i > 5)
+ val q3 = df.select('i).filter('i > 5)
+ val scan1 = getScanExec(q1)
+ val scan2 = getScanExec(q2)
+ val scan3 = getScanExec(q3)
+ assert(!scan1.equals(scan2))
+ assert(scan2.equals(scan3))
+ }
+ }
+ }
}
class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]