This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7a8ff15 [SPARK-26865][SQL] DataSourceV2Strategy should push normalized filters 7a8ff15 is described below commit 7a8ff15ff7c8cb73838f67a3ad2d3971961f9c4d Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Wed Feb 13 16:04:27 2019 -0800 [SPARK-26865][SQL] DataSourceV2Strategy should push normalized filters ## What changes were proposed in this pull request? This PR aims to make `DataSourceV2Strategy` normalize filters like [FileSourceStrategy](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L150-L158) when it pushes them into `SupportsPushDownFilters.pushFilters`. ## How was this patch tested? Pass the Jenkins with the newly added test case. Closes #23770 from dongjoon-hyun/SPARK-26865. Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../sql/execution/datasources/DataSourceStrategy.scala | 16 ++++++++++++++++ .../sql/execution/datasources/FileSourceStrategy.scala | 10 +--------- .../execution/datasources/v2/DataSourceV2Strategy.scala | 7 +++++-- .../execution/datasources/DataSourceStrategySuite.scala | 7 +++++++ 4 files changed, 29 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index b5cf8c9..273cc3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -427,6 +427,22 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with object DataSourceStrategy { /** + * The attribute name of predicate could be different than the one in schema in case of + * case insensitive, we should change them to match the one in schema, so we do not need to + * worry about case sensitivity anymore. + */ + protected[sql] def normalizeFilters( + filters: Seq[Expression], + attributes: Seq[AttributeReference]): Seq[Expression] = { + filters.filterNot(SubqueryExpression.hasSubquery).map { e => + e transform { + case a: AttributeReference => + a.withName(attributes.find(_.semanticEquals(a)).get.name) + } + } + } + + /** * Tries to translate a Catalyst [[Expression]] into data source [[Filter]]. * * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 62ab5c8..970cbda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -147,15 +147,7 @@ object FileSourceStrategy extends Strategy with Logging { // - filters that need to be evaluated again after the scan val filterSet = ExpressionSet(filters) - // The attribute name of predicate could be different than the one in schema in case of - // case insensitive, we should change them to match the one in schema, so we do not need to - // worry about case sensitivity anymore. - val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e => - e transform { - case a: AttributeReference => - a.withName(l.output.find(_.semanticEquals(a)).get.name) - } - } + val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, l.output) val partitionColumns = l.resolve( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 40ac5cf..d6d17d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} @@ -104,10 +104,13 @@ object DataSourceV2Strategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => val scanBuilder = relation.newScanBuilder() + + val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, relation.output) + // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. - val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, filters) + val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters) val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters) logInfo( s""" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala index f20aded..2f5d555 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala @@ -219,6 +219,13 @@ class DataSourceStrategySuite extends PlanTest with SharedSQLContext { IsNotNull(attrInt))), None) } + test("SPARK-26865 DataSourceV2Strategy should push normalized filters") { + val attrInt = 'cint.int + assertResult(Seq(IsNotNull(attrInt))) { + DataSourceStrategy.normalizeFilters(Seq(IsNotNull(attrInt.withName("CiNt"))), Seq(attrInt)) + } + } + /** * Translate the given Catalyst [[Expression]] into data source [[sources.Filter]] * then verify against the given [[sources.Filter]]. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org