This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 4ca628eb [SPARK-33867][SQL] Instant and LocalDate values aren't
handled when generating SQL queries
4ca628eb is described below
commit 4ca628eb2f54c3e039867c5ccbb0cde7413c18e4
Author: Chircu <[email protected]>
AuthorDate: Thu Jan 28 11:58:20 2021 +0900
[SPARK-33867][SQL] Instant and LocalDate values aren't handled when
generating SQL queries
### What changes were proposed in this pull request?
When generating SQL queries, only the old date-time API types are handled
for values in org.apache.spark.sql.jdbc.JdbcDialect#compileValue. If the new
API is used (spark.sql.datetime.java8API.enabled=true), Instant and LocalDate
values are not quoted and errors are thrown. The change proposed is to handle
Instant and LocalDate values the same way that Timestamp and Date are.
### Why are the changes needed?
In the current state, if an Instant is used in a filter, an exception will
be thrown.
Ex (dataset was read from PostgreSQL):
dataset.filter(current_timestamp().gt(col(VALID_FROM)))
Stacktrace (the T11 is from an instant formatted like
yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'):
Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or
near "T11" Position: 285 at
org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2103)
at
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1836)
at
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:257) at
org.postgresql.jdbc2.AbstractJdbc2Statement. [...]
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Test added
Closes #31148 from cristichircu/SPARK-33867.
Lead-authored-by: Chircu <[email protected]>
Co-authored-by: Cristi Chircu <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
(cherry picked from commit 829f118f98ef0732c8dd784f06298465e47ee3a0)
Signed-off-by: Takeshi Yamamuro <[email protected]>
---
.../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 10 ++++++++++
.../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 14 ++++++++++++++
2 files changed, 24 insertions(+)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index ead0a1a..6c72172 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.jdbc
import java.sql.{Connection, Date, Timestamp}
+import java.time.{Instant, LocalDate}
import scala.collection.mutable.ArrayBuilder
@@ -26,9 +27,11 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils,
TimestampFormatter}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
/**
@@ -175,7 +178,14 @@ abstract class JdbcDialect extends Serializable with
Logging{
def compileValue(value: Any): Any = value match {
case stringValue: String => s"'${escapeSql(stringValue)}'"
case timestampValue: Timestamp => "'" + timestampValue + "'"
+ case timestampValue: Instant =>
+ val timestampFormatter = TimestampFormatter.getFractionFormatter(
+ DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+ s"'${timestampFormatter.format(timestampValue)}'"
case dateValue: Date => "'" + dateValue + "'"
+ case dateValue: LocalDate =>
+ val dateFormatter =
DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+ s"'${dateFormatter.format(dateValue)}'"
case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
case _ => value
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b81824d..70f5508 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
import java.math.BigDecimal
import java.sql.{Date, DriverManager, SQLException, Timestamp}
+import java.time.{Instant, LocalDate}
import java.util.{Calendar, GregorianCalendar, Properties}
import scala.collection.JavaConverters._
@@ -1005,6 +1006,19 @@ class JDBCSuite extends QueryTest
=== java.sql.Timestamp.valueOf("2002-02-20 11:22:33.543543"))
}
+ test("SPARK-33867: Test DataFrame.where for LocalDate and Instant") {
+ // Test for SPARK-33867
+ val timestamp = Instant.parse("2001-02-20T11:22:33.543543Z")
+ val date = LocalDate.parse("1995-01-01")
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+ val jdbcDf = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new
Properties())
+ val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect()
+ assert(rows(0).getAs[LocalDate](1) === LocalDate.parse("1996-01-01"))
+ // 8 hour difference since saved time was America/Los_Angeles and
Instant is GMT
+ assert(rows(0).getAs[Instant](2) ===
Instant.parse("2002-02-20T19:22:33.543543Z"))
+ }
+ }
+
test("test credentials in the properties are not in plan output") {
val df = sql("SELECT * FROM parts")
val explain = ExplainCommand(df.queryExecution.logical, ExtendedMode)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]