Repository: spark Updated Branches: refs/heads/master 9362c5cc2 -> 5d726b865
[SPARK-25559][SQL] Remove the unsupported predicates in Parquet when possible ## What changes were proposed in this pull request? Currently, in `ParquetFilters`, if one of the child predicates is not supported by Parquet, the entire predicate is thrown away. In fact, if the unsupported predicate is a direct child of a top-level `And` condition, or of a nested `And` reached without passing through any `Not` or `Or` condition, it can be safely removed. ## How was this patch tested? Tests are added. Closes #22574 from dbtsai/removeUnsupportedPredicatesInParquet. Lead-authored-by: DB Tsai <[email protected]> Co-authored-by: Dongjoon Hyun <[email protected]> Co-authored-by: DB Tsai <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d726b86 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d726b86 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d726b86 Branch: refs/heads/master Commit: 5d726b865948f993911fd5b9730b25cfa94e16c7 Parents: 9362c5c Author: DB Tsai <[email protected]> Authored: Fri Sep 28 17:46:11 2018 -0700 Committer: Dongjoon Hyun <[email protected]> Committed: Fri Sep 28 17:46:11 2018 -0700 ---------------------------------------------------------------------- .../datasources/parquet/ParquetFilters.scala | 38 +++-- .../parquet/ParquetFilterSuite.scala | 147 ++++++++++++++++++- 2 files changed, 172 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5d726b86/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 
0c286de..44a0d20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -394,7 +394,13 @@ private[parquet] class ParquetFilters( */ def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { val nameToParquetField = getFieldMap(schema) + createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true) + } + private def createFilterHelper( + nameToParquetField: Map[String, ParquetField], + predicate: sources.Filter, + canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = { // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { @@ -488,26 +494,36 @@ private[parquet] class ParquetFilters( .map(_(nameToParquetField(name).fieldName, value)) case sources.And(lhs, rhs) => - // At here, it is not safe to just convert one side if we do not understand the - // other side. Here is an example used to explain the reason. + // At here, it is not safe to just convert one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to // convert b in ('1'). If we only convert a = 2, we will end up with a filter // NOT(a = 2), which will generate wrong results. - // Pushing one side of AND down is only safe to do at the top level. - // You can see ParquetRelation's initializeLocalJobFunc method as an example. 
- for { - lhsFilter <- createFilter(schema, lhs) - rhsFilter <- createFilter(schema, rhs) - } yield FilterApi.and(lhsFilter, rhsFilter) + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate + // can be safely removed. + val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd) + val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd) + + (lhsFilterOption, rhsFilterOption) match { + case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter)) + case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) + case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + case _ => None + } case sources.Or(lhs, rhs) => for { - lhsFilter <- createFilter(schema, lhs) - rhsFilter <- createFilter(schema, rhs) + lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false) + rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false) } yield FilterApi.or(lhsFilter, rhsFilter) case sources.Not(pred) => - createFilter(schema, pred).map(FilterApi.not) + createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + .map(FilterApi.not) case sources.In(name, values) if canMakeFilterOn(name, values.head) && values.distinct.length <= pushDownInFilterThreshold => http://git-wip-us.apache.org/repos/asf/spark/blob/5d726b86/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 7ebb750..01e41b3 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -750,7 +750,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } - test("SPARK-12218 Converting conjunctions into Parquet filter predicates") { + test("SPARK-12218 and SPARK-25559 Converting conjunctions into Parquet filter predicates") { val schema = StructType(Seq( StructField("a", IntegerType, nullable = false), StructField("b", StringType, nullable = true), @@ -770,7 +770,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("c", 1.5D))) } - assertResult(None) { + // Testing when `canRemoveOneSideInAnd == true` + // case sources.And(lhs, rhs) => + // ... + // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) + assertResult(Some(lt(intColumn("a"), 10: Integer))) { parquetFilters.createFilter( parquetSchema, sources.And( @@ -778,6 +782,83 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.StringContains("b", "prefix"))) } + // Testing when `canRemoveOneSideInAnd == true` + // case sources.And(lhs, rhs) => + // ... 
+ // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + assertResult(Some(lt(intColumn("a"), 10: Integer))) { + parquetFilters.createFilter( + parquetSchema, + sources.And( + sources.StringContains("b", "prefix"), + sources.LessThan("a", 10))) + } + + // Testing complex And conditions + assertResult(Some( + FilterApi.and(lt(intColumn("a"), 10: Integer), gt(intColumn("a"), 5: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.And( + sources.And( + sources.LessThan("a", 10), + sources.StringContains("b", "prefix") + ), + sources.GreaterThan("a", 5))) + } + + // Testing complex And conditions + assertResult(Some( + FilterApi.and(gt(intColumn("a"), 5: Integer), lt(intColumn("a"), 10: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.And( + sources.GreaterThan("a", 5), + sources.And( + sources.StringContains("b", "prefix"), + sources.LessThan("a", 10) + ))) + } + + // Testing + // case sources.Or(lhs, rhs) => + // ... + // lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Or( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")), + sources.GreaterThan("a", 2))) + } + + // Testing + // case sources.Or(lhs, rhs) => + // ... + // rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Or( + sources.GreaterThan("a", 2), + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")))) + } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing when `canRemoveOneSideInAnd == false` + // case sources.And(lhs, rhs) => + // ... 
+ // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) assertResult(None) { parquetFilters.createFilter( parquetSchema, @@ -786,6 +867,68 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("a", 1), sources.StringContains("b", "prefix")))) } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing when `canRemoveOneSideInAnd == false` + // case sources.And(lhs, rhs) => + // ... + // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.And( + sources.StringContains("b", "prefix"), + sources.GreaterThan("a", 1)))) + } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing passing `canRemoveOneSideInAnd = false` into + // case sources.And(lhs, rhs) => + // val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.And( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")), + sources.GreaterThan("a", 2)))) + } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing passing `canRemoveOneSideInAnd = false` into + // case sources.And(lhs, rhs) => + // val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.And( + sources.GreaterThan("a", 2), + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", 
"prefix"))))) + } } test("SPARK-16371 Do not push down filters when inner name and outer name are the same") { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
