Repository: spark
Updated Branches:
  refs/heads/master 8a7872dc2 -> 6df234579


[SPARK-25699][SQL] Partially push down conjunctive predicates in ORC

## What changes were proposed in this pull request?

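Inspired by https://github.com/apache/spark/pull/22574.
We can partially push down top-level conjunctive predicates to ORC: when one side of an AND
cannot be converted, the convertible side is still pushed down. Dropping an unconvertible
conjunct is only safe when the AND is at the top level or none of its ancestors is a NOT or OR.
This PR improves ORC predicate pushdown in both the SQL and Hive modules.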

## How was this patch tested?

New unit tests in OrcFilterSuite and HiveOrcFilterSuite.

Closes #22684 from gengliangwang/pushOrcFilters.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: DB Tsai <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6df23457
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6df23457
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6df23457

Branch: refs/heads/master
Commit: 6df2345794614c33c95fa453cabac755cf94d131
Parents: 8a7872d
Author: Gengliang Wang <[email protected]>
Authored: Wed Oct 10 18:18:56 2018 +0000
Committer: DB Tsai <[email protected]>
Committed: Wed Oct 10 18:18:56 2018 +0000

----------------------------------------------------------------------
 .../execution/datasources/orc/OrcFilters.scala  | 69 +++++++++++++++-----
 .../datasources/orc/OrcFilterSuite.scala        | 37 ++++++++++-
 .../apache/spark/sql/hive/orc/OrcFilters.scala  | 69 +++++++++++++++-----
 .../spark/sql/hive/orc/HiveOrcFilterSuite.scala | 45 ++++++++++++-
 4 files changed, 186 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6df23457/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index dbafc46..2b17b47 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -138,6 +138,23 @@ private[sql] object OrcFilters {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
+    createBuilder(dataTypeMap, expression, builder, 
canPartialPushDownConjuncts = true)
+  }
+
+  /**
+   * @param dataTypeMap a map from the attribute name to its data type.
+   * @param expression the input filter predicates.
+   * @param builder the input SearchArgument.Builder.
+   * @param canPartialPushDownConjuncts whether a subset of conjuncts of 
predicates can be pushed
+   *                                    down safely. Pushing ONLY one side of 
AND down is safe to
+   *                                    do at the top level or none of its 
ancestors is NOT and OR.
+   * @return the builder so far.
+   */
+  private def createBuilder(
+      dataTypeMap: Map[String, DataType],
+      expression: Filter,
+      builder: Builder,
+      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))
 
@@ -145,32 +162,52 @@ private[sql] object OrcFilters {
 
     expression match {
       case And(left, right) =>
-        // At here, it is not safe to just convert one side if we do not 
understand the
-        // other side. Here is an example used to explain the reason.
+        // At here, it is not safe to just convert one side and remove the 
other side
+        // if we do not understand what the parent filters are.
+        //
+        // Here is an example used to explain the reason.
         // Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
         // convert b in ('1'). If we only convert a = 2, we will end up with a 
filter
         // NOT(a = 2), which will generate wrong results.
-        // Pushing one side of AND down is only safe to do at the top level.
-        // You can see ParquetRelation's initializeLocalJobFunc method as an 
example.
-        for {
-          _ <- buildSearchArgument(dataTypeMap, left, newBuilder)
-          _ <- buildSearchArgument(dataTypeMap, right, newBuilder)
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
-        } yield rhs.end()
+        //
+        // Pushing one side of AND down is only safe to do at the top level or 
in the child
+        // AND before hitting NOT or OR conditions, and in this case, the 
unsupported predicate
+        // can be safely removed.
+        val leftBuilderOption =
+          createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts)
+        val rightBuilderOption =
+          createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts)
+        (leftBuilderOption, rightBuilderOption) match {
+          case (Some(_), Some(_)) =>
+            for {
+              lhs <- createBuilder(dataTypeMap, left,
+                builder.startAnd(), canPartialPushDownConjuncts)
+              rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts)
+            } yield rhs.end()
+
+          case (Some(_), None) if canPartialPushDownConjuncts =>
+            createBuilder(dataTypeMap, left, builder, 
canPartialPushDownConjuncts)
+
+          case (None, Some(_)) if canPartialPushDownConjuncts =>
+            createBuilder(dataTypeMap, right, builder, 
canPartialPushDownConjuncts)
+
+          case _ => None
+        }
 
       case Or(left, right) =>
         for {
-          _ <- buildSearchArgument(dataTypeMap, left, newBuilder)
-          _ <- buildSearchArgument(dataTypeMap, right, newBuilder)
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
+          _ <- createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts = false)
+          _ <- createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts = false)
+          lhs <- createBuilder(dataTypeMap, left,
+            builder.startOr(), canPartialPushDownConjuncts = false)
+          rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts = false)
         } yield rhs.end()
 
       case Not(child) =>
         for {
-          _ <- buildSearchArgument(dataTypeMap, child, newBuilder)
-          negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
+          _ <- createBuilder(dataTypeMap, child, newBuilder, 
canPartialPushDownConjuncts = false)
+          negate <- createBuilder(dataTypeMap,
+            child, builder.startNot(), canPartialPushDownConjuncts = false)
         } yield negate.end()
 
       // NOTE: For all case branches dealing with leaf predicates below, the 
additional `startAnd()`

http://git-wip-us.apache.org/repos/asf/spark/blob/6df23457/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 8680b86..ee12f30 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -358,7 +358,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
+  test("SPARK-12218 and SPARK-25699 Converting conjunctions into ORC 
SearchArguments") {
     import org.apache.spark.sql.sources._
     // The `LessThan` should be converted while the `StringContains` shouldn't
     val schema = new StructType(
@@ -382,5 +382,40 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext 
{
         ))
       )).get.toString
     }
+
+    // Can not remove unsupported `StringContains` predicate since it is under 
`Or` operator.
+    assert(OrcFilters.createFilter(schema, Array(
+      Or(
+        LessThan("a", 10),
+        And(
+          StringContains("b", "prefix"),
+          GreaterThan("a", 1)
+        )
+      )
+    )).isEmpty)
+
+    // Safely remove unsupported `StringContains` predicate and push down 
`LessThan`
+    assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") {
+      OrcFilters.createFilter(schema, Array(
+        And(
+          LessThan("a", 10),
+          StringContains("b", "prefix")
+        )
+      )).get.toString
+    }
+
+    // Safely remove unsupported `StringContains` predicate, push down 
`LessThan` and `GreaterThan`.
+    assertResult("leaf-0 = (LESS_THAN a 10), leaf-1 = (LESS_THAN_EQUALS a 1)," 
+
+      " expr = (and leaf-0 (not leaf-1))") {
+      OrcFilters.createFilter(schema, Array(
+        And(
+          And(
+            LessThan("a", 10),
+            StringContains("b", "prefix")
+          ),
+          GreaterThan("a", 1)
+        )
+      )).get.toString
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6df23457/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index aee9cb5..a82576a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -79,6 +79,23 @@ private[orc] object OrcFilters extends Logging {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
+    createBuilder(dataTypeMap, expression, builder, 
canPartialPushDownConjuncts = true)
+  }
+
+  /**
+   * @param dataTypeMap a map from the attribute name to its data type.
+   * @param expression the input filter predicates.
+   * @param builder the input SearchArgument.Builder.
+   * @param canPartialPushDownConjuncts whether a subset of conjuncts of 
predicates can be pushed
+   *                                    down safely. Pushing ONLY one side of 
AND down is safe to
+   *                                    do at the top level or none of its 
ancestors is NOT and OR.
+   * @return the builder so far.
+   */
+  private def createBuilder(
+      dataTypeMap: Map[String, DataType],
+      expression: Filter,
+      builder: Builder,
+      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
     def isSearchableType(dataType: DataType): Boolean = dataType match {
       // Only the values in the Spark types below can be recognized by
       // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
@@ -90,32 +107,52 @@ private[orc] object OrcFilters extends Logging {
 
     expression match {
       case And(left, right) =>
-        // At here, it is not safe to just convert one side if we do not 
understand the
-        // other side. Here is an example used to explain the reason.
+        // At here, it is not safe to just convert one side and remove the 
other side
+        // if we do not understand what the parent filters are.
+        //
+        // Here is an example used to explain the reason.
         // Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
         // convert b in ('1'). If we only convert a = 2, we will end up with a 
filter
         // NOT(a = 2), which will generate wrong results.
-        // Pushing one side of AND down is only safe to do at the top level.
-        // You can see ParquetRelation's initializeLocalJobFunc method as an 
example.
-        for {
-          _ <- buildSearchArgument(dataTypeMap, left, newBuilder)
-          _ <- buildSearchArgument(dataTypeMap, right, newBuilder)
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
-        } yield rhs.end()
+        //
+        // Pushing one side of AND down is only safe to do at the top level or 
in the child
+        // AND before hitting NOT or OR conditions, and in this case, the 
unsupported predicate
+        // can be safely removed.
+        val leftBuilderOption =
+          createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts)
+        val rightBuilderOption =
+          createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts)
+        (leftBuilderOption, rightBuilderOption) match {
+          case (Some(_), Some(_)) =>
+            for {
+              lhs <- createBuilder(dataTypeMap, left,
+                builder.startAnd(), canPartialPushDownConjuncts)
+              rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts)
+            } yield rhs.end()
+
+          case (Some(_), None) if canPartialPushDownConjuncts =>
+            createBuilder(dataTypeMap, left, builder, 
canPartialPushDownConjuncts)
+
+          case (None, Some(_)) if canPartialPushDownConjuncts =>
+            createBuilder(dataTypeMap, right, builder, 
canPartialPushDownConjuncts)
+
+          case _ => None
+        }
 
       case Or(left, right) =>
         for {
-          _ <- buildSearchArgument(dataTypeMap, left, newBuilder)
-          _ <- buildSearchArgument(dataTypeMap, right, newBuilder)
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
+          _ <- createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts = false)
+          _ <- createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts = false)
+          lhs <- createBuilder(dataTypeMap, left,
+            builder.startOr(), canPartialPushDownConjuncts = false)
+          rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts = false)
         } yield rhs.end()
 
       case Not(child) =>
         for {
-          _ <- buildSearchArgument(dataTypeMap, child, newBuilder)
-          negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
+          _ <- createBuilder(dataTypeMap, child, newBuilder, 
canPartialPushDownConjuncts = false)
+          negate <- createBuilder(dataTypeMap,
+            child, builder.startNot(), canPartialPushDownConjuncts = false)
         } yield negate.end()
 
       // NOTE: For all case branches dealing with leaf predicates below, the 
additional `startAnd()`

http://git-wip-us.apache.org/repos/asf/spark/blob/6df23457/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala
index 283037c..5094763 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala
@@ -351,7 +351,7 @@ class HiveOrcFilterSuite extends OrcTest with 
TestHiveSingleton {
     }
   }
 
-  test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
+  test("SPARK-12218 and SPARK-25699 Converting conjunctions into ORC 
SearchArguments") {
     import org.apache.spark.sql.sources._
     // The `LessThan` should be converted while the `StringContains` shouldn't
     val schema = new StructType(
@@ -383,5 +383,48 @@ class HiveOrcFilterSuite extends OrcTest with 
TestHiveSingleton {
         ))
       )).get.toString
     }
+
+    // Can not remove unsupported `StringContains` predicate since it is under 
`Or` operator.
+    assert(OrcFilters.createFilter(schema, Array(
+      Or(
+        LessThan("a", 10),
+        And(
+          StringContains("b", "prefix"),
+          GreaterThan("a", 1)
+        )
+      )
+    )).isEmpty)
+
+    // Safely remove unsupported `StringContains` predicate and push down 
`LessThan`
+    assertResult(
+      """leaf-0 = (LESS_THAN a 10)
+        |expr = leaf-0
+      """.stripMargin.trim
+    ) {
+      OrcFilters.createFilter(schema, Array(
+        And(
+          LessThan("a", 10),
+          StringContains("b", "prefix")
+        )
+      )).get.toString
+    }
+
+    // Safely remove unsupported `StringContains` predicate, push down 
`LessThan` and `GreaterThan`.
+    assertResult(
+      """leaf-0 = (LESS_THAN a 10)
+        |leaf-1 = (LESS_THAN_EQUALS a 1)
+        |expr = (and leaf-0 (not leaf-1))
+      """.stripMargin.trim
+    ) {
+      OrcFilters.createFilter(schema, Array(
+        And(
+          And(
+            LessThan("a", 10),
+            StringContains("b", "prefix")
+          ),
+          GreaterThan("a", 1)
+        )
+      )).get.toString
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to