This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a8ff15  [SPARK-26865][SQL] DataSourceV2Strategy should push normalized filters
7a8ff15 is described below

commit 7a8ff15ff7c8cb73838f67a3ad2d3971961f9c4d
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Wed Feb 13 16:04:27 2019 -0800

    [SPARK-26865][SQL] DataSourceV2Strategy should push normalized filters
    
    ## What changes were proposed in this pull request?
    
    This PR aims to make `DataSourceV2Strategy` normalize filters like [FileSourceStrategy](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L150-L158) when it pushes them into `SupportsPushDownFilters.pushFilters`.
    
    ## How was this patch tested?
    
    Passes the Jenkins build with the newly added test case.
    
    Closes #23770 from dongjoon-hyun/SPARK-26865.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/execution/datasources/DataSourceStrategy.scala   | 16 ++++++++++++++++
 .../sql/execution/datasources/FileSourceStrategy.scala   | 10 +---------
 .../execution/datasources/v2/DataSourceV2Strategy.scala  |  7 +++++--
 .../execution/datasources/DataSourceStrategySuite.scala  |  7 +++++++
 4 files changed, 29 insertions(+), 11 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index b5cf8c9..273cc3b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -427,6 +427,22 @@ case class DataSourceStrategy(conf: SQLConf) extends 
Strategy with Logging with
 
 object DataSourceStrategy {
   /**
+   * The attribute name of predicate could be different than the one in schema 
in case of
+   * case insensitive, we should change them to match the one in schema, so we 
do not need to
+   * worry about case sensitivity anymore.
+   */
+  protected[sql] def normalizeFilters(
+      filters: Seq[Expression],
+      attributes: Seq[AttributeReference]): Seq[Expression] = {
+    filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
+      e transform {
+        case a: AttributeReference =>
+          a.withName(attributes.find(_.semanticEquals(a)).get.name)
+      }
+    }
+  }
+
+  /**
    * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
    *
    * @return a `Some[Filter]` if the input [[Expression]] is convertible, 
otherwise a `None`.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 62ab5c8..970cbda 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -147,15 +147,7 @@ object FileSourceStrategy extends Strategy with Logging {
       //  - filters that need to be evaluated again after the scan
       val filterSet = ExpressionSet(filters)
 
-      // The attribute name of predicate could be different than the one in 
schema in case of
-      // case insensitive, we should change them to match the one in schema, 
so we do not need to
-      // worry about case sensitivity anymore.
-      val normalizedFilters = 
filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
-        e transform {
-          case a: AttributeReference =>
-            a.withName(l.output.find(_.semanticEquals(a)).get.name)
-        }
-      }
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, 
l.output)
 
       val partitionColumns =
         l.resolve(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 40ac5cf..d6d17d6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.mutable
 
 import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, 
Repartition}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
@@ -104,10 +104,13 @@ object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val scanBuilder = relation.newScanBuilder()
+
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, 
relation.output)
+
       // `pushedFilters` will be pushed down and evaluated in the underlying 
data sources.
       // `postScanFilters` need to be evaluated after the scan.
       // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet 
row group filter.
-      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, filters)
+      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, 
normalizedFilters)
       val (scan, output) = pruneColumns(scanBuilder, relation, project ++ 
postScanFilters)
       logInfo(
         s"""
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
index f20aded..2f5d555 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
@@ -219,6 +219,13 @@ class DataSourceStrategySuite extends PlanTest with 
SharedSQLContext {
         IsNotNull(attrInt))), None)
   }
 
+  test("SPARK-26865 DataSourceV2Strategy should push normalized filters") {
+    val attrInt = 'cint.int
+    assertResult(Seq(IsNotNull(attrInt))) {
+      
DataSourceStrategy.normalizeFilters(Seq(IsNotNull(attrInt.withName("CiNt"))), 
Seq(attrInt))
+    }
+  }
+
   /**
    * Translate the given Catalyst [[Expression]] into data source 
[[sources.Filter]]
    * then verify against the given [[sources.Filter]].


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to