This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c2bc58c [SPARK-31489][SQL] Fix pushing down filters with
`java.time.LocalDate` values in ORC
c2bc58c is described below
commit c2bc58c2ded8ff55ff495b966d8d2bab29549fa0
Author: Max Gekk <[email protected]>
AuthorDate: Sun Apr 26 15:49:00 2020 -0700
[SPARK-31489][SQL] Fix pushing down filters with `java.time.LocalDate`
values in ORC
### What changes were proposed in this pull request?
Convert `java.time.LocalDate` values to `java.sql.Date` in filters pushed down to
the ORC datasource when the Java 8 time API is enabled.
Closes #28272
### Why are the changes needed?
The changes fix the exception raised while pushing down date filters when
`spark.sql.datetime.java8API.enabled` is set to `true`:
```
Wrong value class java.time.LocalDate for DATE.EQUALS leaf
java.lang.IllegalArgumentException: Wrong value class java.time.LocalDate for DATE.EQUALS leaf
  at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.checkLiteralType(SearchArgumentImpl.java:192)
  at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.<init>(SearchArgumentImpl.java:75)
  at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$BuilderImpl.equals(SearchArgumentImpl.java:352)
  at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.buildLeafSearchArgument(OrcFilters.scala:229)
```
### Does this PR introduce any user-facing change?
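Yes. With `spark.sql.datetime.java8API.enabled` set to `true`, date filters are
now pushed down to ORC instead of failing with the exception above.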
### How was this patch tested?
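Extended the `filter pushdown - date` test in `OrcFilterSuite` to run with
`spark.sql.datetime.java8API.enabled` set to both `false` and `true`.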
Closes #28261 from MaxGekk/orc-date-filter-pushdown.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit bd139bda4a0e01961b333349e2a60ed0258d8d56)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/execution/datasources/orc/OrcFilters.scala | 5 +++
.../execution/datasources/orc/OrcFilterSuite.scala | 43 +++++++++++++---------
.../sql/execution/datasources/orc/OrcFilters.scala | 5 +++
.../execution/datasources/orc/OrcFilterSuite.scala | 43 +++++++++++++---------
4 files changed, 60 insertions(+), 36 deletions(-)
diff --git a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index f5abd30..a01d5a4 100644
--- a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources.orc
+import java.time.LocalDate
+
import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
@@ -24,6 +26,7 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.orc.storage.serde2.io.HiveDecimalWritable
import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateToDays, toJavaDate}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -162,6 +165,8 @@ private[sql] object OrcFilters extends OrcFiltersBase {
value.asInstanceOf[Number].doubleValue()
case _: DecimalType =>
new HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
+ case _: DateType if value.isInstanceOf[LocalDate] =>
+ toJavaDate(localDateToDays(value.asInstanceOf[LocalDate]))
case _ => value
}
diff --git a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index ee5162b..a1c325e 100644
--- a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -299,26 +299,33 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
}
test("filter pushdown - date") {
- val dates = Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map { day =>
+ val input = Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map { day =>
Date.valueOf(day)
}
- withOrcDataFrame(dates.map(Tuple1(_))) { implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === dates(0), PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> dates(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < dates(1), PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > dates(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= dates(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= dates(3), PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(dates(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(dates(0)) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(dates(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(dates(2)) < $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(dates(0)) >= $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(dates(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
+ withOrcFile(input.map(Tuple1(_))) { path =>
+ Seq(false, true).foreach { java8Api =>
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+ readFile(path) { implicit df =>
+ val dates = input.map(Literal(_))
+ checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate($"_1" === dates(0), PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate($"_1" <=> dates(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate($"_1" < dates(1), PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate($"_1" > dates(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate($"_1" <= dates(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate($"_1" >= dates(3), PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(dates(0) === $"_1", PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(dates(0) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(dates(1) > $"_1", PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(dates(2) < $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(dates(0) >= $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(dates(3) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+ }
}
}
diff --git a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 675e089..445a52c 100644
--- a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources.orc
+import java.time.LocalDate
+
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
@@ -24,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateToDays, toJavaDate}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -162,6 +165,8 @@ private[sql] object OrcFilters extends OrcFiltersBase {
value.asInstanceOf[Number].doubleValue()
case _: DecimalType =>
new HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
+ case _: DateType if value.isInstanceOf[LocalDate] =>
+ toJavaDate(localDateToDays(value.asInstanceOf[LocalDate]))
case _ => value
}
diff --git a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 1baa69e8..815af05 100644
--- a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -300,26 +300,33 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
}
test("filter pushdown - date") {
- val dates = Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map { day =>
+ val input = Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map { day =>
Date.valueOf(day)
}
- withOrcDataFrame(dates.map(Tuple1(_))) { implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === dates(0), PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> dates(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < dates(1), PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > dates(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= dates(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= dates(3), PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(dates(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(dates(0)) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(dates(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(dates(2)) < $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(dates(0)) >= $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(dates(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
+ withOrcFile(input.map(Tuple1(_))) { path =>
+ Seq(false, true).foreach { java8Api =>
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+ readFile(path) { implicit df =>
+ val dates = input.map(Literal(_))
+ checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate($"_1" === dates(0), PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate($"_1" <=> dates(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate($"_1" < dates(1), PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate($"_1" > dates(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate($"_1" <= dates(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate($"_1" >= dates(3), PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(dates(0) === $"_1", PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(dates(0) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(dates(1) > $"_1", PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(dates(2) < $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(dates(0) >= $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(dates(3) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]