This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new a5f14f4b454 [SPARK-39900][SQL] Address partial or negated condition in binary format's predicate pushdown a5f14f4b454 is described below commit a5f14f4b454d99e04118a10b9a579375114c3c25 Author: zzzzming95 <505306...@qq.com> AuthorDate: Wed Aug 3 21:22:55 2022 +0900 [SPARK-39900][SQL] Address partial or negated condition in binary format's predicate pushdown ### What changes were proposed in this pull request? Fix a `BinaryFileFormat` filter pushdown bug. Before this change, when the filter tree is: ``` -Not - - IsNotNull ``` `IsNotNull` cannot be matched, so its filter function falls through to the catch-all `case _ => (_ => true)` and always returns true; that is, no filter pushdown is performed for it. However, the enclosing `Not` then negates that result, so the combined predicate always returns false and no rows can be returned. ### Why are the changes needed? Without this fix, a query on a binary file source whose pushed filters contain a negated (or partially) unsupported condition silently returns no results. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a test case in `BinaryFileFormatSuite`: ``` testCreateFilterFunction( Seq(Not(IsNull(LENGTH))), Seq((t1, true), (t2, true), (t3, true))) ``` Closes #37350 from zzzzming95/SPARK-39900. 
Lead-authored-by: zzzzming95 <505306...@qq.com> Co-authored-by: Hyukjin Kwon <gurwls...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit a0dc7d9117b66426aaa2257c8d448a2f96882ecd) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../datasources/binaryfile/BinaryFileFormat.scala | 54 +++++++++++----------- .../binaryfile/BinaryFileFormatSuite.scala | 5 +- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala index b2412433637..441e3456b4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala @@ -97,7 +97,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val filterFuncs = filters.map(filter => createFilterFunction(filter)) + val filterFuncs = filters.flatMap(filter => createFilterFunction(filter)) val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) file: PartitionedFile => { @@ -160,38 +160,38 @@ object BinaryFileFormat { StructField(LENGTH, LongType, false) :: StructField(CONTENT, BinaryType, true) :: Nil) - private[binaryfile] def createFilterFunction(filter: Filter): FileStatus => Boolean = { + private[binaryfile] def createFilterFunction(filter: Filter): Option[FileStatus => Boolean] = { filter match { - case And(left, right) => - s => createFilterFunction(left)(s) && createFilterFunction(right)(s) - case Or(left, right) => - s => createFilterFunction(left)(s) || createFilterFunction(right)(s) - case Not(child) => - s => !createFilterFunction(child)(s) - - case LessThan(LENGTH, value: Long) => - 
_.getLen < value - case LessThanOrEqual(LENGTH, value: Long) => - _.getLen <= value - case GreaterThan(LENGTH, value: Long) => - _.getLen > value - case GreaterThanOrEqual(LENGTH, value: Long) => - _.getLen >= value - case EqualTo(LENGTH, value: Long) => - _.getLen == value - + case And(left, right) => (createFilterFunction(left), createFilterFunction(right)) match { + case (Some(leftPred), Some(rightPred)) => Some(s => leftPred(s) && rightPred(s)) + case (Some(leftPred), None) => Some(leftPred) + case (None, Some(rightPred)) => Some(rightPred) + case (None, None) => Some(_ => true) + } + case Or(left, right) => (createFilterFunction(left), createFilterFunction(right)) match { + case (Some(leftPred), Some(rightPred)) => Some(s => leftPred(s) || rightPred(s)) + case _ => Some(_ => true) + } + case Not(child) => createFilterFunction(child) match { + case Some(pred) => Some(s => !pred(s)) + case _ => Some(_ => true) + } + case LessThan(LENGTH, value: Long) => Some(_.getLen < value) + case LessThanOrEqual(LENGTH, value: Long) => Some(_.getLen <= value) + case GreaterThan(LENGTH, value: Long) => Some(_.getLen > value) + case GreaterThanOrEqual(LENGTH, value: Long) => Some(_.getLen >= value) + case EqualTo(LENGTH, value: Long) => Some(_.getLen == value) case LessThan(MODIFICATION_TIME, value: Timestamp) => - _.getModificationTime < value.getTime + Some(_.getModificationTime < value.getTime) case LessThanOrEqual(MODIFICATION_TIME, value: Timestamp) => - _.getModificationTime <= value.getTime + Some(_.getModificationTime <= value.getTime) case GreaterThan(MODIFICATION_TIME, value: Timestamp) => - _.getModificationTime > value.getTime + Some(_.getModificationTime > value.getTime) case GreaterThanOrEqual(MODIFICATION_TIME, value: Timestamp) => - _.getModificationTime >= value.getTime + Some(_.getModificationTime >= value.getTime) case EqualTo(MODIFICATION_TIME, value: Timestamp) => - _.getModificationTime == value.getTime - - case _ => (_ => true) + 
Some(_.getModificationTime == value.getTime) + case _ => None } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala index 86ff026d7b1..9a374d5c302 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala @@ -183,7 +183,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession { def testCreateFilterFunction( filters: Seq[Filter], testCases: Seq[(FileStatus, Boolean)]): Unit = { - val funcs = filters.map(BinaryFileFormat.createFilterFunction) + val funcs = filters.flatMap(BinaryFileFormat.createFilterFunction) testCases.foreach { case (status, expected) => assert(funcs.forall(f => f(status)) === expected, s"$filters applied to $status should be $expected.") @@ -250,6 +250,9 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession { Seq(Or(LessThanOrEqual(MODIFICATION_TIME, new Timestamp(1L)), GreaterThanOrEqual(MODIFICATION_TIME, new Timestamp(3L)))), Seq((t1, true), (t2, false), (t3, true))) + testCreateFilterFunction( + Seq(Not(IsNull(LENGTH))), + Seq((t1, true), (t2, true), (t3, true))) // test filters applied on both columns testCreateFilterFunction( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org