This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c2bc58c  [SPARK-31489][SQL] Fix pushing down filters with 
`java.time.LocalDate` values in ORC
c2bc58c is described below

commit c2bc58c2ded8ff55ff495b966d8d2bab29549fa0
Author: Max Gekk <[email protected]>
AuthorDate: Sun Apr 26 15:49:00 2020 -0700

    [SPARK-31489][SQL] Fix pushing down filters with `java.time.LocalDate` 
values in ORC
    
    ### What changes were proposed in this pull request?
    Convert `java.time.LocalDate` to `java.sql.Date` in pushed down filters to 
ORC datasource when Java 8 time API enabled.
    
    Closes #28272
    
    ### Why are the changes needed?
    The changes fix the exception raised while pushing date filters when 
`spark.sql.datetime.java8API.enabled` is set to `true`:
    ```
    Wrong value class java.time.LocalDate for DATE.EQUALS leaf
    java.lang.IllegalArgumentException: Wrong value class java.time.LocalDate 
for DATE.EQUALS leaf
        at 
org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.checkLiteralType(SearchArgumentImpl.java:192)
        at 
org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.<init>(SearchArgumentImpl.java:75)
        at 
org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$BuilderImpl.equals(SearchArgumentImpl.java:352)
        at 
org.apache.spark.sql.execution.datasources.orc.OrcFilters$.buildLeafSearchArgument(OrcFilters.scala:229)
    ```
    
    ### Does this PR introduce any user-facing change?
    Yes
    
    ### How was this patch tested?
    Added tests to `OrcFilterSuite`.
    
    Closes #28261 from MaxGekk/orc-date-filter-pushdown.
    
    Authored-by: Max Gekk <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit bd139bda4a0e01961b333349e2a60ed0258d8d56)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../sql/execution/datasources/orc/OrcFilters.scala |  5 +++
 .../execution/datasources/orc/OrcFilterSuite.scala | 43 +++++++++++++---------
 .../sql/execution/datasources/orc/OrcFilters.scala |  5 +++
 .../execution/datasources/orc/OrcFilterSuite.scala | 43 +++++++++++++---------
 4 files changed, 60 insertions(+), 36 deletions(-)

diff --git 
a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
 
b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index f5abd30..a01d5a4 100644
--- 
a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ 
b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.time.LocalDate
+
 import org.apache.orc.storage.common.`type`.HiveDecimal
 import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
 import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
@@ -24,6 +26,7 @@ import 
org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateToDays, 
toJavaDate}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
@@ -162,6 +165,8 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       value.asInstanceOf[Number].doubleValue()
     case _: DecimalType =>
       new 
HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
+    case _: DateType if value.isInstanceOf[LocalDate] =>
+      toJavaDate(localDateToDays(value.asInstanceOf[LocalDate]))
     case _ => value
   }
 
diff --git 
a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
 
b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index ee5162b..a1c325e 100644
--- 
a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ 
b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -299,26 +299,33 @@ class OrcFilterSuite extends OrcTest with 
SharedSparkSession {
   }
 
   test("filter pushdown - date") {
-    val dates = Seq("2017-08-18", "2017-08-19", "2017-08-20", 
"2017-08-21").map { day =>
+    val input = Seq("2017-08-18", "2017-08-19", "2017-08-20", 
"2017-08-21").map { day =>
       Date.valueOf(day)
     }
-    withOrcDataFrame(dates.map(Tuple1(_))) { implicit df =>
-      checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
-      checkFilterPredicate($"_1" === dates(0), PredicateLeaf.Operator.EQUALS)
-      checkFilterPredicate($"_1" <=> dates(0), 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
-      checkFilterPredicate($"_1" < dates(1), PredicateLeaf.Operator.LESS_THAN)
-      checkFilterPredicate($"_1" > dates(2), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
-      checkFilterPredicate($"_1" <= dates(0), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
-      checkFilterPredicate($"_1" >= dates(3), PredicateLeaf.Operator.LESS_THAN)
-
-      checkFilterPredicate(Literal(dates(0)) === $"_1", 
PredicateLeaf.Operator.EQUALS)
-      checkFilterPredicate(Literal(dates(0)) <=> $"_1", 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-      checkFilterPredicate(Literal(dates(1)) > $"_1", 
PredicateLeaf.Operator.LESS_THAN)
-      checkFilterPredicate(Literal(dates(2)) < $"_1", 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
-      checkFilterPredicate(Literal(dates(0)) >= $"_1", 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
-      checkFilterPredicate(Literal(dates(3)) <= $"_1", 
PredicateLeaf.Operator.LESS_THAN)
+    withOrcFile(input.map(Tuple1(_))) { path =>
+      Seq(false, true).foreach { java8Api =>
+        withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> 
java8Api.toString) {
+          readFile(path) { implicit df =>
+            val dates = input.map(Literal(_))
+            checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
+
+            checkFilterPredicate($"_1" === dates(0), 
PredicateLeaf.Operator.EQUALS)
+            checkFilterPredicate($"_1" <=> dates(0), 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+            checkFilterPredicate($"_1" < dates(1), 
PredicateLeaf.Operator.LESS_THAN)
+            checkFilterPredicate($"_1" > dates(2), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+            checkFilterPredicate($"_1" <= dates(0), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+            checkFilterPredicate($"_1" >= dates(3), 
PredicateLeaf.Operator.LESS_THAN)
+
+            checkFilterPredicate(dates(0) === $"_1", 
PredicateLeaf.Operator.EQUALS)
+            checkFilterPredicate(dates(0) <=> $"_1", 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+            checkFilterPredicate(dates(1) > $"_1", 
PredicateLeaf.Operator.LESS_THAN)
+            checkFilterPredicate(dates(2) < $"_1", 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+            checkFilterPredicate(dates(0) >= $"_1", 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+            checkFilterPredicate(dates(3) <= $"_1", 
PredicateLeaf.Operator.LESS_THAN)
+          }
+        }
+      }
     }
   }
 
diff --git 
a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
 
b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 675e089..445a52c 100644
--- 
a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ 
b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.time.LocalDate
+
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
@@ -24,6 +26,7 @@ import 
org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateToDays, 
toJavaDate}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
@@ -162,6 +165,8 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       value.asInstanceOf[Number].doubleValue()
     case _: DecimalType =>
       new 
HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
+    case _: DateType if value.isInstanceOf[LocalDate] =>
+      toJavaDate(localDateToDays(value.asInstanceOf[LocalDate]))
     case _ => value
   }
 
diff --git 
a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
 
b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 1baa69e8..815af05 100644
--- 
a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ 
b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -300,26 +300,33 @@ class OrcFilterSuite extends OrcTest with 
SharedSparkSession {
   }
 
   test("filter pushdown - date") {
-    val dates = Seq("2017-08-18", "2017-08-19", "2017-08-20", 
"2017-08-21").map { day =>
+    val input = Seq("2017-08-18", "2017-08-19", "2017-08-20", 
"2017-08-21").map { day =>
       Date.valueOf(day)
     }
-    withOrcDataFrame(dates.map(Tuple1(_))) { implicit df =>
-      checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
-      checkFilterPredicate($"_1" === dates(0), PredicateLeaf.Operator.EQUALS)
-      checkFilterPredicate($"_1" <=> dates(0), 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
-      checkFilterPredicate($"_1" < dates(1), PredicateLeaf.Operator.LESS_THAN)
-      checkFilterPredicate($"_1" > dates(2), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
-      checkFilterPredicate($"_1" <= dates(0), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
-      checkFilterPredicate($"_1" >= dates(3), PredicateLeaf.Operator.LESS_THAN)
-
-      checkFilterPredicate(Literal(dates(0)) === $"_1", 
PredicateLeaf.Operator.EQUALS)
-      checkFilterPredicate(Literal(dates(0)) <=> $"_1", 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-      checkFilterPredicate(Literal(dates(1)) > $"_1", 
PredicateLeaf.Operator.LESS_THAN)
-      checkFilterPredicate(Literal(dates(2)) < $"_1", 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
-      checkFilterPredicate(Literal(dates(0)) >= $"_1", 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
-      checkFilterPredicate(Literal(dates(3)) <= $"_1", 
PredicateLeaf.Operator.LESS_THAN)
+    withOrcFile(input.map(Tuple1(_))) { path =>
+      Seq(false, true).foreach { java8Api =>
+        withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> 
java8Api.toString) {
+          readFile(path) { implicit df =>
+            val dates = input.map(Literal(_))
+            checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
+
+            checkFilterPredicate($"_1" === dates(0), 
PredicateLeaf.Operator.EQUALS)
+            checkFilterPredicate($"_1" <=> dates(0), 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+            checkFilterPredicate($"_1" < dates(1), 
PredicateLeaf.Operator.LESS_THAN)
+            checkFilterPredicate($"_1" > dates(2), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+            checkFilterPredicate($"_1" <= dates(0), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+            checkFilterPredicate($"_1" >= dates(3), 
PredicateLeaf.Operator.LESS_THAN)
+
+            checkFilterPredicate(dates(0) === $"_1", 
PredicateLeaf.Operator.EQUALS)
+            checkFilterPredicate(dates(0) <=> $"_1", 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+            checkFilterPredicate(dates(1) > $"_1", 
PredicateLeaf.Operator.LESS_THAN)
+            checkFilterPredicate(dates(2) < $"_1", 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+            checkFilterPredicate(dates(0) >= $"_1", 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+            checkFilterPredicate(dates(3) <= $"_1", 
PredicateLeaf.Operator.LESS_THAN)
+          }
+        }
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to