This is an automated email from the ASF dual-hosted git repository.
MaxGekk pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 109bd16ddee6 [SPARK-57207][SQL] Support nanosecond timestamp types in the Types Framework
109bd16ddee6 is described below
commit 109bd16ddee64af420c1292da34bdff3bdb4f78d
Author: Maxim Gekk <[email protected]>
AuthorDate: Fri Jun 5 03:19:46 2026 +0200
[SPARK-57207][SQL] Support nanosecond timestamp types in the Types Framework
### What changes were proposed in this pull request?
This PR wires `TimestampNTZNanosType(p)` and `TimestampLTZNanosType(p)` (p
in [7, 9]) through the Spark SQL Types Framework (SPARK-53504), so that all
type-specific behavior for the nanosecond timestamp types is centralized behind
`TypeOps` / `TypeApiOps`. The nanos types are now supported **only** through
the framework: the scattered legacy dispatch for them is removed.
Concretely:
- Add `TimestampNanosTypeOps` (catalyst) with `TimestampNTZNanosTypeOps` /
`TimestampLTZNanosTypeOps`, registered in `TypeOps.apply()`. Overrides:
`getPhysicalType`, `getJavaClass`, `getBoxedJavaClass`, `getRowWriter`,
`getDefaultLiteral`, `getJavaLiteral`, `getMutableValue`, `toCatalystImpl`,
`toScala`/`toScalaImpl`, `createSerializer`, `createDeserializer`.
- Add a `getBoxedJavaClass` hook to the `TypeOps` base (the boxed Java
class used in codegen). The `createSerializer` / `createDeserializer` hooks
already exist on the base trait (used by `TimeTypeOps`); the nanos ops above
only override them.
- Add `TimestampNanosTypeApiOps` (sql/api) with NTZ/LTZ subclasses,
registered in `TypeApiOps.apply()`. `getEncoder` returns the SPARK-57033 leaves
(`LocalDateTimeNanosEncoder(p)` / `InstantNanosEncoder(p)`), gated by
`DataTypeErrors.checkTimestampNanosTypesEnabled()`.
- Remove the nanos branches from the legacy code paths now handled by the
framework: `SerializerBuildHelper`, `DeserializerBuildHelper`,
`CatalystTypeConverters`, `EncoderUtils`, `CodeGenerator`, `Literal`, and
`InternalRow`. In `SerializerBuildHelper` / `DeserializerBuildHelper`,
`OptionEncoder` / `TransformingEncoder` are unwrapped before the framework leaf
dispatch, since those wrapper encoders proxy `dataType` to the wrapped encoder.
- Add `MutableTimestampNanos` to `SpecificInternalRow` to avoid the
`MutableAny` fallback.
- Add a `checkValue` on `spark.sql.timestampNanosTypes.enabled` requiring
`spark.sql.types.framework.enabled=true`, so the types cannot be enabled
outside the framework.
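A small model helps show why the serializer/deserializer bullet above unwraps `OptionEncoder` / `TransformingEncoder` before the framework leaf dispatch. This is a hypothetical sketch, not Spark code: `Enc`, `NanosLeaf`, `OptionEnc`, `SerializerSketch`, and the string-valued "expressions" are illustrative stand-ins for `AgnosticEncoder`, `LocalDateTimeNanosEncoder`, `OptionEncoder`, and catalyst expressions. Because the wrapper proxies `dataType`, a lookup keyed off `enc.dataType` matches the wrapper and silently skips the Option step.

```scala
// Hypothetical miniature of the encoder tree; the real AgnosticEncoder is much richer.
sealed trait Enc { def dataType: String }

final case class NanosLeaf(precision: Int) extends Enc {
  def dataType: String = s"TIMESTAMP_NTZ($precision)"
}

// Like OptionEncoder, the wrapper proxies dataType to the wrapped encoder.
final case class OptionEnc(inner: Enc) extends Enc {
  def dataType: String = inner.dataType
}

object SerializerSketch {
  // Hypothetical framework lookup keyed off dataType (cf. TypeOps(enc.dataType)).
  // Expressions are modeled as strings so the result is easy to inspect.
  private def frameworkSerializer(dataType: String, input: String): Option[String] =
    if (dataType.startsWith("TIMESTAMP_NTZ(")) Some(s"frameworkSerde[$dataType]($input)")
    else None

  // Naive dispatch: matches the wrapper too, so the Option step is skipped.
  def naive(enc: Enc, input: String): String =
    frameworkSerializer(enc.dataType, input).getOrElse(default(enc, input))

  // Fixed dispatch: wrappers go through the default path first, so the
  // framework lookup only ever sees unwrapped leaf encoders.
  def fixed(enc: Enc, input: String): String = enc match {
    case _: OptionEnc => default(enc, input)
    case _ => frameworkSerializer(enc.dataType, input).getOrElse(default(enc, input))
  }

  // Default path: unwrap the Option, then recurse into the wrapped encoder.
  private def default(enc: Enc, input: String): String = enc match {
    case OptionEnc(inner) => fixed(inner, s"unwrapOption($input)")
    case other => s"legacySerde[${other.dataType}]($input)"
  }
}
```

With this model, `SerializerSketch.naive(OptionEnc(NanosLeaf(9)), "x")` yields `frameworkSerde[TIMESTAMP_NTZ(9)](x)` with no `unwrapOption`, while `fixed` yields `frameworkSerde[TIMESTAMP_NTZ(9)](unwrapOption(x))`.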
Fractional-second string formatting is not implemented yet (no
`TimestampFormatter` for these types). Until it lands, converting a nanos value
to a string (CAST to STRING, EXPLAIN/SHOW output, SQL-literal rendering) raises
the new `UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING` error rather than
silently truncating to microseconds. Both the interpreted path
(`TimestampNanosTypeApiOps.format`) and the codegen path
(`ToStringBase.castToStringCode`) raise the identical error, so the two [...]
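The reason to fail rather than format at microsecond precision follows from the value layout described in the new `TimestampNanosTypeApiOps` scaladoc: epoch microseconds plus the nanoseconds within that microsecond. The sketch below is hypothetical: `NanosValSketch` and its field names stand in for `TimestampNanosVal` and are not its actual API. It round-trips through `java.time.Instant` and shows that dropping the sub-microsecond part changes the value.

```scala
import java.time.Instant

// Hypothetical stand-in for TimestampNanosVal: epoch microseconds plus the
// 0..999 nanoseconds inside that microsecond (field names are assumptions).
final case class NanosValSketch(epochMicros: Long, nanosWithinMicro: Int) {
  require(nanosWithinMicro >= 0 && nanosWithinMicro < 1000, "nanos must be in [0, 999]")

  def toInstant: Instant = {
    // floorDiv/floorMod keep the micro-of-second non-negative for pre-1970 values.
    val seconds = Math.floorDiv(epochMicros, 1000000L)
    val microOfSecond = Math.floorMod(epochMicros, 1000000L)
    Instant.ofEpochSecond(seconds, microOfSecond * 1000L + nanosWithinMicro)
  }

  // What a microsecond-only formatter would effectively render.
  def truncatedToMicros: Instant = copy(nanosWithinMicro = 0).toInstant
}

object NanosValSketch {
  def fromInstant(i: Instant): NanosValSketch = {
    val micros = Math.addExact(Math.multiplyExact(i.getEpochSecond, 1000000L), i.getNano / 1000L)
    NanosValSketch(micros, i.getNano % 1000)
  }
}
```

For `2026-06-05T01:19:46.123456789Z` the sketch stores `nanosWithinMicro = 789`; rendering at microsecond precision would print `.123456` and lose those three digits, which is the silent truncation the new error avoids.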
Out of scope (follow-ups): string formatting/CAST-to-string, Connect proto,
Arrow, PySpark conversion, Parquet/ColumnVector, and physical
ordering/compare/hash.
### Why are the changes needed?
The logical nanosecond timestamp types (SPARK-56876) and the physical row
layer (SPARK-56981) already exist, but these types were wired only through
scattered legacy dispatch. Centralizing the type-specific operations behind
`TypeOps`, consistent with `TimeType`, is a prerequisite for the remaining
nanosecond timestamp work and avoids the framework-on/off behavior divergence
that the previous per-call-site handling produced.
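The "single registration point" shape of `TypeApiOps.apply()` can be sketched as follows. All names here (`SketchType`, `NanosNTZ`, `NanosLTZ`, `OpsSketch`, `OpsRegistry`, and the `frameworkEnabled` parameter) are hypothetical simplifications, not Spark's API. The sketch only mirrors the idea that a disabled framework yields `None` and that, with the legacy branches removed, the nanos types have nothing to fall back on.

```scala
// Hypothetical stand-ins for Spark's DataType hierarchy (illustration only).
sealed trait SketchType
final case class NanosNTZ(precision: Int) extends SketchType
final case class NanosLTZ(precision: Int) extends SketchType
case object IntSketchType extends SketchType

// Hypothetical per-type operations; the real TypeApiOps has many more hooks.
trait OpsSketch {
  def dataType: SketchType
  def sqlTypeName: String
}

final class NanosNTZOps(val dataType: NanosNTZ) extends OpsSketch {
  val sqlTypeName: String = "TIMESTAMP_NTZ"
}

final class NanosLTZOps(val dataType: NanosLTZ) extends OpsSketch {
  val sqlTypeName: String = "TIMESTAMP_LTZ"
}

object OpsRegistry {
  // Single registration point: each framework type is matched here and nowhere else.
  // With the framework disabled the lookup returns None, and because the legacy
  // per-call-site branches are gone there is nothing else to fall back on.
  def apply(dt: SketchType, frameworkEnabled: Boolean): Option[OpsSketch] =
    if (!frameworkEnabled) None
    else dt match {
      case t: NanosNTZ => Some(new NanosNTZOps(t))
      case t: NanosLTZ => Some(new NanosLTZOps(t))
      case _ => None
    }
}
```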
### Does this PR introduce _any_ user-facing change?
No. The nanosecond timestamp types are a preview feature gated by
`spark.sql.timestampNanosTypes.enabled` (and
`spark.sql.types.framework.enabled`), both off by default in production. When
these preview flags are enabled, converting a nanos timestamp to a string
raises `UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING` because
fractional-second formatting is not implemented yet.
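The dependency between the two preview flags (enforced in this PR by a `checkValue` on `spark.sql.timestampNanosTypes.enabled`) can be modeled as a plain validation function. This is a hypothetical sketch over a `Map[String, String]`, not `SQLConf`'s `checkValue` API; only the two configuration keys come from the commit message.

```scala
// Hypothetical model of the preview-flag dependency; not SQLConf's API.
object PreviewFlagsSketch {
  val NanosKey = "spark.sql.timestampNanosTypes.enabled"
  val FrameworkKey = "spark.sql.types.framework.enabled"

  // A missing key counts as "false", matching the off-by-default flags.
  private def isOn(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

  /** Rejects a configuration that enables the nanos types without the framework. */
  def validate(conf: Map[String, String]): Either[String, Unit] =
    if (isOn(conf, NanosKey) && !isOn(conf, FrameworkKey))
      Left(s"$NanosKey=true requires $FrameworkKey=true")
    else Right(())
}
```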
### How was this patch tested?
Added/updated tests:
- `TimestampNanosTypeOpsSuite` (catalyst): `TypeOps`/`TypeApiOps`
registration; `PhysicalDataType`, default `Literal`, and codegen Java class;
`InternalRow`/`SpecificInternalRow` roundtrips incl. the dedicated
`MutableTimestampNanos` holder; `getEncoder` returns the SPARK-57033 nanos
encoders; `CatalystTypeConverters` `java.time` roundtrip; `format`/`toSQLValue`
raise `UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING`; framework-disabled
leaves the types unsupported (no legacy fallback); [...]
- `TimestampNanosTypeOpsSuite` also covers Option-wrapped nanos encoder
roundtrips (Some/None for NTZ and LTZ), verifying wrapper encoders are
unwrapped before the framework serde dispatch.
- `TimestampNanosRowSuite` (catalyst): CAST nanos -> STRING raises the
unsupported-feature error in both interpreted and codegen modes; unsafe/generic
row roundtrips; literal validation.
```
build/sbt 'catalyst/testOnly *TimestampNanosTypeOpsSuite *TimestampNanosRowSuite'
build/sbt 'core/testOnly org.apache.spark.SparkThrowableSuite'
```
All tests pass. `catalyst` / `sql-api` scalastyle are clean.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor
Closes #56266 from MaxGekk/nanos-types-typeops.
Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit 49153409945c5ebe2cd8894fb2d1de10bdcf3497)
Signed-off-by: Max Gekk <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 5 +
.../spark/sql/catalyst/encoders/RowEncoder.scala | 12 +-
.../apache/spark/sql/errors/DataTypeErrors.scala | 6 +
.../sql/types/ops/TimestampNanosTypeApiOps.scala | 106 ++++++++++
.../apache/spark/sql/types/ops/TypeApiOps.scala | 4 +-
.../sql/catalyst/CatalystTypeConverters.scala | 48 +----
.../sql/catalyst/DeserializerBuildHelper.scala | 38 ++--
.../apache/spark/sql/catalyst/InternalRow.scala | 4 -
.../spark/sql/catalyst/SerializerBuildHelper.scala | 40 ++--
.../spark/sql/catalyst/encoders/EncoderUtils.scala | 14 +-
.../catalyst/expressions/SpecificInternalRow.scala | 16 ++
.../sql/catalyst/expressions/ToStringBase.scala | 8 +
.../expressions/codegen/CodeGenerator.scala | 2 -
.../spark/sql/catalyst/expressions/literals.scala | 2 -
.../sql/catalyst/types/PhysicalDataType.scala | 4 +-
.../spark/sql/catalyst/types/ops/TimeTypeOps.scala | 2 +
.../catalyst/types/ops/TimestampNanosTypeOps.scala | 168 ++++++++++++++++
.../spark/sql/catalyst/types/ops/TypeOps.scala | 16 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 15 +-
.../expressions/TimestampNanosRowSuite.scala | 17 +-
.../types/ops/TimestampNanosTypeOpsSuite.scala | 222 +++++++++++++++++++++
21 files changed, 613 insertions(+), 136 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index a680a04b831d..d4439631d8d0 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -8360,6 +8360,11 @@
"Temporary views cannot be created with the WITH SCHEMA clause. Recreate the temporary view when the underlying schema changes, or use a persisted view."
]
},
+ "TIMESTAMP_NANOS_TO_STRING" : {
+ "message" : [
+ "Converting values of the nanosecond timestamp type <dataType> to a string."
+ ]
+ },
"TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER" : {
"message" : [
"Parsing or formatting nanosecond-precision timestamps (TIMESTAMP_LTZ/TIMESTAMP_NTZ with precision in [7, 9]) under the LEGACY time parser policy. Set <config> to CORRECTED."
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 705d5d8f11b1..2ad579b2cca5 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -21,8 +21,8 @@ import scala.collection.mutable
import scala.reflect.classTag
import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, LocalTimeEncoder [...]
-import org.apache.spark.sql.errors.{DataTypeErrors, DataTypeErrorsBase}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => Agnosti [...]
+import org.apache.spark.sql.errors.DataTypeErrorsBase
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.ops.TypeApiOps
@@ -99,14 +99,6 @@ object RowEncoder extends DataTypeErrorsBase {
case TimestampType if SqlApiConf.get.datetimeJava8ApiEnabled =>
InstantEncoder(lenient)
case TimestampType => TimestampEncoder(lenient)
case TimestampNTZType => LocalDateTimeEncoder
- // Nano timestamp types intentionally do not honor `lenient`: legacy `java.sql.Timestamp` /
- // `java.sql.Date` external types are out of scope for nanosecond precision (SPARK-57033).
- case t: TimestampNTZNanosType =>
- DataTypeErrors.checkTimestampNanosTypesEnabled()
- LocalDateTimeNanosEncoder(t.precision)
- case t: TimestampLTZNanosType =>
- DataTypeErrors.checkTimestampNanosTypesEnabled()
- InstantNanosEncoder(t.precision)
case DateType if SqlApiConf.get.datetimeJava8ApiEnabled =>
LocalDateEncoder(lenient)
case DateType => DateEncoder(lenient)
case _: TimeType => LocalTimeEncoder
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
index 955e242d4ab5..7a4b2a56d25e 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
@@ -299,4 +299,10 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase {
"configValue" -> "true"),
cause = null)
}
+
+ def cannotConvertNanosTimestampToStringError(dataType: DataType): Throwable = {
+ new SparkUnsupportedOperationException(
+ errorClass = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING",
+ messageParameters = Map("dataType" -> toSQLType(dataType)))
+ }
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimestampNanosTypeApiOps.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimestampNanosTypeApiOps.scala
new file mode 100644
index 000000000000..aa3d52f750ef
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimestampNanosTypeApiOps.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types.ops
+
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{InstantNanosEncoder, LocalDateTimeNanosEncoder}
+import org.apache.spark.sql.errors.{DataTypeErrors, DataTypeErrorsBase}
+import org.apache.spark.sql.types.{TimestampLTZNanosType, TimestampNTZNanosType}
+
+/**
+ * Client-side (spark-api) operations shared by the nanosecond timestamp types
+ * (TimestampNTZNanosType and TimestampLTZNanosType).
+ *
+ * Internal values are [[org.apache.spark.unsafe.types.TimestampNanosVal]] (epoch micros + nanos
+ * within the micro). The two concrete subclasses differ only in their DataType and SQL-literal
+ * prefix; storage and formatting are identical.
+ *
+ * SCOPE (SPARK-57207): this issue wires physical representation, literals, row accessors, and
+ * codegen class selection. Dedicated fractional-second string formatting is not implemented yet:
+ * there is no TimestampFormatter for the nanos timestamp types. Until one lands, format() (and
+ * the toSQLValue() that delegates to it) raises the user-facing
+ * UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING error rather than silently truncating to
+ * microsecond precision.
+ *
+ * Dataset encoders are wired here to the precision-aware leaves added by SPARK-57033
+ * (LocalDateTimeNanosEncoder / InstantNanosEncoder), so that turning on the Types Framework
+ * matches the legacy RowEncoder.encoderForDataTypeDefault behavior rather than regressing it.
+ *
+ * @since 4.3.0
+ */
+abstract class TimestampNanosTypeApiOps extends TypeApiOps with DataTypeErrorsBase {
+
+ /** SQL literal prefix for this type, e.g. "TIMESTAMP_NTZ" or "TIMESTAMP_LTZ". */
+ protected def sqlTypeName: String
+
+ // ==================== String Formatting ====================
+
+ // Fractional-second (nanosecond) string formatting is not implemented yet: there is no
+ // TimestampFormatter for the nanos timestamp types. Until one lands, formatting (CAST to STRING,
+ // EXPLAIN / SHOW output, and SQL-literal rendering via toSQLValue) raises a user-facing
+ // unsupported-feature error rather than silently truncating to microsecond precision.
+ override def format(v: Any): String =
+ throw DataTypeErrors.cannotConvertNanosTimestampToStringError(dataType)
+
+ override def toSQLValue(v: Any): String = s"$sqlTypeName '${format(v)}'"
+
+ // ==================== Row Encoding ====================
+
+ // Honor the spark.sql.timestampNanosTypes.enabled gate just like the legacy
+ // RowEncoder.encoderForDataTypeDefault path, so enabling the Types Framework does not bypass
+ // the feature flag.
+ final override def getEncoder: AgnosticEncoder[_] = {
+ DataTypeErrors.checkTimestampNanosTypesEnabled()
+ nanosEncoder
+ }
+
+ /** The precision-aware encoder for this type (SPARK-57033). */
+ protected def nanosEncoder: AgnosticEncoder[_]
+}
+
+/**
+ * Client-side operations for [[org.apache.spark.sql.types.TimestampNTZNanosType]].
+ *
+ * @param t
+ * The TimestampNTZNanosType with precision information
+ * @since 4.3.0
+ */
+class TimestampNTZNanosTypeApiOps(val t: TimestampNTZNanosType) extends TimestampNanosTypeApiOps {
+ override def dataType: TimestampNTZNanosType = t
+ override protected def sqlTypeName: String = "TIMESTAMP_NTZ"
+
+ // Mirrors RowEncoder.encoderForDataTypeDefault for TimestampNTZNanosType (SPARK-57033):
+ // maps to java.time.LocalDateTime with the column precision.
+ override protected def nanosEncoder: AgnosticEncoder[_] = LocalDateTimeNanosEncoder(t.precision)
+}
+
+/**
+ * Client-side operations for [[org.apache.spark.sql.types.TimestampLTZNanosType]].
+ *
+ * @param t
+ * The TimestampLTZNanosType with precision information
+ * @since 4.3.0
+ */
+class TimestampLTZNanosTypeApiOps(val t: TimestampLTZNanosType) extends TimestampNanosTypeApiOps {
+ override def dataType: TimestampLTZNanosType = t
+ override protected def sqlTypeName: String = "TIMESTAMP_LTZ"
+
+ // Mirrors RowEncoder.encoderForDataTypeDefault for TimestampLTZNanosType (SPARK-57033):
+ // maps to java.time.Instant with the column precision.
+ override protected def nanosEncoder: AgnosticEncoder[_] = InstantNanosEncoder(t.precision)
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala
index fff5b8b6a022..ae690119f148 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala
@@ -21,7 +21,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.internal.SqlApiConf
-import org.apache.spark.sql.types.{DataType, TimeType}
+import org.apache.spark.sql.types.{DataType, TimestampLTZNanosType, TimestampNTZNanosType, TimeType}
import org.apache.spark.unsafe.types.UTF8String
/**
@@ -159,6 +159,8 @@ object TypeApiOps {
if (!SqlApiConf.get.typesFrameworkEnabled) return None
dt match {
case tt: TimeType => Some(new TimeTypeApiOps(tt))
+ case t: TimestampNTZNanosType => Some(new TimestampNTZNanosTypeApiOps(t))
+ case t: TimestampLTZNanosType => Some(new TimestampLTZNanosTypeApiOps(t))
// Add new types here - single registration point
case _ => None
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 1fad04f6917d..3924337a65bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType._
import org.apache.spark.sql.types.YearMonthIntervalType._
-import org.apache.spark.unsafe.types.{BinaryView, TimestampNanosVal, UTF8String}
+import org.apache.spark.unsafe.types.{BinaryView, UTF8String}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.collection.Utils
@@ -88,8 +88,6 @@ object CatalystTypeConverters {
case TimestampType if SQLConf.get.datetimeJava8ApiEnabled =>
InstantConverter
case TimestampType => TimestampConverter
case TimestampNTZType => TimestampNTZConverter
- case t: TimestampNTZNanosType => new TimestampNTZNanosConverter(t)
- case t: TimestampLTZNanosType => new TimestampLTZNanosConverter(t)
case dt: DecimalType => new DecimalConverter(dt)
case BooleanType => BooleanConverter
case ByteType => ByteConverter
@@ -517,50 +515,6 @@ object CatalystTypeConverters {
DateTimeUtils.microsToLocalDateTime(row.getLong(column))
}
- private class TimestampNTZNanosConverter(dataType: TimestampNTZNanosType)
- extends CatalystTypeConverter[Any, LocalDateTime, TimestampNanosVal] {
- override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match {
- case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l, dataType.precision)
- case other => throw new SparkIllegalArgumentException(
- errorClass = "INVALID_EXTERNAL_VALUE",
- messageParameters = scala.collection.immutable.Map(
- "other" -> other.toString,
- "otherClass" -> other.getClass.getCanonicalName,
- "dataType" -> dataType.sql))
- }
-
- override def toScala(catalystValue: TimestampNanosVal): LocalDateTime =
- if (catalystValue == null) null
- else DateTimeUtils.timestampNanosToLocalDateTime(catalystValue)
-
- override def toScalaImpl(row: InternalRow, column: Int): LocalDateTime =
- DateTimeUtils.timestampNanosToLocalDateTime(row.getTimestampNTZNanos(column))
- }
-
- // Always maps `TimestampLTZNanosType` to `java.time.Instant`. Unlike micro `TimestampType`,
- // the mapping does not consult `spark.sql.datetime.java8API.enabled`: the nanos LTZ type is
- // post-Java-8 and the legacy `java.sql.Timestamp` external type is intentionally out of scope
- // here. See SPARK-57033.
- private class TimestampLTZNanosConverter(dataType: TimestampLTZNanosType)
- extends CatalystTypeConverter[Any, Instant, TimestampNanosVal] {
- override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match {
- case i: Instant => DateTimeUtils.instantToTimestampNanos(i, dataType.precision)
- case other => throw new SparkIllegalArgumentException(
- errorClass = "INVALID_EXTERNAL_VALUE",
- messageParameters = scala.collection.immutable.Map(
- "other" -> other.toString,
- "otherClass" -> other.getClass.getCanonicalName,
- "dataType" -> dataType.sql))
- }
-
- override def toScala(catalystValue: TimestampNanosVal): Instant =
- if (catalystValue == null) null
- else DateTimeUtils.timestampNanosToInstant(catalystValue)
-
- override def toScalaImpl(row: InternalRow, column: Int): Instant =
- DateTimeUtils.timestampNanosToInstant(row.getTimestampLTZNanos(column))
- }
-
private class DecimalConverter(dataType: DecimalType)
extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 17ba4fe4203b..1fa7803006c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.{expressions => exprs}
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, P [...]
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, Primi [...]
import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{dataTypeForClass, externalDataTypeFor, isNativeEncoder}
import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, IsNull, Literal, MapKeys, MapValues, UpCast}
import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, CreateExternalRow, DecodeUsingSerializer, InitializeJavaBean, Invoke, NewInstance, StaticInvoke, UnresolvedCatalystToExternalMap, UnresolvedMapObjects, WrapOption}
@@ -176,24 +176,6 @@ object DeserializerBuildHelper {
returnNullable = false)
}
- def createDeserializerForLocalDateTimeNanos(path: Expression): Expression = {
- StaticInvoke(
- DateTimeUtils.getClass,
- ObjectType(classOf[java.time.LocalDateTime]),
- "timestampNanosToLocalDateTime",
- path :: Nil,
- returnNullable = false)
- }
-
- def createDeserializerForInstantNanos(path: Expression): Expression = {
- StaticInvoke(
- DateTimeUtils.getClass,
- ObjectType(classOf[java.time.Instant]),
- "timestampNanosToInstant",
- path :: Nil,
- returnNullable = false)
- }
-
def createDeserializerForLocalTime(path: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
@@ -311,11 +293,19 @@ object DeserializerBuildHelper {
enc: AgnosticEncoder[_],
path: Expression,
walkedTypePath: WalkedTypePath,
- isTopLevel: Boolean = false): Expression =
+ isTopLevel: Boolean = false): Expression = enc match {
+ // OptionEncoder and TransformingEncoder proxy `dataType` to the wrapped encoder, so the
+ // framework dispatch below (which keys off `enc.dataType`) would otherwise fire on the wrapper
+ // and skip WrapOption / the transforming codec. Route these wrappers through the default path
+ // first, so the framework leaf dispatch only ever sees unwrapped leaf encoders.
+ case _: OptionEncoder[_] | _: TransformingEncoder[_, _] =>
+ createDeserializerDefault(enc, path, walkedTypePath, isTopLevel)
// Framework dispatch runs before encoder-type checks in the default path. Safe because
// framework types use dedicated leaf encoders, never migration shims or native primitives.
- TypeOps(enc.dataType).flatMap(_.createDeserializer(path, walkedTypePath, isTopLevel))
- .getOrElse(createDeserializerDefault(enc, path, walkedTypePath, isTopLevel))
+ case _ =>
+ TypeOps(enc.dataType).flatMap(_.createDeserializer(path, walkedTypePath, isTopLevel))
+ .getOrElse(createDeserializerDefault(enc, path, walkedTypePath, isTopLevel))
+ }
private def createDeserializerDefault(
enc: AgnosticEncoder[_],
@@ -374,10 +364,6 @@ object DeserializerBuildHelper {
createDeserializerForInstant(path)
case LocalDateTimeEncoder =>
createDeserializerForLocalDateTime(path)
- case _: LocalDateTimeNanosEncoder =>
- createDeserializerForLocalDateTimeNanos(path)
- case _: InstantNanosEncoder =>
- createDeserializerForInstantNanos(path)
case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
case LocalTimeEncoder =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index 638b213e1f6e..abec18230973 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -202,10 +202,6 @@ object InternalRow {
case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
case CalendarIntervalType =>
(input, v) => input.setInterval(ordinal, v.asInstanceOf[CalendarInterval])
- case _: TimestampNTZNanosType =>
- (input, v) => input.setTimestampNTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal])
- case _: TimestampLTZNanosType =>
- (input, v) => input.setTimestampLTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal])
case DecimalType.Fixed(precision, _) =>
(input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
case udt: UserDefinedType[_] => getWriter(ordinal, udt.sqlType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index 72337a4b2185..00a0d6ca2505 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -22,7 +22,7 @@ import scala.language.existentials
import org.apache.spark.sql.catalyst.{expressions => exprs}
import org.apache.spark.sql.catalyst.DeserializerBuildHelper.expressionWithNullSafety
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTim [...]
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, Opt [...]
import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, CheckOverflow, CreateNamedStruct, Expression, IsNull, KnownNotNull, Literal, UnsafeArrayData}
import org.apache.spark.sql.catalyst.expressions.objects._
@@ -166,28 +166,6 @@ object SerializerBuildHelper {
returnNullable = false)
}
- def createSerializerForLocalDateTimeNanos(
- inputObject: Expression,
- precision: Int): Expression = {
- StaticInvoke(
- DateTimeUtils.getClass,
- TimestampNTZNanosType(precision),
- "localDateTimeToTimestampNanos",
- inputObject :: Literal(precision) :: Nil,
- returnNullable = false)
- }
-
- def createSerializerForInstantNanos(
- inputObject: Expression,
- precision: Int): Expression = {
- StaticInvoke(
- DateTimeUtils.getClass,
- TimestampLTZNanosType(precision),
- "instantToTimestampNanos",
- inputObject :: Literal(precision) :: Nil,
- returnNullable = false)
- }
-
def createSerializerForJavaLocalDate(inputObject: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
@@ -355,12 +333,20 @@ object SerializerBuildHelper {
* representation. The mapping between the external and internal representations is described
* by encoder `enc`.
*/
- private def createSerializer(enc: AgnosticEncoder[_], input: Expression): Expression =
+ private def createSerializer(enc: AgnosticEncoder[_], input: Expression): Expression = enc match {
+ // OptionEncoder and TransformingEncoder proxy `dataType` to the wrapped encoder, so the
+ // framework dispatch below (which keys off `enc.dataType`) would otherwise fire on the wrapper
+ // and skip Option unwrapping / the transforming codec. Route these wrappers through the default
+ // path first, so the framework leaf dispatch only ever sees unwrapped leaf encoders.
+ case _: OptionEncoder[_] | _: TransformingEncoder[_, _] =>
+ createSerializerDefault(enc, input)
// Framework dispatch runs before encoder-type checks (AgnosticExpressionPathEncoder,
// isNativeEncoder) in the default path. This is safe because framework types use dedicated
// leaf encoders (e.g., LocalTimeEncoder), never migration shims or native primitives.
- TypeOps(enc.dataType).flatMap(_.createSerializer(input))
- .getOrElse(createSerializerDefault(enc, input))
+ case _ =>
+ TypeOps(enc.dataType).flatMap(_.createSerializer(input))
+ .getOrElse(createSerializerDefault(enc, input))
+ }
private def createSerializerDefault(
enc: AgnosticEncoder[_], input: Expression): Expression = enc match {
@@ -398,8 +384,6 @@ object SerializerBuildHelper {
case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
case InstantEncoder(false) => createSerializerForJavaInstant(input)
case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
- case LocalDateTimeNanosEncoder(p) => createSerializerForLocalDateTimeNanos(input, p)
- case InstantNanosEncoder(p) => createSerializerForInstantNanos(input, p)
case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
case LocalTimeEncoder => createSerializerForLocalTime(input)
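The ordering problem behind the new `OptionEncoder` / `TransformingEncoder` branch can be reproduced with a toy model. This sketch is not Spark code. `Enc`, `LeafNanos`, `OptionEnc`, and `DispatchSketch` are hypothetical stand-ins, and the only assumption is that a wrapper reports its child's `dataType`, as the comment above describes. `serializeNaive` mimics the old dispatch order and `serialize` mimics the new one.

```scala
// Toy model of the dispatch-order fix. None of these names are Spark APIs.
sealed trait Enc { def dataType: String }

// Hypothetical leaf encoder that the framework knows how to serialize.
final case class LeafNanos(precision: Int) extends Enc {
  def dataType: String = s"TIMESTAMP_NTZ($precision)"
}

// Hypothetical wrapper that, like OptionEncoder, proxies dataType to its child.
final case class OptionEnc(child: Enc) extends Enc {
  def dataType: String = child.dataType
}

object DispatchSketch {
  // Stand-in for TypeOps(dt).flatMap(_.createSerializer(input)): keyed on dataType only.
  private def frameworkSerializer(dataType: String, input: String): Option[String] =
    if (dataType.startsWith("TIMESTAMP_NTZ(")) Some(s"toNanos($input)") else None

  // Stand-in for createSerializerDefault: unwraps options, otherwise a generic path.
  private def defaultSerializer(enc: Enc, input: String): String = enc match {
    case OptionEnc(child) => serialize(child, s"unwrapOption($input)")
    case _ => s"generic($input)"
  }

  // Old order: the framework matches the wrapper's proxied dataType and never unwraps.
  def serializeNaive(enc: Enc, input: String): String =
    frameworkSerializer(enc.dataType, input).getOrElse(defaultSerializer(enc, input))

  // New order: wrappers go through the default path, so the framework only sees leaves.
  def serialize(enc: Enc, input: String): String = enc match {
    case _: OptionEnc => defaultSerializer(enc, input)
    case _ => frameworkSerializer(enc.dataType, input).getOrElse(defaultSerializer(enc, input))
  }
}
```

Under the old order, `OptionEnc(LeafNanos(9))` is serialized as though it were the bare leaf, so the `unwrapOption` step is lost. Matching the wrapper first restores that step, and leaf encoders still reach the framework.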
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
index c507af890047..a4acc6e20d70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType}
import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
-import org.apache.spark.unsafe.types.{BinaryView, CalendarInterval, TimestampNanosVal, UTF8String, VariantVal}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
+import org.apache.spark.unsafe.types.{BinaryView, CalendarInterval, UTF8String, VariantVal}
/**
* :: DeveloperApi ::
@@ -107,8 +107,6 @@ object EncoderUtils {
case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType]
case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType]
case _: TimeType => classOf[PhysicalLongType.InternalType]
- case _: TimestampNTZNanosType => classOf[TimestampNanosVal]
- case _: TimestampLTZNanosType => classOf[TimestampNanosVal]
case _: StringType => classOf[UTF8String]
case _: StructType => classOf[InternalRow]
case _: ArrayType => classOf[ArrayData]
@@ -119,16 +117,16 @@ object EncoderUtils {
}
}
- @scala.annotation.tailrec
- def javaBoxedType(dt: DataType): Class[_] = dt match {
+ def javaBoxedType(dt: DataType): Class[_] =
+ TypeOps(dt).map(_.getBoxedJavaClass).getOrElse(javaBoxedTypeDefault(dt))
+
+ private def javaBoxedTypeDefault(dt: DataType): Class[_] = dt match {
case _: DecimalType => classOf[Decimal]
case _: DayTimeIntervalType => classOf[java.lang.Long]
case _: YearMonthIntervalType => classOf[java.lang.Integer]
case BinaryType => classOf[Array[Byte]]
case _: StringType => classOf[UTF8String]
case CalendarIntervalType => classOf[CalendarInterval]
- case _: TimestampNTZNanosType => classOf[TimestampNanosVal]
- case _: TimestampLTZNanosType => classOf[TimestampNanosVal]
case _: StructType => classOf[InternalRow]
case _: ArrayType => classOf[ArrayData]
case _: MapType => classOf[MapData]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
index a5b8d0857c99..88c3e181e645 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.TimestampNanosVal
/**
* A parent class for mutable container objects that are reused when the values are changed,
@@ -186,6 +187,21 @@ final class MutableAny extends MutableValue {
}
}
+final class MutableTimestampNanos extends MutableValue {
+ var value: TimestampNanosVal = _
+ override def boxed: Any = if (isNull) null else value
+ override def update(v: Any): Unit = {
+ isNull = false
+ value = v.asInstanceOf[TimestampNanosVal]
+ }
+ override def copy(): MutableTimestampNanos = {
+ val newCopy = new MutableTimestampNanos
+ newCopy.isNull = isNull
+ newCopy.value = value
+ newCopy
+ }
+}
+
/**
* A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
* based on the dataTypes of each column. The intent is to decrease garbage when modifying the
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala
index 04052dafb61a..1f157d6ac18a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, CharVarcharCodegenUtils, DateFormatter, FractionTimeFormatter, IntervalStringStyles, IntervalUtils, MapData, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
+import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.BinaryOutputStyle
import org.apache.spark.sql.types._
@@ -305,6 +306,13 @@ trait ToStringBase { self: UnaryExpression with TimeZoneAwareExpression =>
(c, evPrim) => code"$evPrim = UTF8String.fromString($c.toPlainString());"
case _: StringType =>
(c, evPrim) => code"$evPrim = $c;"
+ // Fractional-second (nanosecond) timestamp formatting is not implemented yet: there is no
+ // TimestampFormatter for the nanos timestamp types. The interpreted path raises this via the
+ // Types Framework (castToString -> TypeApiOps.format); the codegen path has no framework
+ // hook, so it raises the same user-facing error directly until a formatter lands
+ // (SPARK-57207).
+ case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
+ throw DataTypeErrors.cannotConvertNanosTimestampToStringError(from)
case _ =>
(c, evPrim) => code"$evPrim = UTF8String.fromString(String.valueOf($c));"
}
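Here is a minimal sketch of why the codegen branch throws directly. The match in `castToStringCode` selects a source emitter, so throwing inside the match fails during code generation, before any Java source exists. The names `SqlType`, `NanosT`, and `ToStringSketch` are hypothetical. The plain `UnsupportedOperationException` is also a stand-in: it takes the place of `DataTypeErrors.cannotConvertNanosTimestampToStringError`.

```scala
// Toy model of choosing a cast-to-string code emitter; names are hypothetical.
sealed trait SqlType
case object StringT extends SqlType
final case class NanosT(precision: Int) extends SqlType

object ToStringSketch {
  // Returns a function that emits Java source for (inputVar, resultVar).
  // For unsupported types it throws while selecting the emitter, so the error
  // surfaces during code generation rather than from the generated code.
  def castToStringCode(from: SqlType): (String, String) => String = from match {
    case StringT => (c, ev) => s"$ev = $c;"
    case _: NanosT =>
      throw new UnsupportedOperationException(s"Cannot convert $from to STRING")
  }
}
```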
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 43f7b041d53f..6f8eec249662 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -2031,8 +2031,6 @@ object CodeGenerator extends Logging {
case _: GeographyType | _: GeometryType => classOf[BinaryView]
case _: StringType => classOf[UTF8String]
case CalendarIntervalType => classOf[CalendarInterval]
- case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
- classOf[org.apache.spark.unsafe.types.TimestampNanosVal]
case _: StructType => classOf[InternalRow]
case _: ArrayType => classOf[ArrayData]
case _: MapType => classOf[MapData]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 8c2a34240397..df703008a05b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -204,8 +204,6 @@ object Literal {
case DateType => create(0, DateType)
case TimestampType => create(0L, TimestampType)
case TimestampNTZType => create(0L, TimestampNTZType)
- case t: TimestampNTZNanosType => create(TimestampNanosVal.ZERO, t)
- case t: TimestampLTZNanosType => create(TimestampNanosVal.ZERO, t)
case t: TimeType => create(0L, t)
case it: DayTimeIntervalType => create(0L, it)
case it: YearMonthIntervalType => create(0, it)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
index 020958f34dee..f682d30235f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, Int
import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory, MapData, SQLOrderingUtil}
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, GeographyType, GeometryType, IntegerExactNumeric, IntegerType, IntegralType, LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, ShortType, StringType, StructField, StructT [...]
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, GeographyType, GeometryType, IntegerExactNumeric, IntegerType, IntegralType, LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, ShortType, StringType, StructField, StructT [...]
import org.apache.spark.unsafe.types.{BinaryView, ByteArray, TimestampNanosVal, UTF8String, VariantVal}
import org.apache.spark.util.ArrayImplicits._
@@ -55,8 +55,6 @@ object PhysicalDataType {
case TimestampType => PhysicalLongType
case TimestampNTZType => PhysicalLongType
case CalendarIntervalType => PhysicalCalendarIntervalType
- case _: TimestampNTZNanosType => PhysicalTimestampNTZNanosType
- case _: TimestampLTZNanosType => PhysicalTimestampLTZNanosType
case DayTimeIntervalType(_, _) => PhysicalLongType
case YearMonthIntervalType(_, _) => PhysicalIntegerType
case DateType => PhysicalIntegerType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala
index 0cf152079c52..217a69ba4f9d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala
@@ -62,6 +62,8 @@ case class TimeTypeOps(override val t: TimeType) extends TimeTypeApiOps(t) with
override def getJavaClass: Class[_] = classOf[Long]
+ override def getBoxedJavaClass: Class[_] = classOf[java.lang.Long]
+
override def getMutableValue: MutableValue = new MutableLong
override def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOps.scala
new file mode 100644
index 000000000000..9957367ed039
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOps.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.types.ops
+
+import java.time.{Instant, LocalDateTime}
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, MutableTimestampNanos, MutableValue}
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
+import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalTimestampLTZNanosType, PhysicalTimestampNTZNanosType}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{ObjectType, TimestampLTZNanosType, TimestampNTZNanosType}
+import org.apache.spark.sql.types.ops.{TimestampLTZNanosTypeApiOps, TimestampNTZNanosTypeApiOps}
+import org.apache.spark.unsafe.types.TimestampNanosVal
+
+/**
+ * Server-side (catalyst) operations shared by the nanosecond timestamp types
+ * (TimestampNTZNanosType and TimestampLTZNanosType).
+ *
+ * Internal values are [[TimestampNanosVal]] (epoch micros + nanos within the micro), stored in
+ * [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] via a 16-byte variable-length payload;
+ * see [[org.apache.spark.sql.catalyst.expressions.TimestampNanosRowValues]].
+ *
+ * The Types Framework is the sole integration path for the nanosecond timestamp types: physical
+ * representation, literals, row accessors, codegen class selection, external java.time conversion,
+ * and serializer/deserializer expression building all flow through these Ops. When the framework is
+ * disabled the types are unsupported (the legacy fallbacks no longer special-case them). External
+ * java.time conversion uses the helpers added by SPARK-57033: TimestampNTZNanosType maps to
+ * java.time.LocalDateTime and TimestampLTZNanosType to java.time.Instant, with sub-micro digits
+ * truncated to the column precision. Concrete subclasses supply the NTZ/LTZ-specific physical type,
+ * row accessors, conversions, and serializer/deserializer expressions.
+ *
+ * @since 4.3.0
+ */
+trait TimestampNanosTypeOps extends TypeOps {
+
+ // ==================== Physical Type Representation ====================
+
+ override def getJavaClass: Class[_] = classOf[TimestampNanosVal]
+
+ override def getMutableValue: MutableValue = new MutableTimestampNanos
+
+ // ==================== Literal Creation ====================
+
+ override def getDefaultLiteral: Literal = Literal.create(TimestampNanosVal.ZERO, dataType)
+
+ override def getJavaLiteral(v: Any): String = {
+ val tn = v.asInstanceOf[TimestampNanosVal]
+ "org.apache.spark.unsafe.types.TimestampNanosVal.fromParts(" +
+ s"${tn.epochMicros}L, (short) ${tn.nanosWithinMicro})"
+ }
+
+ // ==================== External Type Conversion ====================
+
+ // Raises the same error as the legacy CatalystTypeConverters (SPARK-57033) when an external
+ // value of an unexpected type is supplied for a nanosecond timestamp column.
+ protected def invalidExternalValue(other: Any): Nothing =
+ throw new SparkIllegalArgumentException(
+ errorClass = "INVALID_EXTERNAL_VALUE",
+ messageParameters = Map(
+ "other" -> other.toString,
+ "otherClass" -> other.getClass.getCanonicalName,
+ "dataType" -> dataType.sql))
+}
+
+/**
+ * Server-side operations for [[TimestampNTZNanosType]].
+ *
+ * @param t
+ * The TimestampNTZNanosType with precision information
+ * @since 4.3.0
+ */
+case class TimestampNTZNanosTypeOps(override val t: TimestampNTZNanosType)
+ extends TimestampNTZNanosTypeApiOps(t) with TimestampNanosTypeOps {
+
+ override def getPhysicalType: PhysicalDataType = PhysicalTimestampNTZNanosType
+
+ override def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit =
+ (input, v) => input.setTimestampNTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal])
+
+ override def toCatalystImpl(scalaValue: Any): Any = scalaValue match {
+ case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l, t.precision)
+ case other => invalidExternalValue(other)
+ }
+
+ override def toScala(catalystValue: Any): Any =
+ if (catalystValue == null) null
+ else DateTimeUtils.timestampNanosToLocalDateTime(catalystValue.asInstanceOf[TimestampNanosVal])
+
+ override def toScalaImpl(row: InternalRow, column: Int): Any =
+ DateTimeUtils.timestampNanosToLocalDateTime(row.getTimestampNTZNanos(column))
+
+ override def createSerializer(input: Expression): Option[Expression] =
+ Some(StaticInvoke(
+ DateTimeUtils.getClass,
+ t,
+ "localDateTimeToTimestampNanos",
+ input :: Literal(t.precision) :: Nil,
+ returnNullable = false))
+
+ override def createDeserializer(path: Expression): Option[Expression] =
+ Some(StaticInvoke(
+ DateTimeUtils.getClass,
+ ObjectType(classOf[LocalDateTime]),
+ "timestampNanosToLocalDateTime",
+ path :: Nil,
+ returnNullable = false))
+}
+
+/**
+ * Server-side operations for [[TimestampLTZNanosType]].
+ *
+ * @param t
+ * The TimestampLTZNanosType with precision information
+ * @since 4.3.0
+ */
+case class TimestampLTZNanosTypeOps(override val t: TimestampLTZNanosType)
+ extends TimestampLTZNanosTypeApiOps(t) with TimestampNanosTypeOps {
+
+ override def getPhysicalType: PhysicalDataType = PhysicalTimestampLTZNanosType
+
+ override def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit =
+ (input, v) => input.setTimestampLTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal])
+
+ override def toCatalystImpl(scalaValue: Any): Any = scalaValue match {
+ case i: Instant => DateTimeUtils.instantToTimestampNanos(i, t.precision)
+ case other => invalidExternalValue(other)
+ }
+
+ override def toScala(catalystValue: Any): Any =
+ if (catalystValue == null) null
+ else DateTimeUtils.timestampNanosToInstant(catalystValue.asInstanceOf[TimestampNanosVal])
+
+ override def toScalaImpl(row: InternalRow, column: Int): Any =
+ DateTimeUtils.timestampNanosToInstant(row.getTimestampLTZNanos(column))
+
+ override def createSerializer(input: Expression): Option[Expression] =
+ Some(StaticInvoke(
+ DateTimeUtils.getClass,
+ t,
+ "instantToTimestampNanos",
+ input :: Literal(t.precision) :: Nil,
+ returnNullable = false))
+
+ override def createDeserializer(path: Expression): Option[Expression] =
+ Some(StaticInvoke(
+ DateTimeUtils.getClass,
+ ObjectType(classOf[Instant]),
+ "timestampNanosToInstant",
+ path :: Nil,
+ returnNullable = false))
+}
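The Scaladoc above describes the internal value as epoch micros plus nanos within the micro, with sub-micro digits truncated to the column precision. The plain `java.time` sketch below illustrates that split. `NanosParts` and `NanosPartsSketch` are hypothetical stand-ins for `TimestampNanosVal` and the SPARK-57033 `DateTimeUtils` helpers. The real helpers may handle truncation and overflow differently.

```scala
import java.time.Instant

// Hypothetical stand-in for TimestampNanosVal: epoch microseconds plus 0..999
// extra nanoseconds inside that microsecond.
final case class NanosParts(epochMicros: Long, nanosWithinMicro: Short)

object NanosPartsSketch {
  // 10^(9 - p) for p = 9, 8, 7: the smallest nanosecond step each precision keeps.
  private val stepForPrecision = Map(9 -> 1L, 8 -> 10L, 7 -> 100L)

  def instantToParts(i: Instant, precision: Int): NanosParts = {
    val step = stepForPrecision.getOrElse(precision,
      throw new IllegalArgumentException(s"precision must be in [7, 9], got $precision"))
    // Instant keeps floored epoch seconds and a nano-of-second in [0, 999999999],
    // so dropping low digits always truncates toward the past.
    val nanoOfSecond = i.getNano.toLong
    val kept = nanoOfSecond - nanoOfSecond % step
    val micros = Math.addExact(Math.multiplyExact(i.getEpochSecond, 1000000L), kept / 1000L)
    NanosParts(micros, (kept % 1000L).toShort)
  }

  def partsToInstant(p: NanosParts): Instant = {
    // Floor division keeps pre-1970 values consistent with the split above.
    val seconds = Math.floorDiv(p.epochMicros, 1000000L)
    val microOfSecond = Math.floorMod(p.epochMicros, 1000000L)
    Instant.ofEpochSecond(seconds, microOfSecond * 1000L + p.nanosWithinMicro)
  }
}
```

`Instant` always stores a non-negative nano-of-second, so pre-1970 values still split cleanly. For example, `1969-12-31T23:59:59.999999999Z` becomes epoch micros `-1` plus `999` nanos. That has the same shape as the negative sample `TimestampNanosVal.fromParts(-98765L, 999.toShort)` used in `TimestampNanosTypeOpsSuite`.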
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala
index 7240f0533aa3..2313c22f1f51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, MutableVa
import org.apache.spark.sql.catalyst.types.PhysicalDataType
import org.apache.spark.sql.execution.arrow.ArrowFieldWriter
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, TimeType}
+import org.apache.spark.sql.types.{DataType, TimestampLTZNanosType, TimestampNTZNanosType, TimeType}
/**
* Server-side (catalyst) type operations for the Types Framework.
@@ -82,6 +82,18 @@ trait TypeOps extends Serializable {
*/
def getJavaClass: Class[_]
+ /**
+ * Returns the boxed Java class used in code generation where a nullable, reference-typed value
+ * is required (e.g., array/map elements, external-value validation). Defaults to
+ * [[getJavaClass]], which is correct for object-backed types; primitive-backed types must
+ * override it to return the corresponding boxed class (e.g., classOf[java.lang.Long] for a
+ * Long-backed type).
+ *
+ * @return
+ * boxed Java class
+ */
+ def getBoxedJavaClass: Class[_] = getJavaClass
+
/**
* Returns a MutableValue instance for use in SpecificInternalRow.
*
@@ -231,6 +243,8 @@ object TypeOps {
if (!SQLConf.get.typesFrameworkEnabled) return None
dt match {
case tt: TimeType => Some(TimeTypeOps(tt))
+ case t: TimestampNTZNanosType => Some(TimestampNTZNanosTypeOps(t))
+ case t: TimestampLTZNanosType => Some(TimestampLTZNanosTypeOps(t))
// Add new types here - single registration point
case _ => None
}
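The new `getBoxedJavaClass` hook and the rewritten `EncoderUtils.javaBoxedType` follow one pattern with three parts:

- a default that is correct for object-backed types,
- an override for primitive-backed types,
- a framework-first lookup that falls back to the legacy match.

The sketch below models that pattern without Spark. `OpsSketch`, `NanosValSketch`, and the string-keyed registry are illustrative assumptions, not the real `TypeOps` API.

```scala
// Minimal model of a boxed-class hook with a default, plus a framework-first lookup
// that falls back to a legacy match. Names are illustrative, not Spark's classes.
final case class NanosValSketch(epochMicros: Long, nanosWithinMicro: Short)

trait OpsSketch {
  def javaClass: Class[_]
  // Default: correct when javaClass is already a reference (object-backed) type.
  def boxedJavaClass: Class[_] = javaClass
}

// Object-backed, like the nanos ops: inherits the default.
object NanosOpsSketch extends OpsSketch {
  def javaClass: Class[_] = classOf[NanosValSketch]
}

// Primitive-backed, like TimeTypeOps in the diff: must override.
object TimeOpsSketch extends OpsSketch {
  def javaClass: Class[_] = classOf[Long] // the primitive `long` class
  override def boxedJavaClass: Class[_] = classOf[java.lang.Long]
}

object BoxedLookupSketch {
  // Hypothetical registry; None means the type is not handled by the framework.
  private def ops(typeName: String): Option[OpsSketch] = typeName match {
    case "time" => Some(TimeOpsSketch)
    case "timestamp_nanos" => Some(NanosOpsSketch)
    case _ => None
  }

  // Stand-in for the legacy pattern match (javaBoxedTypeDefault in the diff).
  private def legacyBoxed(typeName: String): Class[_] = typeName match {
    case "decimal" => classOf[java.math.BigDecimal]
    case _ => classOf[AnyRef]
  }

  // Same shape as the new EncoderUtils.javaBoxedType: framework first, legacy second.
  def javaBoxedType(typeName: String): Class[_] =
    ops(typeName).map(_.boxedJavaClass).getOrElse(legacyBoxed(typeName))
}
```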
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a21653a011b3..30db7bbbd59d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -637,7 +637,8 @@ object SQLConf {
val TYPES_FRAMEWORK_ENABLED =
buildConf("spark.sql.types.framework.enabled")
.internal()
- .doc("When true, use the Types Framework for supported types (currently TimeType). " +
+ .doc("When true, use the Types Framework for supported types (currently TimeType and the " +
+ "nanosecond timestamp types TimestampNTZNanosType and TimestampLTZNanosType). " +
"The framework centralizes type-specific operations in Ops classes instead of " +
"scattered pattern matching. When false, use legacy scattered implementation.")
.version("4.2.0")
@@ -655,11 +656,19 @@ object SQLConf {
"Unparameterized TIMESTAMP, TIMESTAMP_NTZ, and TIMESTAMP_LTZ remain microsecond " +
"types. Enabling this flag does not guarantee full SQL support: casts, Parquet read, " +
"typed literals, and other operations may still fail until their respective features " +
- "are implemented.")
+ "are implemented. The nanosecond timestamp types are implemented solely through the " +
+ "Types Framework, so this flag can only be set to true when " +
+ s"${TYPES_FRAMEWORK_ENABLED.key} is also true.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
- .createWithDefault(Utils.isTesting)
+ .checkValue(
+ enabled => !enabled || SQLConf.get.typesFrameworkEnabled,
+ "REQUIREMENT",
+ _ => Map("confRequirement" ->
+ (s"'${TYPES_FRAMEWORK_ENABLED.key}' must be true to enable the nanosecond " +
+ "timestamp types.")))
+ .createWithDefaultFunction(() => Utils.isTesting)
val EXTENDED_EXPLAIN_PROVIDERS =
buildConf("spark.sql.extendedExplainProviders")
.doc("A comma-separated list of classes that implement the" +
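The `checkValue` added to the nanos flag makes that flag depend on `spark.sql.types.framework.enabled`. Below is a minimal sketch of the dependent-flag check, built on a toy in-memory config. The keys `framework.enabled` and `nanosTypes.enabled` are hypothetical placeholders. The real check runs through Spark's config builder, not through a class like this one. The sketch follows the validator in the diff, which reads the current session value when the nanos flag is written, so the framework flag has to be enabled first.

```scala
import scala.collection.mutable

// Toy session config with a dependent boolean flag. The key names are
// hypothetical placeholders, not the real Spark configuration keys.
final class ConfSketch {
  private val values = mutable.Map("framework.enabled" -> false, "nanosTypes.enabled" -> false)

  def get(key: String): Boolean = values(key)

  def set(key: String, value: Boolean): Unit = {
    // Mirrors the diff's checkValue: enabling the dependent flag requires the
    // framework flag to already be true in the current session.
    if (key == "nanosTypes.enabled" && value && !values("framework.enabled")) {
      throw new IllegalArgumentException(
        "'framework.enabled' must be true to enable the nanosecond timestamp types.")
    }
    values(key) = value
  }
}
```

Disabling the nanos flag is always allowed. The check only fires when the flag is being set to `true`, which matches `enabled => !enabled || ...` in the diff.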
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala
index d85350504f7f..02c967c0ffec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.GenericArrayData
@@ -184,6 +184,21 @@ class TimestampNanosRowSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Literal.create(null, TimestampLTZNanosType(7)), null)
}
+ // Fractional-second formatting is not implemented yet, so CAST(nanos AS STRING) raises the
+ // user-facing UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING error. Both the interpreted
+ // (ToStringBase.castToString -> TypeApiOps.format) and codegen (ToStringBase.castToStringCode)
+ // paths must fail the same way (SPARK-57207).
+ test("CAST nanos timestamp to STRING raises an unsupported-feature error in both eval modes") {
+ Seq(
+ Literal.create(ntzValue, TimestampNTZNanosType(9)),
+ Literal.create(ltzValue, TimestampLTZNanosType(7))).foreach { lit =>
+ checkErrorInExpression[SparkUnsupportedOperationException](
+ Cast(lit, StringType),
+ condition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING",
+ parameters = Map("dataType" -> ("\"" + lit.dataType.sql + "\"")))
+ }
+ }
+
testBothCodegenAndInterpreted("UnsafeRow handles extreme epoch micros for nanos") {
val fieldTypes: Array[DataType] = Array(TimestampNTZNanosType(9))
val converter = UnsafeProjection.create(fieldTypes)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOpsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOpsSuite.scala
new file mode 100644
index 000000000000..6154dff97be5
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOpsSuite.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.types.ops
+
+import java.time.{Instant, LocalDateTime}
+
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkUnsupportedOperationException}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, ExpressionEncoder}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{InstantNanosEncoder, LocalDateTimeNanosEncoder, OptionEncoder}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal, MutableTimestampNanos, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalTimestampLTZNanosType, PhysicalTimestampNTZNanosType}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, TimestampLTZNanosType, TimestampNTZNanosType}
+import org.apache.spark.sql.types.ops.TypeApiOps
+import org.apache.spark.unsafe.types.TimestampNanosVal
+
+/**
+ * Tests for the Types Framework wiring of the nanosecond timestamp types (SPARK-57207).
+ *
+ * Verifies that TimestampNTZNanosType and TimestampLTZNanosType route physical representation,
+ * literals, row accessors, mutable values, codegen class selection, conversions, and encoders
+ * through TypeOps/TypeApiOps. The Types Framework is the sole integration path for these types, so
+ * the suite runs with spark.sql.types.framework.enabled = true (the default under tests).
+ */
+class TimestampNanosTypeOpsSuite extends SparkFunSuite with SQLHelper {
+
+ private val precisions = Seq(7, 8, 9)
+
+ private val ntzVal = TimestampNanosVal.fromParts(1234567890123L, 42.toShort)
+ private val ltzVal = TimestampNanosVal.fromParts(-98765L, 999.toShort)
+
+ // (dataType, expected physical type, sample value) tuples covering NTZ and LTZ for p in {7,8,9}.
+ private def ntzCases: Seq[(DataType, PhysicalDataType, TimestampNanosVal)] =
+ precisions.map(p => (TimestampNTZNanosType(p), PhysicalTimestampNTZNanosType, ntzVal))
+
+ private def ltzCases: Seq[(DataType, PhysicalDataType, TimestampNanosVal)] =
+ precisions.map(p => (TimestampLTZNanosType(p), PhysicalTimestampLTZNanosType, ltzVal))
+
+ private def allCases: Seq[(DataType, PhysicalDataType, TimestampNanosVal)] = ntzCases ++ ltzCases
+
+ private def checkPhysicalAndLiteralAndCodegen(
+ dt: DataType,
+ physical: PhysicalDataType): Unit = {
+ assert(PhysicalDataType(dt) === physical, s"physical type for $dt")
+ val default = Literal.default(dt)
+ assert(default.dataType === dt, s"default literal type for $dt")
+ assert(default.value === TimestampNanosVal.ZERO, s"default literal value for $dt")
+ assert(CodeGenerator.javaClass(dt) === classOf[TimestampNanosVal], s"javaClass for $dt")
+ }
+
+ private def checkRowRoundtrip(dt: DataType, value: TimestampNanosVal): Unit = {
+ val accessor = InternalRow.getAccessor(dt)
+ val writer = InternalRow.getWriter(0, dt)
+
+ val genericRow = new GenericInternalRow(Array[Any](null, null))
+ writer(genericRow, value)
+ assert(accessor(genericRow, 0) === value, s"GenericInternalRow roundtrip for $dt")
+ assert(accessor(new GenericInternalRow(Array[Any](null, null)), 0) === null)
+
+ val specificRow = new SpecificInternalRow(Seq(dt))
+ specificRow.update(0, value)
+ assert(accessor(specificRow, 0) === value, s"SpecificInternalRow roundtrip for $dt")
+ specificRow.update(0, null)
+ assert(accessor(specificRow, 0) === null)
+ }
+
+ test("TypeOps and TypeApiOps are registered when the framework is enabled") {
+ allCases.foreach { case (dt, _, _) =>
+ assert(TypeOps(dt).isDefined, s"TypeOps should be defined for $dt")
+ assert(TypeApiOps(dt).isDefined, s"TypeApiOps should be defined for $dt")
+ }
+ }
+
+ test("physical type, default literal, and codegen class (framework enabled)") {
+ allCases.foreach { case (dt, physical, _) =>
+ checkPhysicalAndLiteralAndCodegen(dt, physical)
+ }
+ }
+
+ test("InternalRow and SpecificInternalRow roundtrip (framework enabled)") {
+ allCases.foreach { case (dt, _, value) =>
+ checkRowRoundtrip(dt, value)
+ }
+ }
+
+ test("SpecificInternalRow uses a dedicated MutableTimestampNanos holder") {
+ allCases.foreach { case (dt, _, _) =>
+ val row = new SpecificInternalRow(Seq(dt))
+ assert(row.values(0).isInstanceOf[MutableTimestampNanos],
+ s"expected MutableTimestampNanos for $dt")
+ }
+ }
+
+ test("getEncoder returns the SPARK-57033 nanos encoder (matches the legacy RowEncoder path)") {
+ precisions.foreach { p =>
+ assert(TypeApiOps(TimestampNTZNanosType(p)).get.getEncoder === LocalDateTimeNanosEncoder(p))
+ assert(TypeApiOps(TimestampLTZNanosType(p)).get.getEncoder === InstantNanosEncoder(p))
+ }
+ }
+
+ test("getEncoder honors the timestampNanosTypes.enabled gate") {
+ withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "false") {
+ allCases.foreach { case (dt, _, _) =>
+ val e = intercept[org.apache.spark.SparkException](TypeApiOps(dt).get.getEncoder)
+ assert(e.getCondition === "FEATURE_NOT_ENABLED")
+ }
+ }
+ }
+
+ // A sample with sub-micro digits so precision truncation is exercised.
+ private val sampleLocalDateTime = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+ private val sampleInstant = Instant.parse("2019-02-26T16:56:00.123456789Z")
+
+ private def externalValue(dt: DataType): Any = dt match {
+ case _: TimestampNTZNanosType => sampleLocalDateTime
+ case _: TimestampLTZNanosType => sampleInstant
+ }
+
+ test("CatalystTypeConverters convert java.time values (matches the legacy converter path)") {
+ allCases.foreach { case (dt, _, _) =>
+ val external = externalValue(dt)
+ val expectedCatalyst = dt match {
+ case t: TimestampNTZNanosType =>
+ DateTimeUtils.localDateTimeToTimestampNanos(sampleLocalDateTime, t.precision)
+ case t: TimestampLTZNanosType =>
+ DateTimeUtils.instantToTimestampNanos(sampleInstant, t.precision)
+ }
+
+ // toScala over the truncated catalyst value, i.e. what a lossless roundtrip yields.
+ val expectedScala = dt match {
+ case _: TimestampNTZNanosType =>
+ DateTimeUtils.timestampNanosToLocalDateTime(expectedCatalyst)
+ case _: TimestampLTZNanosType =>
+ DateTimeUtils.timestampNanosToInstant(expectedCatalyst)
+ }
+
+ val catalyst =
CatalystTypeConverters.createToCatalystConverter(dt)(external)
+ assert(catalyst === expectedCatalyst, s"toCatalyst for $dt")
+ assert(catalyst.isInstanceOf[TimestampNanosVal], s"toCatalyst must not
be identity for $dt")
+
+ val scala = CatalystTypeConverters.createToScalaConverter(dt)(catalyst)
+ assert(scala === expectedScala, s"toScala roundtrip for $dt")
+ }
+ }
+
+  test("format and toSQLValue raise UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING") {
+    allCases.foreach { case (dt, _, value) =>
+      val ops = TypeApiOps(dt).get
+      Seq[() => Any](() => ops.format(value), () => ops.toSQLValue(value)).foreach { call =>
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException](call()),
+          condition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING",
+          parameters = Map("dataType" -> ("\"" + dt.sql + "\"")))
+      }
+    }
+  }
+
+  private def checkOptionRoundtrip[T](
+      enc: AgnosticEncoder[Option[T]],
+      values: Seq[Option[T]]): Unit = {
+    val encoder = ExpressionEncoder(enc).resolveAndBind()
+    val toRow = encoder.createSerializer()
+    val fromRow = encoder.createDeserializer()
+    values.foreach(v => assert(fromRow(toRow(v)) === v, s"roundtrip for $enc with $v"))
+  }
+
+  test("Option-wrapped nanos encoders round-trip (wrapper unwrapped before framework dispatch)") {
+    // Regression for the framework serde dispatch keying off enc.dataType: OptionEncoder proxies
+    // dataType to the wrapped nanos leaf, so the wrapper must be handled (UnwrapOption/WrapOption)
+    // before the TypeOps leaf dispatch. Precision 9 keeps the roundtrip lossless (SPARK-57207).
+    checkOptionRoundtrip(
+      OptionEncoder(LocalDateTimeNanosEncoder(9)),
+      Seq(Some(LocalDateTime.parse("2019-02-26T16:56:00.123456789")), None))
+    checkOptionRoundtrip(
+      OptionEncoder(InstantNanosEncoder(9)),
+      Seq(Some(Instant.parse("2019-02-26T16:56:00.123456789Z")), None))
+  }
+
+  test("framework disabled leaves the nanos types unsupported (no legacy fallback)") {
+    withSQLConf(SQLConf.TYPES_FRAMEWORK_ENABLED.key -> "false") {
+      allCases.foreach { case (dt, _, _) =>
+        assert(TypeOps(dt).isEmpty, s"TypeOps should be empty for $dt when disabled")
+        assert(TypeApiOps(dt).isEmpty, s"TypeApiOps should be empty for $dt when disabled")
+      }
+    }
+  }
+
+  test("enabling the nanos types requires the Types Framework to be enabled") {
+    withSQLConf(SQLConf.TYPES_FRAMEWORK_ENABLED.key -> "false") {
+      checkError(
+        exception = intercept[SparkIllegalArgumentException] {
+          SQLConf.get.setConfString(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key, "true")
+        },
+        condition = "INVALID_CONF_VALUE.REQUIREMENT",
+        parameters = Map(
+          "confName" -> SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key,
+          "confValue" -> "true",
+          "confRequirement" ->
+            (s"'${SQLConf.TYPES_FRAMEWORK_ENABLED.key}' must be true to enable the nanosecond " +
+              "timestamp types.")))
+    }
+  }
+}
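The CatalystTypeConverters test depends on the sample values carrying sub-microsecond digits that are truncated to the column precision p (7 to 9). The following minimal Scala sketch shows what that truncation means for the java.time inputs. `NanosTruncation`, `truncateNano` and `truncate` are hypothetical names and are not Spark APIs. The sketch assumes that truncation simply drops the low-order digits of the nano-of-second field. It is not Spark's `DateTimeUtils` implementation and says nothing about how Spark handles pre-epoch instants.

```scala
import java.time.{Instant, LocalDateTime}

// Hypothetical helper for illustration only; not part of Spark.
// Assumption: precision p keeps the first p fractional-second digits and
// drops the rest (truncation, not rounding).
object NanosTruncation {
  // Size of one precision-p unit in nanoseconds: 10^(9 - p).
  private def unitNanos(p: Int): Int = {
    require(p >= 0 && p <= 9, s"precision must be in [0, 9], got $p")
    List.fill(9 - p)(10).product
  }

  // java.time keeps nano-of-second in [0, 999999999], so `%` only strips
  // the trailing digits and never touches the seconds field.
  def truncateNano(nano: Int, p: Int): Int = nano - nano % unitNanos(p)

  def truncate(ldt: LocalDateTime, p: Int): LocalDateTime =
    ldt.withNano(truncateNano(ldt.getNano, p))

  def truncate(instant: Instant, p: Int): Instant =
    Instant.ofEpochSecond(instant.getEpochSecond, truncateNano(instant.getNano, p).toLong)
}
```

For example, truncating `2019-02-26T16:56:00.123456789` to p = 7 gives `2019-02-26T16:56:00.1234567`. With p = 9 the value is unchanged. This matches why the Option round-trip test uses precision 9 to stay lossless.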