Repository: spark
Updated Branches:
  refs/heads/branch-1.6 bb01cbe9b -> ddb963304


[SPARK-12231][SQL] create a combineFilters' projection when we call buildPartitionedTableScan

Hello Michael & All:

We had some issues submitting the new code in the other PR (#10299), so we 
closed that PR and opened this one with the fix.

The reason for the previous failure is that when there is a filter that is 
not pushed down (the "left-over" filter), the projection built for the scan 
could differ, in elements or ordering, from the original projection. For 
example, with partition column "a", a count over "a > 1 or b < 2" has an 
empty original projection, yet the scan must produce both "a" and "b" to 
evaluate the filter.

With the new code, the approach to solving this problem is:

Insert a new Project if the "left-over" filter is non-empty and the original 
projection is both non-empty and different from the projection built for the 
scan, whose elements or ordering could otherwise differ from the expected 
output.
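
In plan terms, the rule reads roughly as follows. This is a simplified 
sketch, not the verbatim patch: `leftOverFilter` is a hypothetical name for 
the optional combined left-over filter (built in the patch with 
reduceLeftOption), while the other names match the new code in the 
DataSourceStrategy diff below.

    // Widen the scan's projection so the left-over filter can be evaluated.
    val scanProjection =
      if (partitionAndNormalColumnAttrs.isEmpty) projects
      else (partitionAndNormalColumnAttrs ++ projects).toSeq

    leftOverFilter match {
      case Some(cf) if projects.nonEmpty && projects != scanProjection =>
        // The scan now produces extra or reordered columns, so a Project
        // restores the originally requested output.
        execution.Project(projects, execution.Filter(cf, scan))
      case Some(cf) =>
        execution.Filter(cf, scan)
      case None =>
        scan
    }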

We create three test cases to cover the cases that would otherwise fail.
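
For reference, here is a minimal reproduction sketch of the empty-projection 
case, mirroring the new tests below. It assumes a Spark 1.6 `sqlContext` as 
provided by spark-shell; the output path is illustrative.

    import sqlContext.implicits._

    // `a` becomes a partition column; the (a, b) rows are (1,2), (2,3), (3,4).
    (1 to 3).map(i => (i, i + 1)).toDF("a", "b")
      .write.partitionBy("a").parquet("/tmp/spark-12231")

    // "a > 1 or b < 2" references both the partition column `a` and the data
    // column `b`, so it becomes the "left-over" filter, while count() carries
    // an empty projection. Before this fix the mismatched scan projection
    // made this query fail.
    val df = sqlContext.read.parquet("/tmp/spark-12231")
    assert(df.filter("a > 1 or b < 2").count() == 2)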

Author: Kevin Yu <q...@us.ibm.com>

Closes #10388 from kevinyu98/spark-12231.

(cherry picked from commit fd50df413fbb3b7528cdff311cc040a6212340b9)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ddb96330
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ddb96330
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ddb96330

Branch: refs/heads/branch-1.6
Commit: ddb9633043e82fb2a34c7e0e29b487f635c3c744
Parents: bb01cbe
Author: Kevin Yu <q...@us.ibm.com>
Authored: Mon Dec 28 11:58:33 2015 -0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Sun Jan 31 16:16:34 2016 -0800

----------------------------------------------------------------------
 .../datasources/DataSourceStrategy.scala        | 28 ++++++++++---
 .../parquet/ParquetFilterSuite.scala            | 41 ++++++++++++++++++++
 2 files changed, 64 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ddb96330/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8a15a51..3741a9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -77,7 +77,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
 
       // Predicates with both partition keys and attributes
-      val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+      val partitionAndNormalColumnFilters =
+        filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
 
       val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
 
@@ -88,16 +89,33 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
       }
 
+      // need to add projections from "partitionAndNormalColumnAttrs" if it is not empty
+      val partitionAndNormalColumnAttrs = AttributeSet(partitionAndNormalColumnFilters)
+      val partitionAndNormalColumnProjs = if (partitionAndNormalColumnAttrs.isEmpty) {
+        projects
+      } else {
+        (partitionAndNormalColumnAttrs ++ projects).toSeq
+      }
+
       val scan = buildPartitionedTableScan(
         l,
-        projects,
+        partitionAndNormalColumnProjs,
         pushedFilters,
         t.partitionSpec.partitionColumns,
         selectedPartitions)
 
-      combineFilters
-        .reduceLeftOption(expressions.And)
-        .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
+      // Add a Projection to guarantee the original projection:
+      // this is because "partitionAndNormalColumnAttrs" may be different
+      // from the original "projects", in elements or their ordering
+
+      partitionAndNormalColumnFilters.reduceLeftOption(expressions.And).map(cf =>
+        if (projects.isEmpty || projects == partitionAndNormalColumnProjs) {
+          // if the original projection is empty or unchanged, no additional Project is needed
+          execution.Filter(cf, scan)
+        } else {
+          execution.Project(projects, execution.Filter(cf, scan))
+        }
+      ).getOrElse(scan) :: Nil
 
     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ddb96330/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index ed5a352..1b52859 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -317,6 +317,47 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
+  test("SPARK-12231: test the filter and empty project in partitioned 
DataSource scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}"
+        (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").
+          write.partitionBy("a").parquet(path)
+
+        // The filter "a > 1 or b < 2" will not get pushed down, and the projection is empty;
+        // before this fix, this query threw an exception because the plan built from the
+        // combined filter expected a projection that differed from the scan's output.
+        val df1 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+        assert(df1.filter("a > 1 or b < 2").count() == 2)
+      }
+    }
+  }
+
+  test("SPARK-12231: test the new projection in partitioned DataSource scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}"
+        (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").
+          write.partitionBy("a").parquet(path)
+
+        // test the case where a new projection is generated,
+        // i.e. when projects != partitionAndNormalColumnProjs
+
+        val df1 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+        checkAnswer(
+          df1.filter("a > 1 or b > 2").orderBy("a").selectExpr("a", "b", "c", "d"),
+          (2 to 3).map(i => Row(i, i + 1, i + 2, i + 3)))
+      }
+    }
+  }
+
+
   test("SPARK-11103: Filter applied on merged Parquet schema with new column 
fails") {
     import testImplicits._
 

