This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d7cc708e1e0d [SPARK-47280][SQL] Remove timezone limitation for ORACLE TIMESTAMP WITH TIMEZONE
d7cc708e1e0d is described below
commit d7cc708e1e0dbda93f07832ae5e18cbc075f6431
Author: Kent Yao <[email protected]>
AuthorDate: Wed Mar 6 11:22:22 2024 +0800
[SPARK-47280][SQL] Remove timezone limitation for ORACLE TIMESTAMP WITH TIMEZONE
### What changes were proposed in this pull request?
As illustrated by the Oracle [Documentation](https://docs.oracle.com/en/database/oracle/oracle-database/23/jjdbc/Oracle-extensions.html#GUID-DB1F687A-CF1C-4B3F-92C0-126DC782EF53):

> TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE types can be represented as standard java.sql.Timestamp type. The byte representation of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE types to java.sql.Timestamp is straightforward. This is because the internal format of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE data types is GMT, and java.sql.Timestamp type objects internally use a milliseconds time value that is the number of milliseconds since EPOCH.
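Put differently, a `java.sql.Timestamp` encodes an absolute instant, so the GMT-based internal format maps onto it without any session-time-zone adjustment. A minimal Scala sketch of that property (the epoch value is one of those asserted in the new test; the object name and zone list are illustrative):

```scala
import java.sql.Timestamp
import java.util.TimeZone

object TimestampInstantSketch {
  def main(args: Array[String]): Unit = {
    // 944046000000L is one of the epoch-millis values checked by the new test.
    val ts = new Timestamp(944046000000L)
    // The stored value is milliseconds since EPOCH, so it never changes;
    // only the string rendering follows the JVM default time zone.
    Seq("America/Los_Angeles", "UTC", "Asia/Shanghai").foreach { zone =>
      TimeZone.setDefault(TimeZone.getTimeZone(zone))
      println(s"$zone: getTime=${ts.getTime}, toString=$ts")
    }
  }
}
```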
Because we use `rs.getTimestamp` rather than `rs.getString` or `rs.getTIMESTAMPTZ` to retrieve the timestamp value, it is safe to remove the `spark.sql.session.timeZone` guard when mapping these Oracle JDBC types to Catalyst types.
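For illustration, a stripped-down sketch of the retrieval this relies on; it is not Spark's actual read path, and the connection string, credentials, and column position are placeholders:

```scala
import java.sql.DriverManager

object GetTimestampSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder URL/credentials; assumes the ts_with_timezone test table exists.
    val conn = DriverManager.getConnection(
      "jdbc:oracle:thin:@//host:1521/service", "user", "password")
    try {
      val rs = conn.createStatement().executeQuery("SELECT * FROM ts_with_timezone")
      while (rs.next()) {
        // getTimestamp yields an absolute instant (millis since EPOCH), so the
        // result does not depend on spark.sql.session.timeZone.
        println(rs.getTimestamp(2).getTime)
      }
    } finally conn.close()
  }
}
```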
### Why are the changes needed?
Improve the functionality of the Oracle connector
### Does this PR introduce _any_ user-facing change?
When reading Oracle TIMESTAMP WITH TIMEZONE columns, `spark.sql.session.timeZone` can now be changed freely; it no longer needs to match the JVM default time zone.
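For example, a read like the following now succeeds regardless of the session time zone (a sketch: `jdbcUrl` is a placeholder for an Oracle connection string, and `ts_with_timezone` is the table used by the integration test):

```scala
// Previously this failed with UNRECOGNIZED_SQL_TYPE (jdbcType -101) whenever
// spark.sql.session.timeZone differed from the JVM default time zone.
spark.conf.set("spark.sql.session.timeZone", "Europe/Sofia")
val df = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "ts_with_timezone")
  .load()
df.show()
```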
### How was this patch tested?
New unit tests added.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #45384 from yaooqinn/SPARK-47280.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../sql/jdbc/DockerJDBCIntegrationSuite.scala | 3 +-
.../spark/sql/jdbc/OracleIntegrationSuite.scala | 39 ++++++++--------------
.../org/apache/spark/sql/jdbc/OracleDialect.scala | 24 +++++--------
3 files changed, 25 insertions(+), 41 deletions(-)
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
index 5df7e00c9c3c..13db5844c604 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -36,6 +36,7 @@ import com.github.dockerjava.zerodep.ZerodepDockerHttpClient
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.SpanSugar._
+import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.{DockerUtils, Utils}
import org.apache.spark.util.Utils.timeStringAsSeconds
@@ -99,7 +100,7 @@ abstract class DatabaseOnDocker {
}
abstract class DockerJDBCIntegrationSuite
- extends SharedSparkSession with Eventually with DockerIntegrationFunSuite {
+ extends QueryTest with SharedSparkSession with Eventually with DockerIntegrationFunSuite {
protected val dockerIp = DockerUtils.getDockerIp()
val db: DatabaseOnDocker
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 5ed0a2a1a7df..3b6e7faa164e 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -23,7 +23,6 @@ import java.util.{Properties, TimeZone}
import org.scalatest.time.SpanSugar._
-import org.apache.spark.SparkSQLException
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec}
@@ -69,6 +68,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
override val connectionTimeout = timeout(7.minutes)
+ private val rsOfTsWithTimezone = Seq(
+ Row(BigDecimal.valueOf(1), new Timestamp(944046000000L)),
+ Row(BigDecimal.valueOf(2), new Timestamp(944078400000L))
+ )
+
override def dataPreparation(conn: Connection): Unit = {
// In 18.4.0 Express Edition auto commit is enabled by default.
conn.setAutoCommit(false)
@@ -275,7 +279,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
assert(types(1).equals("class java.sql.Timestamp"))
}
- test("Column type TIMESTAMP with SESSION_LOCAL_TIMEZONE is different from
default") {
+ test("SPARK-47280: Remove timezone limitation for ORACLE TIMESTAMP WITH
TIMEZONE") {
val defaultJVMTimeZone = TimeZone.getDefault
// Pick the timezone different from the current default time zone of JVM
val sofiaTimeZone = TimeZone.getTimeZone("Europe/Sofia")
@@ -284,35 +288,20 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
if (defaultJVMTimeZone == shanghaiTimeZone) sofiaTimeZone else shanghaiTimeZone
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> localSessionTimeZone.getID) {
- checkError(
- exception = intercept[SparkSQLException] {
- sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties).collect()
- },
- errorClass = "UNRECOGNIZED_SQL_TYPE",
- parameters = Map("typeName" -> "TIMESTAMP WITH TIME ZONE", "jdbcType" -> "-101"))
+ checkAnswer(
+ sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties),
+ rsOfTsWithTimezone)
}
}
test("Column TIMESTAMP with TIME ZONE(JVM timezone)") {
- def checkRow(row: Row, ts: String): Unit = {
- assert(row.getTimestamp(1).equals(Timestamp.valueOf(ts)))
- }
-
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> TimeZone.getDefault.getID) {
val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
- withDefaultTimeZone(PST) {
- assert(dfRead.collect().toSet ===
- Set(
- Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 03:00:00")),
- Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 12:00:00"))))
- }
-
- withDefaultTimeZone(UTC) {
- assert(dfRead.collect().toSet ===
- Set(
- Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 11:00:00")),
- Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 20:00:00"))))
- }
+ Seq(PST, UTC).foreach(timeZone => {
+ withDefaultTimeZone(timeZone) {
+ checkAnswer(dfRead, rsOfTsWithTimezone)
+ }
+ })
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 05c927416e7b..544c0197dec9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -18,15 +18,13 @@
package org.apache.spark.sql.jdbc
import java.sql.{Date, Timestamp, Types}
-import java.util.{Locale, TimeZone}
+import java.util.Locale
import scala.util.control.NonFatal
import org.apache.spark.SparkUnsupportedOperationException
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -80,13 +78,6 @@ private case object OracleDialect extends JdbcDialect {
}
}
- private def supportTimeZoneTypes: Boolean = {
- val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
- // TODO: support timezone types when users are not using the JVM timezone, which
- // is the default value of SESSION_LOCAL_TIMEZONE
- timeZone == TimeZone.getDefault
- }
-
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
sqlType match {
@@ -107,11 +98,14 @@ private case object OracleDialect extends JdbcDialect {
case _ if scale == -127L =>
Option(DecimalType(DecimalType.MAX_PRECISION, 10))
case _ => None
}
- case TIMESTAMP_TZ if supportTimeZoneTypes =>
- // Value for Timestamp with Time Zone in Oracle
- Some(TimestampType)
- case TIMESTAMP_LTZ =>
- // Value for Timestamp with Local Time Zone in Oracle
+ case TIMESTAMP_TZ | TIMESTAMP_LTZ =>
+ // TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE types can be represented
+ // as standard java.sql.Timestamp type.
+ // The byte representation of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE
+ // types to java.sql.Timestamp is straightforward.
+ // This is because the internal format of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL
+ // TIME ZONE data types is GMT, and java.sql.Timestamp type objects internally use a
+ // milliseconds time value that is the number of milliseconds since EPOCH.
Some(TimestampType)
case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE