This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 07d07fe  [SPARK-27580][SQL] Implement `doCanonicalize` in BatchScanExec for comparing query plan results
07d07fe is described below

commit 07d07fec03df00d577c39f901dd9282a67d98b19
Author: Gengliang Wang <[email protected]>
AuthorDate: Mon Apr 29 17:54:12 2019 +0800

    [SPARK-27580][SQL] Implement `doCanonicalize` in BatchScanExec for comparing query plan results
    
    ## What changes were proposed in this pull request?
    
    The method `QueryPlan.sameResult` is used for comparing query plans in order to:
    1. Cache data in CacheManager
    2. Uncache data in CacheManager
    3. Reuse subqueries
    4. etc.
    
    Currently the method `sameResult` always returns false for `BatchScanExec`.
    We should fix this by implementing `doCanonicalize` for the node.
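    
    For illustration only (not part of this patch), a minimal sketch of the behavior the
    change enables, assuming a running `SparkSession` named `spark` (e.g. in spark-shell);
    the column names and output path are hypothetical, and the snippet mirrors the new
    unit test in SameResultSuite below:
    
    ```scala
    import org.apache.spark.sql.internal.SQLConf
    
    // Route the ORC source through DataSource V2 so the scan is planned as BatchScanExec.
    spark.conf.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")
    
    // Hypothetical dataset partitioned by column `a`; the path is made up for illustration.
    spark.range(10)
      .selectExpr("id as a", "id + 1 as c")
      .write
      .partitionBy("a")
      .orc("/tmp/same_result_demo")
    val df = spark.read.orc("/tmp/same_result_demo")
    
    // The two plans differ only in predicate order; with `doCanonicalize` in place,
    // their BatchScanExec nodes canonicalize to the same plan.
    val plan1 = df.where("a > 1 AND c > 1").queryExecution.sparkPlan
    val plan2 = df.where("c > 1 AND a > 1").queryExecution.sparkPlan
    assert(plan1.sameResult(plan2))
    ```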
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #24475 from gengliangwang/sameResultForV2.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/datasources/v2/BatchScanExec.scala   |  5 ++
 .../sql/execution/datasources/v2/FileScan.scala    |  6 +++
 .../sql/execution/datasources/v2/orc/OrcScan.scala | 14 ++++-
 .../datasources/v2/orc/OrcScanBuilder.scala        |  2 +-
 .../spark/sql/execution/SameResultSuite.scala      | 61 ++++++++++++++++++++++
 .../datasources/FileSourceStrategySuite.scala      | 30 ++++++-----
 6 files changed, 104 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index c7fcc67..3276ab5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.sources.v2.reader._
 
 /**
@@ -46,4 +47,8 @@ case class BatchScanExec(
   override lazy val inputRDD: RDD[InternalRow] = {
     new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
   }
+
+  override def doCanonicalize(): BatchScanExec = {
+    this.copy(output = output.map(QueryPlan.normalizeExprId(_, output)))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 337aac9..70d5932 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -85,6 +86,11 @@ abstract class FileScan(
   override def readSchema(): StructType =
     StructType(readDataSchema.fields ++ readPartitionSchema.fields)
 
+  // Returns whether the two given arrays of [[Filter]]s are equivalent.
+  protected def equivalentFilters(a: Array[Filter], b: Array[Filter]): Boolean = {
+    a.sortBy(_.hashCode()).sameElements(b.sortBy(_.hashCode()))
+  }
+
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
 
   private def normalizeName(name: String): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index dc6b67c..b129c94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.v2.FileScan
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -34,7 +35,8 @@ case class OrcScan(
     dataSchema: StructType,
     readDataSchema: StructType,
     readPartitionSchema: StructType,
-    options: CaseInsensitiveStringMap)
+    options: CaseInsensitiveStringMap,
+    pushedFilters: Array[Filter])
   extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
   override def isSplitable(path: Path): Boolean = true
 
@@ -46,4 +48,14 @@ case class OrcScan(
     OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
       dataSchema, readDataSchema, readPartitionSchema)
   }
+
+  override def equals(obj: Any): Boolean = obj match {
+    case o: OrcScan =>
+      fileIndex == o.fileIndex && dataSchema == o.dataSchema &&
+      readDataSchema == o.readDataSchema && readPartitionSchema == o.readPartitionSchema &&
+      options == o.options && equivalentFilters(pushedFilters, o.pushedFilters)
+    case _ => false
+  }
+
+  override def hashCode(): Int = getClass.hashCode()
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index 4c1ec52..458b98c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -45,7 +45,7 @@ case class OrcScanBuilder(
 
   override def build(): Scan = {
     OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema,
-      readDataSchema(), readPartitionSchema(), options)
+      readDataSchema(), readPartitionSchema(), options, pushedFilters())
   }
 
   private var _pushedFilters: Array[Filter] = Array.empty
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
index d088e24..4731da4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql.{DataFrame, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.IntegerType
 
@@ -47,6 +49,65 @@ class SameResultSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("FileScan: different orders of data filters and partition filters") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+      Seq("orc", "json", "csv").foreach { format =>
+        withTempPath { path =>
+          val tmpDir = path.getCanonicalPath
+          spark.range(10)
+            .selectExpr("id as a", "id + 1 as b", "id + 2 as c", "id + 3 as d")
+            .write
+            .partitionBy("a", "b")
+            .format(format)
+            .option("header", true)
+            .save(tmpDir)
+          val df = spark.read.format(format).option("header", true).load(tmpDir)
+          // partition filters: a > 1 AND b < 9
+          // data filters: c > 1 AND d < 9
+          val plan1 = df.where("a > 1 AND b < 9 AND c > 1 AND d < 9").queryExecution.sparkPlan
+          val plan2 = df.where("b < 9 AND a > 1 AND d < 9 AND c > 1").queryExecution.sparkPlan
+          assert(plan1.sameResult(plan2))
+          val scan1 = getBatchScanExec(plan1)
+          val scan2 = getBatchScanExec(plan2)
+          assert(scan1.sameResult(scan2))
+          val plan3 = df.where("b < 9 AND a > 1 AND d < 8 AND c > 1").queryExecution.sparkPlan
+          assert(!plan1.sameResult(plan3))
+          // The [[FileScan]]s should have different results if they support filter pushdown.
+          if (format == "orc") {
+            val scan3 = getBatchScanExec(plan3)
+            assert(!scan1.sameResult(scan3))
+          }
+        }
+      }
+    }
+  }
+
+  test("TextScan") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+      withTempPath { path =>
+        val tmpDir = path.getCanonicalPath
+        spark.range(10)
+          .selectExpr("id as a", "id + 1 as b", "cast(id as string) value")
+          .write
+          .partitionBy("a", "b")
+          .text(tmpDir)
+        val df = spark.read.text(tmpDir)
+        // partition filters: a > 1 AND b < 9
+        // data filter: value == '3'
+        val plan1 = df.where("a > 1 AND b < 9 AND value == '3'").queryExecution.sparkPlan
+        val plan2 = df.where("value == '3' AND a > 1 AND b < 9").queryExecution.sparkPlan
+        assert(plan1.sameResult(plan2))
+        val scan1 = getBatchScanExec(plan1)
+        val scan2 = getBatchScanExec(plan2)
+        assert(scan1.sameResult(scan2))
+      }
+    }
+  }
+
+  private def getBatchScanExec(plan: SparkPlan): BatchScanExec = {
+    plan.find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec]
+  }
+
   private def getFileSourceScanExec(df: DataFrame): FileSourceScanExec = {
     df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
       .asInstanceOf[FileSourceScanExec]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 955c3e3..b38f0f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -414,19 +414,25 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
   }
 
   test("[SPARK-16818] partition pruned file scans implement sameResult 
correctly") {
-    withTempPath { path =>
-      val tempDir = path.getCanonicalPath
-      spark.range(100)
-        .selectExpr("id", "id as b")
-        .write
-        .partitionBy("id")
-        .parquet(tempDir)
-      val df = spark.read.parquet(tempDir)
-      def getPlan(df: DataFrame): SparkPlan = {
-        df.queryExecution.executedPlan
+    Seq("orc", "").foreach { useV1ReaderList =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1ReaderList) {
+        withTempPath { path =>
+          val tempDir = path.getCanonicalPath
+          spark.range(100)
+            .selectExpr("id", "id as b")
+            .write
+            .partitionBy("id")
+            .orc(tempDir)
+          val df = spark.read.orc(tempDir)
+
+          def getPlan(df: DataFrame): SparkPlan = {
+            df.queryExecution.executedPlan
+          }
+
+          assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
+          assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
+        }
       }
-      assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
-      assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
     }
   }
 

