This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bebd78e13160 [SPARK-47761][SQL] Oracle: Support reading
AnsiIntervalTypes
bebd78e13160 is described below
commit bebd78e13160b37de5b3363e6b3c4c63365af601
Author: Kent Yao <[email protected]>
AuthorDate: Wed Apr 10 11:37:57 2024 +0800
[SPARK-47761][SQL] Oracle: Support reading AnsiIntervalTypes
### What changes were proposed in this pull request?
In this PR, I propose to add support for reading well-defined ANSI
interval types from Oracle databases.
- INTERVAL YEAR [(year_precision)] TO MONTH
- Stores a period of time in years and months, where year_precision is
the number of digits in the YEAR datetime field. Accepted values are 0 to 9.
The default is 2. The size is fixed at 5 bytes.
- INTERVAL DAY [(day_precision)] TO SECOND [(fractional_seconds_precision)]
- Stores a period of time in days, hours, minutes, and seconds, where
- day_precision is the maximum number of digits in the DAY datetime
field. Accepted values are 0 to 9. The default is 2.
- fractional_seconds_precision is the number of digits in the
fractional part of the SECOND field. Accepted values are 0 to 9. The default is
6. The size is fixed at 11 bytes.
Both of them are mapped to the defaults of AnsiIntervalTypes.
We also add two developer APIs that convert interval strings to underlying
representations of YearMonthIntervalType and DaytimeIntervalType. The default
implementations assume that the inputs are compatible with the ANSI style;
incompatible values will fail to be parsed. However, it shall fail much earlier
at the data type mapping step because of undefined mapping rules
### Why are the changes needed?
Improve Oracle accessibility, as it's safe to read Oracle external
intervals
### Does this PR introduce _any_ user-facing change?
Yes, reading Oracle intervals is available now
### How was this patch tested?
new tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #45925 from yaooqinn/SPARK-47761.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../spark/sql/jdbc/OracleIntegrationSuite.scala | 29 +++++++++++++++++++++-
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 10 ++++++++
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 26 +++++++++++++++++++
.../org/apache/spark/sql/jdbc/OracleDialect.scala | 14 +++++++++++
4 files changed, 78 insertions(+), 1 deletion(-)
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 3b6e7faa164e..7728ee774bda 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.jdbc
import java.math.BigDecimal
import java.sql.{Connection, Date, Timestamp}
+import java.time.{Duration, Period}
import java.util.{Properties, TimeZone}
import org.scalatest.time.SpanSugar._
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.execution.{RowDataSourceScanExec,
WholeStageCodegenExec}
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -540,4 +541,30 @@ class OracleIntegrationSuite extends
DockerJDBCIntegrationSuite with SharedSpark
assert(df.count() === 2)
assert(df2.collect().forall(_.getTimestamp(0) === row1))
}
+
+ test("SPARK-47761: Reading ANSI INTERVAL Types") {
+ val df: String => DataFrame = query => spark.read.format("jdbc")
+ .option("url", jdbcUrl)
+ .option("query", query)
+ .load()
+ checkAnswer(df("SELECT INTERVAL '1-2' YEAR(1) TO MONTH as i0 FROM dual"),
+ Row(Period.of(1, 2, 0)))
+ checkAnswer(df("SELECT INTERVAL '1-2' YEAR(2) TO MONTH as i1 FROM dual"),
+ Row(Period.of(1, 2, 0)))
+ checkAnswer(df("SELECT INTERVAL '12345-2' YEAR(9) TO MONTH as i2 FROM
dual"),
+ Row(Period.of(12345, 2, 0)))
+ checkAnswer(df("SELECT INTERVAL '1 12:23:56' DAY(1) TO SECOND(0) as i3
FROM dual"),
+ Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56)))
+ checkAnswer(df("SELECT INTERVAL '1 12:23:56.12' DAY TO SECOND(2) as i4
FROM dual"),
+
Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(120)))
+ checkAnswer(df("SELECT INTERVAL '1 12:23:56.1234' DAY TO SECOND(4) as i5
FROM dual"),
+
Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(123)
+ .plusNanos(400000)))
+ checkAnswer(df("SELECT INTERVAL '1 12:23:56.123456' DAY TO SECOND(6) as i6
FROM dual"),
+
Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(123)
+ .plusNanos(456000)))
+ checkAnswer(df("SELECT INTERVAL '1 12:23:56.12345678' DAY TO SECOND(8) as
i7 FROM dual"),
+
Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(123)
+ .plusNanos(456000)))
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 08313f26a877..13cad43f9734 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -532,6 +532,16 @@ object JdbcUtils extends Logging with SQLConfHelper {
(rs: ResultSet, row: InternalRow, pos: Int) =>
row.update(pos, rs.getBytes(pos + 1))
+ case _: YearMonthIntervalType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ row.update(pos,
+ nullSafeConvert(rs.getString(pos + 1),
dialect.getYearMonthIntervalAsMonths))
+
+ case _: DayTimeIntervalType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ row.update(pos,
+ nullSafeConvert(rs.getString(pos + 1),
dialect.getDayTimeIntervalAsMicros))
+
case _: ArrayType if metadata.contains("pg_bit_array_type") =>
// SPARK-47628: Handle PostgreSQL bit(n>1) array type ahead. As in the
pgjdbc driver,
// bit(n>1)[] is not distinguishable from bit(1)[], and they are all
recognized as boolen[].
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d800cc6a8617..5e98bcbce489 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -21,6 +21,7 @@ import java.sql.{Connection, Date, Driver, Statement,
Timestamp}
import java.time.{Instant, LocalDate, LocalDateTime}
import java.util
import java.util.ServiceLoader
+import java.util.concurrent.TimeUnit
import scala.collection.mutable.ArrayBuilder
import scala.util.control.NonFatal
@@ -34,6 +35,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils,
TimestampFormatter}
import
org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateTimeToMicros,
toJavaTimestampNoRebase}
+import org.apache.spark.sql.catalyst.util.IntervalUtils.{fromDayTimeString,
fromYearMonthString, getDuration}
import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
@@ -164,6 +166,30 @@ abstract class JdbcDialect extends Serializable with
Logging {
@Since("4.0.0")
def convertJavaDateToDate(d: Date): Date = d
+ /**
+ * Converts an year-month interval string to an int value `months`.
+ *
+ * @param yearmonthStr the year-month interval string
+ * @return the number of total months in the interval
+ * @throws IllegalArgumentException if the input string is invalid
+ */
+ @Since("4.0.0")
+ def getYearMonthIntervalAsMonths(yearmonthStr: String): Int = {
+ fromYearMonthString(yearmonthStr).months
+ }
+
+ /**
+ * Converts a day-time interval string to a long value `micros`.
+ *
+ * @param daytimeStr the day-time interval string
+ * @return the number of total microseconds in the interval
+ * @throws IllegalArgumentException if the input string is invalid
+ */
+ @Since("4.0.0")
+ def getDayTimeIntervalAsMicros(daytimeStr: String): Long = {
+ getDuration(fromDayTimeString(daytimeStr), TimeUnit.MICROSECONDS)
+ }
+
/**
* Convert java.sql.Timestamp to a LocalDateTime representing the same
wall-clock time as the
* value stored in a remote database.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index a9c246c93879..001d47f13b21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -103,6 +103,8 @@ private case class OracleDialect() extends JdbcDialect {
Some(TimestampType)
case BINARY_FLOAT => Some(FloatType) // Value for
OracleTypes.BINARY_FLOAT
case BINARY_DOUBLE => Some(DoubleType) // Value for
OracleTypes.BINARY_DOUBLE
+ case INTERVAL_YM => Some(YearMonthIntervalType())
+ case INTERVAL_DS => Some(DayTimeIntervalType())
case _ => None
}
}
@@ -231,4 +233,16 @@ private[jdbc] object OracleDialect {
final val TIMESTAMP_TZ = -101
// oracle.jdbc.OracleType.TIMESTAMP_WITH_LOCAL_TIME_ZONE
final val TIMESTAMP_LTZ = -102
+ // INTERVAL YEAR [(year_precision)] TO MONTH
+ // Stores a period of time in years and months, where year_precision is the
number of digits in
+ // the YEAR datetime field. Accepted values are 0 to 9. The default is 2.
+ // The size is fixed at 5 bytes.
+ final val INTERVAL_YM = -103
+ // INTERVAL DAY [(day_precision)] TO SECOND [(fractional_seconds_precision)]
+ // Stores a period of time in days, hours, minutes, and seconds, where
+ // - day_precision is the maximum number of digits in the DAY datetime field.
+ // Accepted values are 0 to 9. The default is 2.
+ // - fractional_seconds_precision is the number of digits in the fractional
part
+ // of the SECOND field. Accepted values are 0 to 9. The default is 6.
+ final val INTERVAL_DS = -104
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]