This is an automated email from the ASF dual-hosted git repository.

MaxGekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 49153409945c [SPARK-57207][SQL] Support nanosecond timestamp types in the Types Framework
49153409945c is described below

commit 49153409945c5ebe2cd8894fb2d1de10bdcf3497
Author: Maxim Gekk <[email protected]>
AuthorDate: Fri Jun 5 03:19:46 2026 +0200

    [SPARK-57207][SQL] Support nanosecond timestamp types in the Types Framework
    
    ### What changes were proposed in this pull request?
    
    This PR wires `TimestampNTZNanosType(p)` and `TimestampLTZNanosType(p)` (p in [7, 9]) through the Spark SQL Types Framework (SPARK-53504), so that all type-specific behavior for the nanosecond timestamp types is centralized behind `TypeOps` / `TypeApiOps`. The nanos types are now supported **only** through the framework: the scattered legacy dispatch for them is removed.
    
    Concretely:
    - Add `TimestampNanosTypeOps` (catalyst) with `TimestampNTZNanosTypeOps` / `TimestampLTZNanosTypeOps`, registered in `TypeOps.apply()`. Overrides: `getPhysicalType`, `getJavaClass`, `getBoxedJavaClass`, `getRowWriter`, `getDefaultLiteral`, `getJavaLiteral`, `getMutableValue`, `toCatalystImpl`, `toScala`/`toScalaImpl`, `createSerializer`, `createDeserializer`.
    - Add a `getBoxedJavaClass` hook to the `TypeOps` base (the boxed Java class used in codegen). The `createSerializer` / `createDeserializer` hooks already exist on the base trait (used by `TimeTypeOps`); the nanos ops above only override them.
    - Add `TimestampNanosTypeApiOps` (sql/api) with NTZ/LTZ subclasses, registered in `TypeApiOps.apply()`. `getEncoder` returns the SPARK-57033 leaves (`LocalDateTimeNanosEncoder(p)` / `InstantNanosEncoder(p)`), gated by `DataTypeErrors.checkTimestampNanosTypesEnabled()`.
    - Remove the nanos branches from the legacy code paths now handled by the framework: `SerializerBuildHelper`, `DeserializerBuildHelper`, `CatalystTypeConverters`, `EncoderUtils`, `CodeGenerator`, `Literal`, and `InternalRow`. In `SerializerBuildHelper` / `DeserializerBuildHelper`, `OptionEncoder` / `TransformingEncoder` are unwrapped before the framework leaf dispatch, since those wrapper encoders proxy `dataType` to the wrapped encoder.
    - Add `MutableTimestampNanos` to `SpecificInternalRow` to avoid the `MutableAny` fallback.
    - Add a `checkValue` on `spark.sql.timestampNanosTypes.enabled` requiring `spark.sql.types.framework.enabled=true`, so the types cannot be enabled outside the framework.
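    
    The wrapper-encoder point in the list above can be illustrated with a small toy model. This is only a sketch: `Enc`, `Leaf`, `Opt`, the string "plans", and the `timestamp_ntz(9)` label are hypothetical stand-ins, not Spark's `AgnosticEncoder` or `Expression` classes. It shows why a dispatch keyed on `dataType` alone would fire on the wrapper and drop the Option handling, and how routing wrappers through the default path first avoids that.
    
    ```scala
    // Toy model only: every name below is a hypothetical stand-in, not a Spark API.
    object WrapperDispatchSketch {
      sealed trait Enc { def dataType: String }
      final case class Leaf(dataType: String) extends Enc
      // Like the wrapper encoders described above, Opt reports the wrapped encoder's dataType.
      final case class Opt(inner: Enc) extends Enc { def dataType: String = inner.dataType }
    
      // Stand-in for the framework registry: it only knows one nanos leaf type.
      def frameworkSerializer(dataType: String): Option[String] =
        if (dataType == "timestamp_ntz(9)") Some("nanosSerde") else None
    
      // Stand-in for the default path: it handles the wrapper, then recurses into the leaf.
      def defaultSerializer(enc: Enc): String = enc match {
        case Opt(inner) => s"unwrapOption(${serialize(inner)})"
        case Leaf(dt) => s"legacy($dt)"
      }
    
      // Dispatch keyed on dataType alone: matches the wrapper and loses the Option step.
      def naiveSerialize(enc: Enc): String =
        frameworkSerializer(enc.dataType).getOrElse(defaultSerializer(enc))
    
      // Wrappers go through the default path first; only leaves reach the framework lookup.
      def serialize(enc: Enc): String = enc match {
        case _: Opt => defaultSerializer(enc)
        case _ => frameworkSerializer(enc.dataType).getOrElse(defaultSerializer(enc))
      }
    
      def main(args: Array[String]): Unit = {
        val enc = Opt(Leaf("timestamp_ntz(9)"))
        println(naiveSerialize(enc)) // nanosSerde
        println(serialize(enc))      // unwrapOption(nanosSerde)
      }
    }
    ```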
    
    Fractional-second string formatting is not implemented yet (no `TimestampFormatter` for these types). Until it lands, converting a nanos value to a string (CAST to STRING, EXPLAIN/SHOW output, SQL-literal rendering) raises the new `UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING` error rather than silently truncating to microseconds. Both the interpreted path (`TimestampNanosTypeApiOps.format`) and the codegen path (`ToStringBase.castToStringCode`) raise the identical error, so the two  [...]
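    
    To make the "silently truncating to microseconds" risk concrete, here is a minimal sketch that uses only `java.time`, not Spark. The `TruncationSketch` object and its `truncateToMicros` helper are hypothetical names for illustration; they show what a microsecond-only rendering would drop from a nanosecond value without raising anything.
    
    ```scala
    import java.time.LocalDateTime
    import java.time.temporal.ChronoUnit
    
    // Hypothetical illustration, not Spark code.
    object TruncationSketch {
      // What a microsecond-only formatter would effectively keep of the value.
      def truncateToMicros(ts: LocalDateTime): LocalDateTime =
        ts.truncatedTo(ChronoUnit.MICROS)
    
      def main(args: Array[String]): Unit = {
        val nanos = LocalDateTime.parse("2026-06-05T03:19:46.123456789")
        val micros = truncateToMicros(nanos)
        println(nanos)  // 2026-06-05T03:19:46.123456789
        println(micros) // 2026-06-05T03:19:46.123456
        // The last three fractional digits are gone, and no error was raised.
        println(nanos.getNano - micros.getNano) // 789
      }
    }
    ```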
    
    Out of scope (follow-ups): string formatting/CAST-to-string, Connect proto, Arrow, PySpark conversion, Parquet/ColumnVector, and physical ordering/compare/hash.
    
    ### Why are the changes needed?
    
    The logical nanosecond timestamp types (SPARK-56876) and the physical row layer (SPARK-56981) already exist, but these types were wired only through scattered legacy dispatch. Centralizing the type-specific operations behind `TypeOps`, consistent with `TimeType`, is a prerequisite for the remaining nanosecond timestamp work and avoids the framework-on/off behavior divergence that the previous per-call-site handling produced.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The nanosecond timestamp types are a preview feature gated by `spark.sql.timestampNanosTypes.enabled` (and `spark.sql.types.framework.enabled`), both off by default in production. When these preview flags are enabled, converting a nanos timestamp to a string raises `UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING` because fractional-second formatting is not implemented yet.
    
    ### How was this patch tested?
    
    Added/updated tests:
    - `TimestampNanosTypeOpsSuite` (catalyst): `TypeOps`/`TypeApiOps` registration; `PhysicalDataType`, default `Literal`, and codegen Java class; `InternalRow`/`SpecificInternalRow` roundtrips incl. the dedicated `MutableTimestampNanos` holder; `getEncoder` returns the SPARK-57033 nanos encoders; `CatalystTypeConverters` `java.time` roundtrip; `format`/`toSQLValue` raise `UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING`; framework-disabled leaves the types unsupported (no legacy fallback); [...]
    - `TimestampNanosTypeOpsSuite` also covers Option-wrapped nanos encoder roundtrips (Some/None for NTZ and LTZ), verifying wrapper encoders are unwrapped before the framework serde dispatch.
    - `TimestampNanosRowSuite` (catalyst): CAST nanos -> STRING raises the unsupported-feature error in both interpreted and codegen modes; unsafe/generic row roundtrips; literal validation.
    
    ```
    build/sbt 'catalyst/testOnly *TimestampNanosTypeOpsSuite *TimestampNanosRowSuite'
    build/sbt 'core/testOnly org.apache.spark.SparkThrowableSuite'
    ```
    
    All tests pass. `catalyst` / `sql-api` scalastyle are clean.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor
    
    Closes #56266 from MaxGekk/nanos-types-typeops.
    
    Authored-by: Maxim Gekk <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |   5 +
 .../spark/sql/catalyst/encoders/RowEncoder.scala   |  12 +-
 .../apache/spark/sql/errors/DataTypeErrors.scala   |   6 +
 .../sql/types/ops/TimestampNanosTypeApiOps.scala   | 106 ++++++++++
 .../apache/spark/sql/types/ops/TypeApiOps.scala    |   4 +-
 .../sql/catalyst/CatalystTypeConverters.scala      |  48 +----
 .../sql/catalyst/DeserializerBuildHelper.scala     |  38 ++--
 .../apache/spark/sql/catalyst/InternalRow.scala    |   4 -
 .../spark/sql/catalyst/SerializerBuildHelper.scala |  40 ++--
 .../spark/sql/catalyst/encoders/EncoderUtils.scala |  14 +-
 .../catalyst/expressions/SpecificInternalRow.scala |  16 ++
 .../sql/catalyst/expressions/ToStringBase.scala    |   8 +
 .../expressions/codegen/CodeGenerator.scala        |   2 -
 .../spark/sql/catalyst/expressions/literals.scala  |   2 -
 .../sql/catalyst/types/PhysicalDataType.scala      |   4 +-
 .../spark/sql/catalyst/types/ops/TimeTypeOps.scala |   2 +
 .../catalyst/types/ops/TimestampNanosTypeOps.scala | 168 ++++++++++++++++
 .../spark/sql/catalyst/types/ops/TypeOps.scala     |  16 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  15 +-
 .../expressions/TimestampNanosRowSuite.scala       |  17 +-
 .../types/ops/TimestampNanosTypeOpsSuite.scala     | 222 +++++++++++++++++++++
 21 files changed, 613 insertions(+), 136 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 4fcca01fba44..0bb6ab09baf0 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -8365,6 +8365,11 @@
           "Temporary views cannot be created with the WITH SCHEMA clause. Recreate the temporary view when the underlying schema changes, or use a persisted view."
         ]
       },
+      "TIMESTAMP_NANOS_TO_STRING" : {
+        "message" : [
+          "Converting values of the nanosecond timestamp type <dataType> to a string."
+        ]
+      },
       "TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER" : {
         "message" : [
           "Parsing or formatting nanosecond-precision timestamps (TIMESTAMP_LTZ/TIMESTAMP_NTZ with precision in [7, 9]) under the LEGACY time parser policy. Set <config> to CORRECTED."
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 705d5d8f11b1..2ad579b2cca5 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -21,8 +21,8 @@ import scala.collection.mutable
 import scala.reflect.classTag
 
 import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, 
BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, 
BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, 
CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, 
GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, 
IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, 
LocalDateTimeNanosEncoder, LocalTimeEncoder [...]
-import org.apache.spark.sql.errors.{DataTypeErrors, DataTypeErrorsBase}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, 
BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, 
BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, 
CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, 
GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, 
JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, 
MapEncoder, NullEncoder, RowEncoder => Agnosti [...]
+import org.apache.spark.sql.errors.DataTypeErrorsBase
 import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.ops.TypeApiOps
@@ -99,14 +99,6 @@ object RowEncoder extends DataTypeErrorsBase {
       case TimestampType if SqlApiConf.get.datetimeJava8ApiEnabled => InstantEncoder(lenient)
       case TimestampType => TimestampEncoder(lenient)
       case TimestampNTZType => LocalDateTimeEncoder
-      // Nano timestamp types intentionally do not honor `lenient`: legacy `java.sql.Timestamp` /
-      // `java.sql.Date` external types are out of scope for nanosecond precision (SPARK-57033).
-      case t: TimestampNTZNanosType =>
-        DataTypeErrors.checkTimestampNanosTypesEnabled()
-        LocalDateTimeNanosEncoder(t.precision)
-      case t: TimestampLTZNanosType =>
-        DataTypeErrors.checkTimestampNanosTypesEnabled()
-        InstantNanosEncoder(t.precision)
       case DateType if SqlApiConf.get.datetimeJava8ApiEnabled => LocalDateEncoder(lenient)
       case DateType => DateEncoder(lenient)
       case _: TimeType => LocalTimeEncoder
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
index 955e242d4ab5..7a4b2a56d25e 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
@@ -299,4 +299,10 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase {
         "configValue" -> "true"),
       cause = null)
   }
+
+  def cannotConvertNanosTimestampToStringError(dataType: DataType): Throwable = {
+    new SparkUnsupportedOperationException(
+      errorClass = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING",
+      messageParameters = Map("dataType" -> toSQLType(dataType)))
+  }
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimestampNanosTypeApiOps.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimestampNanosTypeApiOps.scala
new file mode 100644
index 000000000000..aa3d52f750ef
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimestampNanosTypeApiOps.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types.ops
+
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{InstantNanosEncoder, LocalDateTimeNanosEncoder}
+import org.apache.spark.sql.errors.{DataTypeErrors, DataTypeErrorsBase}
+import org.apache.spark.sql.types.{TimestampLTZNanosType, TimestampNTZNanosType}
+
+/**
+ * Client-side (spark-api) operations shared by the nanosecond timestamp types
+ * (TimestampNTZNanosType and TimestampLTZNanosType).
+ *
+ * Internal values are [[org.apache.spark.unsafe.types.TimestampNanosVal]] (epoch micros + nanos
+ * within the micro). The two concrete subclasses differ only in their DataType and SQL-literal
+ * prefix; storage and formatting are identical.
+ *
+ * SCOPE (SPARK-57207): this issue wires physical representation, literals, row accessors, and
+ * codegen class selection. Dedicated fractional-second string formatting is not implemented yet:
+ * there is no TimestampFormatter for the nanos timestamp types. Until one lands, format() (and
+ * the toSQLValue() that delegates to it) raises the user-facing
+ * UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING error rather than silently truncating to
+ * microsecond precision.
+ *
+ * Dataset encoders are wired here to the precision-aware leaves added by SPARK-57033
+ * (LocalDateTimeNanosEncoder / InstantNanosEncoder), so that turning on the Types Framework
+ * matches the legacy RowEncoder.encoderForDataTypeDefault behavior rather than regressing it.
+ *
+ * @since 4.3.0
+ */
+abstract class TimestampNanosTypeApiOps extends TypeApiOps with DataTypeErrorsBase {
+
+  /** SQL literal prefix for this type, e.g. "TIMESTAMP_NTZ" or "TIMESTAMP_LTZ". */
+  protected def sqlTypeName: String
+
+  // ==================== String Formatting ====================
+
+  // Fractional-second (nanosecond) string formatting is not implemented yet: there is no
+  // TimestampFormatter for the nanos timestamp types. Until one lands, formatting (CAST to STRING,
+  // EXPLAIN / SHOW output, and SQL-literal rendering via toSQLValue) raises a user-facing
+  // unsupported-feature error rather than silently truncating to microsecond precision.
+  override def format(v: Any): String =
+    throw DataTypeErrors.cannotConvertNanosTimestampToStringError(dataType)
+
+  override def toSQLValue(v: Any): String = s"$sqlTypeName '${format(v)}'"
+
+  // ==================== Row Encoding ====================
+
+  // Honor the spark.sql.timestampNanosTypes.enabled gate just like the legacy
+  // RowEncoder.encoderForDataTypeDefault path, so enabling the Types Framework does not bypass
+  // the feature flag.
+  final override def getEncoder: AgnosticEncoder[_] = {
+    DataTypeErrors.checkTimestampNanosTypesEnabled()
+    nanosEncoder
+  }
+
+  /** The precision-aware encoder for this type (SPARK-57033). */
+  protected def nanosEncoder: AgnosticEncoder[_]
+}
+
+/**
+ * Client-side operations for [[org.apache.spark.sql.types.TimestampNTZNanosType]].
+ *
+ * @param t
+ *   The TimestampNTZNanosType with precision information
+ * @since 4.3.0
+ */
+class TimestampNTZNanosTypeApiOps(val t: TimestampNTZNanosType) extends TimestampNanosTypeApiOps {
+  override def dataType: TimestampNTZNanosType = t
+  override protected def sqlTypeName: String = "TIMESTAMP_NTZ"
+
+  // Mirrors RowEncoder.encoderForDataTypeDefault for TimestampNTZNanosType (SPARK-57033):
+  // maps to java.time.LocalDateTime with the column precision.
+  override protected def nanosEncoder: AgnosticEncoder[_] = LocalDateTimeNanosEncoder(t.precision)
+}
+
+/**
+ * Client-side operations for [[org.apache.spark.sql.types.TimestampLTZNanosType]].
+ *
+ * @param t
+ *   The TimestampLTZNanosType with precision information
+ * @since 4.3.0
+ */
+class TimestampLTZNanosTypeApiOps(val t: TimestampLTZNanosType) extends TimestampNanosTypeApiOps {
+  override def dataType: TimestampLTZNanosType = t
+  override protected def sqlTypeName: String = "TIMESTAMP_LTZ"
+
+  // Mirrors RowEncoder.encoderForDataTypeDefault for TimestampLTZNanosType (SPARK-57033):
+  // maps to java.time.Instant with the column precision.
+  override protected def nanosEncoder: AgnosticEncoder[_] = InstantNanosEncoder(t.precision)
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala
index fff5b8b6a022..ae690119f148 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala
@@ -21,7 +21,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType
 
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.internal.SqlApiConf
-import org.apache.spark.sql.types.{DataType, TimeType}
+import org.apache.spark.sql.types.{DataType, TimestampLTZNanosType, TimestampNTZNanosType, TimeType}
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -159,6 +159,8 @@ object TypeApiOps {
     if (!SqlApiConf.get.typesFrameworkEnabled) return None
     dt match {
       case tt: TimeType => Some(new TimeTypeApiOps(tt))
+      case t: TimestampNTZNanosType => Some(new TimestampNTZNanosTypeApiOps(t))
+      case t: TimestampLTZNanosType => Some(new TimestampLTZNanosTypeApiOps(t))
       // Add new types here - single registration point
       case _ => None
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 1fad04f6917d..3924337a65bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.YearMonthIntervalType._
-import org.apache.spark.unsafe.types.{BinaryView, TimestampNanosVal, UTF8String}
+import org.apache.spark.unsafe.types.{BinaryView, UTF8String}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.Utils
 
@@ -88,8 +88,6 @@ object CatalystTypeConverters {
       case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter
       case TimestampType => TimestampConverter
       case TimestampNTZType => TimestampNTZConverter
-      case t: TimestampNTZNanosType => new TimestampNTZNanosConverter(t)
-      case t: TimestampLTZNanosType => new TimestampLTZNanosConverter(t)
       case dt: DecimalType => new DecimalConverter(dt)
       case BooleanType => BooleanConverter
       case ByteType => ByteConverter
@@ -517,50 +515,6 @@ object CatalystTypeConverters {
       DateTimeUtils.microsToLocalDateTime(row.getLong(column))
   }
 
-  private class TimestampNTZNanosConverter(dataType: TimestampNTZNanosType)
-    extends CatalystTypeConverter[Any, LocalDateTime, TimestampNanosVal] {
-    override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match {
-      case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l, dataType.precision)
-      case other => throw new SparkIllegalArgumentException(
-        errorClass = "INVALID_EXTERNAL_VALUE",
-        messageParameters = scala.collection.immutable.Map(
-          "other" -> other.toString,
-          "otherClass" -> other.getClass.getCanonicalName,
-          "dataType" -> dataType.sql))
-    }
-
-    override def toScala(catalystValue: TimestampNanosVal): LocalDateTime =
-      if (catalystValue == null) null
-      else DateTimeUtils.timestampNanosToLocalDateTime(catalystValue)
-
-    override def toScalaImpl(row: InternalRow, column: Int): LocalDateTime =
-      DateTimeUtils.timestampNanosToLocalDateTime(row.getTimestampNTZNanos(column))
-  }
-
-  // Always maps `TimestampLTZNanosType` to `java.time.Instant`. Unlike micro `TimestampType`,
-  // the mapping does not consult `spark.sql.datetime.java8API.enabled`: the nanos LTZ type is
-  // post-Java-8 and the legacy `java.sql.Timestamp` external type is intentionally out of scope
-  // here. See SPARK-57033.
-  private class TimestampLTZNanosConverter(dataType: TimestampLTZNanosType)
-    extends CatalystTypeConverter[Any, Instant, TimestampNanosVal] {
-    override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match {
-      case i: Instant => DateTimeUtils.instantToTimestampNanos(i, dataType.precision)
-      case other => throw new SparkIllegalArgumentException(
-        errorClass = "INVALID_EXTERNAL_VALUE",
-        messageParameters = scala.collection.immutable.Map(
-          "other" -> other.toString,
-          "otherClass" -> other.getClass.getCanonicalName,
-          "dataType" -> dataType.sql))
-    }
-
-    override def toScala(catalystValue: TimestampNanosVal): Instant =
-      if (catalystValue == null) null
-      else DateTimeUtils.timestampNanosToInstant(catalystValue)
-
-    override def toScalaImpl(row: InternalRow, column: Int): Instant =
-      DateTimeUtils.timestampNanosToInstant(row.getTimestampLTZNanos(column))
-  }
-
   private class DecimalConverter(dataType: DecimalType)
     extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 17ba4fe4203b..1fa7803006c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.{expressions => exprs}
 import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, 
BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, 
GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, 
IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, 
JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, 
LocalDateTimeNanosEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, 
PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, P [...]
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, 
BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, 
GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, 
JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, 
LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, 
OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, 
PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, Primi [...]
 import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{dataTypeForClass, externalDataTypeFor, isNativeEncoder}
 import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, IsNull, Literal, MapKeys, MapValues, UpCast}
 import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, CreateExternalRow, DecodeUsingSerializer, InitializeJavaBean, Invoke, NewInstance, StaticInvoke, UnresolvedCatalystToExternalMap, UnresolvedMapObjects, WrapOption}
@@ -176,24 +176,6 @@ object DeserializerBuildHelper {
       returnNullable = false)
   }
 
-  def createDeserializerForLocalDateTimeNanos(path: Expression): Expression = {
-    StaticInvoke(
-      DateTimeUtils.getClass,
-      ObjectType(classOf[java.time.LocalDateTime]),
-      "timestampNanosToLocalDateTime",
-      path :: Nil,
-      returnNullable = false)
-  }
-
-  def createDeserializerForInstantNanos(path: Expression): Expression = {
-    StaticInvoke(
-      DateTimeUtils.getClass,
-      ObjectType(classOf[java.time.Instant]),
-      "timestampNanosToInstant",
-      path :: Nil,
-      returnNullable = false)
-  }
-
   def createDeserializerForLocalTime(path: Expression): Expression = {
     StaticInvoke(
       DateTimeUtils.getClass,
@@ -311,11 +293,19 @@ object DeserializerBuildHelper {
       enc: AgnosticEncoder[_],
       path: Expression,
       walkedTypePath: WalkedTypePath,
-      isTopLevel: Boolean = false): Expression =
+      isTopLevel: Boolean = false): Expression = enc match {
+    // OptionEncoder and TransformingEncoder proxy `dataType` to the wrapped encoder, so the
+    // framework dispatch below (which keys off `enc.dataType`) would otherwise fire on the wrapper
+    // and skip WrapOption / the transforming codec. Route these wrappers through the default path
+    // first, so the framework leaf dispatch only ever sees unwrapped leaf encoders.
+    case _: OptionEncoder[_] | _: TransformingEncoder[_, _] =>
+      createDeserializerDefault(enc, path, walkedTypePath, isTopLevel)
     // Framework dispatch runs before encoder-type checks in the default path. Safe because
     // framework types use dedicated leaf encoders, never migration shims or native primitives.
-    TypeOps(enc.dataType).flatMap(_.createDeserializer(path, walkedTypePath, isTopLevel))
-      .getOrElse(createDeserializerDefault(enc, path, walkedTypePath, isTopLevel))
+    case _ =>
+      TypeOps(enc.dataType).flatMap(_.createDeserializer(path, walkedTypePath, isTopLevel))
+        .getOrElse(createDeserializerDefault(enc, path, walkedTypePath, isTopLevel))
+  }
 
   private def createDeserializerDefault(
       enc: AgnosticEncoder[_],
@@ -374,10 +364,6 @@ object DeserializerBuildHelper {
       createDeserializerForInstant(path)
     case LocalDateTimeEncoder =>
       createDeserializerForLocalDateTime(path)
-    case _: LocalDateTimeNanosEncoder =>
-      createDeserializerForLocalDateTimeNanos(path)
-    case _: InstantNanosEncoder =>
-      createDeserializerForInstantNanos(path)
     case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
       throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
     case LocalTimeEncoder =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index 638b213e1f6e..abec18230973 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -202,10 +202,6 @@ object InternalRow {
     case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
     case CalendarIntervalType =>
       (input, v) => input.setInterval(ordinal, v.asInstanceOf[CalendarInterval])
-    case _: TimestampNTZNanosType =>
-      (input, v) => input.setTimestampNTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal])
-    case _: TimestampLTZNanosType =>
-      (input, v) => input.setTimestampLTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal])
     case DecimalType.Fixed(precision, _) =>
       (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
     case udt: UserDefinedType[_] => getWriter(ordinal, udt.sqlType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index 72337a4b2185..00a0d6ca2505 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -22,7 +22,7 @@ import scala.language.existentials
 import org.apache.spark.sql.catalyst.{expressions => exprs}
 import org.apache.spark.sql.catalyst.DeserializerBuildHelper.expressionWithNullSafety
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, 
BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, 
BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, 
CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, 
GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, 
JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, 
LocalDateEncoder, LocalDateTimeEncoder, LocalDateTim [...]
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, 
BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, 
BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, 
CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, 
GeometryEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, 
JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, 
LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, Opt [...]
 import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, CheckOverflow, CreateNamedStruct, Expression, IsNull, KnownNotNull, Literal, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.expressions.objects._
@@ -166,28 +166,6 @@ object SerializerBuildHelper {
       returnNullable = false)
   }
 
-  def createSerializerForLocalDateTimeNanos(
-      inputObject: Expression,
-      precision: Int): Expression = {
-    StaticInvoke(
-      DateTimeUtils.getClass,
-      TimestampNTZNanosType(precision),
-      "localDateTimeToTimestampNanos",
-      inputObject :: Literal(precision) :: Nil,
-      returnNullable = false)
-  }
-
-  def createSerializerForInstantNanos(
-      inputObject: Expression,
-      precision: Int): Expression = {
-    StaticInvoke(
-      DateTimeUtils.getClass,
-      TimestampLTZNanosType(precision),
-      "instantToTimestampNanos",
-      inputObject :: Literal(precision) :: Nil,
-      returnNullable = false)
-  }
-
   def createSerializerForJavaLocalDate(inputObject: Expression): Expression = {
     StaticInvoke(
       DateTimeUtils.getClass,
@@ -355,12 +333,20 @@ object SerializerBuildHelper {
    * representation. The mapping between the external and internal representations is described
    * by encoder `enc`.
    */
-  private def createSerializer(enc: AgnosticEncoder[_], input: Expression): Expression =
+  private def createSerializer(enc: AgnosticEncoder[_], input: Expression): Expression = enc match {
+    // OptionEncoder and TransformingEncoder proxy `dataType` to the wrapped encoder, so the
+    // framework dispatch below (which keys off `enc.dataType`) would otherwise fire on the wrapper
+    // and skip Option unwrapping / the transforming codec. Route these wrappers through the default
+    // path first, so the framework leaf dispatch only ever sees unwrapped leaf encoders.
+    case _: OptionEncoder[_] | _: TransformingEncoder[_, _] =>
+      createSerializerDefault(enc, input)
     // Framework dispatch runs before encoder-type checks (AgnosticExpressionPathEncoder,
     // isNativeEncoder) in the default path. This is safe because framework types use dedicated
     // leaf encoders (e.g., LocalTimeEncoder), never migration shims or native primitives.
-    TypeOps(enc.dataType).flatMap(_.createSerializer(input))
-      .getOrElse(createSerializerDefault(enc, input))
+    case _ =>
+      TypeOps(enc.dataType).flatMap(_.createSerializer(input))
+        .getOrElse(createSerializerDefault(enc, input))
+  }
 
   private def createSerializerDefault(
       enc: AgnosticEncoder[_], input: Expression): Expression = enc match {
@@ -398,8 +384,6 @@ object SerializerBuildHelper {
     case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
     case InstantEncoder(false) => createSerializerForJavaInstant(input)
     case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
-    case LocalDateTimeNanosEncoder(p) => createSerializerForLocalDateTimeNanos(input, p)
-    case InstantNanosEncoder(p) => createSerializerForInstantNanos(input, p)
     case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
       throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
     case LocalTimeEncoder => createSerializerForLocalTime(input)
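The `createSerializer` change in this file matches wrapper encoders before the framework dispatch because the wrappers report the wrapped encoder's `dataType`. A minimal standalone sketch of why that ordering matters follows. All names in it (`Enc`, `LeafEnc`, `OptionEnc`, `frameworkSerializer`) are hypothetical stand-ins for illustration, not Spark classes, and the string results only mark which path was taken.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark classes.
sealed trait Enc { def dataType: String }
final case class LeafEnc(dataType: String) extends Enc
// A wrapper that proxies `dataType` to the encoder it wraps.
final case class OptionEnc(inner: Enc) extends Enc { def dataType: String = inner.dataType }

object WrapperDispatchSketch {
  // Stand-in for a framework registry keyed by data type.
  def frameworkSerializer(dataType: String): Option[String] =
    if (dataType == "timestamp_ntz(9)") Some("framework-leaf") else None

  // Stand-in for the default path: unwraps wrappers, then recurses into `serialize`.
  def default(enc: Enc): String = enc match {
    case OptionEnc(inner) => s"unwrap(${serialize(inner)})"
    case LeafEnc(dt) => s"legacy($dt)"
  }

  // Wrappers are matched first, so the registry only ever sees leaf encoders.
  def serialize(enc: Enc): String = enc match {
    case _: OptionEnc => default(enc)
    case _ => frameworkSerializer(enc.dataType).getOrElse(default(enc))
  }

  def main(args: Array[String]): Unit = {
    println(serialize(OptionEnc(LeafEnc("timestamp_ntz(9)"))))
    println(serialize(LeafEnc("int")))
  }
}
```

If the wrapper case were removed from `serialize`, the registry lookup would succeed on the wrapper's proxied `dataType` and the unwrapping step would be skipped, which is the situation the code comment in the hunk describes.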
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
index c507af890047..a4acc6e20d70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType}
 import org.apache.spark.sql.catalyst.types.ops.TypeOps
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
-import org.apache.spark.unsafe.types.{BinaryView, CalendarInterval, TimestampNanosVal, UTF8String, VariantVal}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
+import org.apache.spark.unsafe.types.{BinaryView, CalendarInterval, UTF8String, VariantVal}
 
 /**
  * :: DeveloperApi ::
@@ -107,8 +107,6 @@ object EncoderUtils {
       case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType]
       case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType]
       case _: TimeType => classOf[PhysicalLongType.InternalType]
-      case _: TimestampNTZNanosType => classOf[TimestampNanosVal]
-      case _: TimestampLTZNanosType => classOf[TimestampNanosVal]
       case _: StringType => classOf[UTF8String]
       case _: StructType => classOf[InternalRow]
       case _: ArrayType => classOf[ArrayData]
@@ -119,16 +117,16 @@ object EncoderUtils {
     }
   }
 
-  @scala.annotation.tailrec
-  def javaBoxedType(dt: DataType): Class[_] = dt match {
+  def javaBoxedType(dt: DataType): Class[_] =
+    TypeOps(dt).map(_.getBoxedJavaClass).getOrElse(javaBoxedTypeDefault(dt))
+
+  private def javaBoxedTypeDefault(dt: DataType): Class[_] = dt match {
     case _: DecimalType => classOf[Decimal]
     case _: DayTimeIntervalType => classOf[java.lang.Long]
     case _: YearMonthIntervalType => classOf[java.lang.Integer]
     case BinaryType => classOf[Array[Byte]]
     case _: StringType => classOf[UTF8String]
     case CalendarIntervalType => classOf[CalendarInterval]
-    case _: TimestampNTZNanosType => classOf[TimestampNanosVal]
-    case _: TimestampLTZNanosType => classOf[TimestampNanosVal]
     case _: StructType => classOf[InternalRow]
     case _: ArrayType => classOf[ArrayData]
     case _: MapType => classOf[MapData]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
index a5b8d0857c99..88c3e181e645 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.types.ops.TypeOps
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.TimestampNanosVal
 
 /**
  * A parent class for mutable container objects that are reused when the values are changed,
@@ -186,6 +187,21 @@ final class MutableAny extends MutableValue {
   }
 }
 
+final class MutableTimestampNanos extends MutableValue {
+  var value: TimestampNanosVal = _
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[TimestampNanosVal]
+  }
+  override def copy(): MutableTimestampNanos = {
+    val newCopy = new MutableTimestampNanos
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
 /**
  * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
  * based on the dataTypes of each column.  The intent is to decrease garbage when modifying the
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala
index 04052dafb61a..1f157d6ac18a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util.{ArrayData, CharVarcharCodegenUtils, DateFormatter, FractionTimeFormatter, IntervalStringStyles, IntervalUtils, MapData, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
+import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.BinaryOutputStyle
 import org.apache.spark.sql.types._
@@ -305,6 +306,13 @@ trait ToStringBase { self: UnaryExpression with TimeZoneAwareExpression =>
         (c, evPrim) => code"$evPrim = UTF8String.fromString($c.toPlainString());"
       case _: StringType =>
         (c, evPrim) => code"$evPrim = $c;"
+      // Fractional-second (nanosecond) timestamp formatting is not implemented yet: there is no
+      // TimestampFormatter for the nanos timestamp types. The interpreted path raises this via the
+      // Types Framework (castToString -> TypeApiOps.format); the codegen path has no framework
+      // hook, so it raises the same user-facing error directly until a formatter lands
+      // (SPARK-57207).
+      case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
+        throw DataTypeErrors.cannotConvertNanosTimestampToStringError(from)
       case _ =>
         (c, evPrim) => code"$evPrim = UTF8String.fromString(String.valueOf($c));"
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 43f7b041d53f..6f8eec249662 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -2031,8 +2031,6 @@ object CodeGenerator extends Logging {
     case _: GeographyType | _: GeometryType => classOf[BinaryView]
     case _: StringType => classOf[UTF8String]
     case CalendarIntervalType => classOf[CalendarInterval]
-    case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
-      classOf[org.apache.spark.unsafe.types.TimestampNanosVal]
     case _: StructType => classOf[InternalRow]
     case _: ArrayType => classOf[ArrayData]
     case _: MapType => classOf[MapData]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 8c2a34240397..df703008a05b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -204,8 +204,6 @@ object Literal {
     case DateType => create(0, DateType)
     case TimestampType => create(0L, TimestampType)
     case TimestampNTZType => create(0L, TimestampNTZType)
-    case t: TimestampNTZNanosType => create(TimestampNanosVal.ZERO, t)
-    case t: TimestampLTZNanosType => create(TimestampNanosVal.ZERO, t)
     case t: TimeType => create(0L, t)
     case it: DayTimeIntervalType => create(0L, it)
     case it: YearMonthIntervalType => create(0, it)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
index 020958f34dee..f682d30235f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, Int
 import org.apache.spark.sql.catalyst.types.ops.TypeOps
 import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory, MapData, SQLOrderingUtil}
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, 
ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, 
DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, 
DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, 
GeographyType, GeometryType, IntegerExactNumeric, IntegerType, IntegralType, 
LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, 
ShortType, StringType, StructField, StructT [...]
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, 
ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, 
DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, 
DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, 
GeographyType, GeometryType, IntegerExactNumeric, IntegerType, IntegralType, 
LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, 
ShortType, StringType, StructField, StructT [...]
 import org.apache.spark.unsafe.types.{BinaryView, ByteArray, TimestampNanosVal, UTF8String, VariantVal}
 import org.apache.spark.util.ArrayImplicits._
 
@@ -55,8 +55,6 @@ object PhysicalDataType {
     case TimestampType => PhysicalLongType
     case TimestampNTZType => PhysicalLongType
     case CalendarIntervalType => PhysicalCalendarIntervalType
-    case _: TimestampNTZNanosType => PhysicalTimestampNTZNanosType
-    case _: TimestampLTZNanosType => PhysicalTimestampLTZNanosType
     case DayTimeIntervalType(_, _) => PhysicalLongType
     case YearMonthIntervalType(_, _) => PhysicalIntegerType
     case DateType => PhysicalIntegerType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala
index 0cf152079c52..217a69ba4f9d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala
@@ -62,6 +62,8 @@ case class TimeTypeOps(override val t: TimeType) extends TimeTypeApiOps(t) with
 
   override def getJavaClass: Class[_] = classOf[Long]
 
+  override def getBoxedJavaClass: Class[_] = classOf[java.lang.Long]
+
   override def getMutableValue: MutableValue = new MutableLong
 
   override def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOps.scala
new file mode 100644
index 000000000000..9957367ed039
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOps.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.types.ops
+
+import java.time.{Instant, LocalDateTime}
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, MutableTimestampNanos, MutableValue}
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
+import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalTimestampLTZNanosType, PhysicalTimestampNTZNanosType}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{ObjectType, TimestampLTZNanosType, TimestampNTZNanosType}
+import org.apache.spark.sql.types.ops.{TimestampLTZNanosTypeApiOps, TimestampNTZNanosTypeApiOps}
+import org.apache.spark.unsafe.types.TimestampNanosVal
+
+/**
+ * Server-side (catalyst) operations shared by the nanosecond timestamp types
+ * (TimestampNTZNanosType and TimestampLTZNanosType).
+ *
+ * Internal values are [[TimestampNanosVal]] (epoch micros + nanos within the micro), stored in
+ * [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] via a 16-byte variable-length payload;
+ * see [[org.apache.spark.sql.catalyst.expressions.TimestampNanosRowValues]].
+ *
+ * The Types Framework is the sole integration path for the nanosecond timestamp types: physical
+ * representation, literals, row accessors, codegen class selection, external java.time conversion,
+ * and serializer/deserializer expression building all flow through these Ops. When the framework is
+ * disabled the types are unsupported (the legacy fallbacks no longer special-case them). External
+ * java.time conversion uses the helpers added by SPARK-57033: TimestampNTZNanosType maps to
+ * java.time.LocalDateTime and TimestampLTZNanosType to java.time.Instant, with sub-micro digits
+ * truncated to the column precision. Concrete subclasses supply the NTZ/LTZ-specific physical type,
+ * row accessors, conversions, and serializer/deserializer expressions.
+ *
+ * @since 4.3.0
+ */
+trait TimestampNanosTypeOps extends TypeOps {
+
+  // ==================== Physical Type Representation ====================
+
+  override def getJavaClass: Class[_] = classOf[TimestampNanosVal]
+
+  override def getMutableValue: MutableValue = new MutableTimestampNanos
+
+  // ==================== Literal Creation ====================
+
+  override def getDefaultLiteral: Literal = Literal.create(TimestampNanosVal.ZERO, dataType)
+
+  override def getJavaLiteral(v: Any): String = {
+    val tn = v.asInstanceOf[TimestampNanosVal]
+    "org.apache.spark.unsafe.types.TimestampNanosVal.fromParts(" +
+      s"${tn.epochMicros}L, (short) ${tn.nanosWithinMicro})"
+  }
+
+  // ==================== External Type Conversion ====================
+
+  // Raises the same error as the legacy CatalystTypeConverters (SPARK-57033) when an external
+  // value of an unexpected type is supplied for a nanosecond timestamp column.
+  protected def invalidExternalValue(other: Any): Nothing =
+    throw new SparkIllegalArgumentException(
+      errorClass = "INVALID_EXTERNAL_VALUE",
+      messageParameters = Map(
+        "other" -> other.toString,
+        "otherClass" -> other.getClass.getCanonicalName,
+        "dataType" -> dataType.sql))
+}
+
+/**
+ * Server-side operations for [[TimestampNTZNanosType]].
+ *
+ * @param t
+ *   The TimestampNTZNanosType with precision information
+ * @since 4.3.0
+ */
+case class TimestampNTZNanosTypeOps(override val t: TimestampNTZNanosType)
+  extends TimestampNTZNanosTypeApiOps(t) with TimestampNanosTypeOps {
+
+  override def getPhysicalType: PhysicalDataType = PhysicalTimestampNTZNanosType
+
+  override def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit =
+    (input, v) => input.setTimestampNTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal])
+
+  override def toCatalystImpl(scalaValue: Any): Any = scalaValue match {
+    case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l, t.precision)
+    case other => invalidExternalValue(other)
+  }
+
+  override def toScala(catalystValue: Any): Any =
+    if (catalystValue == null) null
+    else DateTimeUtils.timestampNanosToLocalDateTime(catalystValue.asInstanceOf[TimestampNanosVal])
+
+  override def toScalaImpl(row: InternalRow, column: Int): Any =
+    DateTimeUtils.timestampNanosToLocalDateTime(row.getTimestampNTZNanos(column))
+
+  override def createSerializer(input: Expression): Option[Expression] =
+    Some(StaticInvoke(
+      DateTimeUtils.getClass,
+      t,
+      "localDateTimeToTimestampNanos",
+      input :: Literal(t.precision) :: Nil,
+      returnNullable = false))
+
+  override def createDeserializer(path: Expression): Option[Expression] =
+    Some(StaticInvoke(
+      DateTimeUtils.getClass,
+      ObjectType(classOf[LocalDateTime]),
+      "timestampNanosToLocalDateTime",
+      path :: Nil,
+      returnNullable = false))
+}
+
+/**
+ * Server-side operations for [[TimestampLTZNanosType]].
+ *
+ * @param t
+ *   The TimestampLTZNanosType with precision information
+ * @since 4.3.0
+ */
+case class TimestampLTZNanosTypeOps(override val t: TimestampLTZNanosType)
+  extends TimestampLTZNanosTypeApiOps(t) with TimestampNanosTypeOps {
+
+  override def getPhysicalType: PhysicalDataType = PhysicalTimestampLTZNanosType
+
+  override def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit =
+    (input, v) => input.setTimestampLTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal])
+
+  override def toCatalystImpl(scalaValue: Any): Any = scalaValue match {
+    case i: Instant => DateTimeUtils.instantToTimestampNanos(i, t.precision)
+    case other => invalidExternalValue(other)
+  }
+
+  override def toScala(catalystValue: Any): Any =
+    if (catalystValue == null) null
+    else DateTimeUtils.timestampNanosToInstant(catalystValue.asInstanceOf[TimestampNanosVal])
+
+  override def toScalaImpl(row: InternalRow, column: Int): Any =
+    DateTimeUtils.timestampNanosToInstant(row.getTimestampLTZNanos(column))
+
+  override def createSerializer(input: Expression): Option[Expression] =
+    Some(StaticInvoke(
+      DateTimeUtils.getClass,
+      t,
+      "instantToTimestampNanos",
+      input :: Literal(t.precision) :: Nil,
+      returnNullable = false))
+
+  override def createDeserializer(path: Expression): Option[Expression] =
+    Some(StaticInvoke(
+      DateTimeUtils.getClass,
+      ObjectType(classOf[Instant]),
+      "timestampNanosToInstant",
+      path :: Nil,
+      returnNullable = false))
+}
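The class comment in this file describes the internal value as epoch micros plus nanos within the micro, with sub-micro digits truncated to the column precision. The arithmetic can be sketched with `java.time` alone. `NanosParts` and `split` are hypothetical names, and the truncation rule shown (dropping fractional digits beyond `p`) is an assumption about what "truncated to the column precision" means; it is not a copy of `DateTimeUtils.instantToTimestampNanos`.

```scala
import java.time.Instant

// Hypothetical illustration; not Spark's TimestampNanosVal or DateTimeUtils.
final case class NanosParts(epochMicros: Long, nanosWithinMicro: Short)

object NanosSplitSketch {
  def split(instant: Instant, precision: Int): NanosParts = {
    require(precision >= 7 && precision <= 9, "precision must be in [7, 9]")
    // getNano is always in [0, 999999999], also for instants before the epoch.
    val nanoOfSecond = instant.getNano
    // Whole microseconds since the epoch, with overflow checks.
    val micros = Math.addExact(
      Math.multiplyExact(instant.getEpochSecond, 1000000L),
      (nanoOfSecond / 1000).toLong)
    // Assumed truncation: p = 7 keeps hundreds of nanos, p = 8 tens, p = 9 every digit.
    val step = Seq(100, 10, 1)(precision - 7)
    val subMicro = (nanoOfSecond % 1000) / step * step
    NanosParts(micros, subMicro.toShort)
  }

  def main(args: Array[String]): Unit = {
    val i = Instant.parse("2026-06-05T01:19:46.123456789Z")
    println(split(i, 9)) // sub-micro part keeps 789
    println(split(i, 7)) // sub-micro part is cut to 700
  }
}
```

Because `Instant` keeps `getNano` non-negative and floors `getEpochSecond`, the same formula covers pre-epoch values without special-casing negative remainders.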
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala
index 7240f0533aa3..2313c22f1f51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, MutableVa
 import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.execution.arrow.ArrowFieldWriter
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, TimeType}
+import org.apache.spark.sql.types.{DataType, TimestampLTZNanosType, TimestampNTZNanosType, TimeType}
 
 /**
  * Server-side (catalyst) type operations for the Types Framework.
@@ -82,6 +82,18 @@ trait TypeOps extends Serializable {
    */
   def getJavaClass: Class[_]
 
+  /**
+   * Returns the boxed Java class used in code generation where a nullable, reference-typed value
+   * is required (e.g., array/map elements, external-value validation). Defaults to
+   * [[getJavaClass]], which is correct for object-backed types; primitive-backed types must
+   * override it to return the corresponding boxed class (e.g., classOf[java.lang.Long] for a
+   * Long-backed type).
+   *
+   * @return
+   *   boxed Java class
+   */
+  def getBoxedJavaClass: Class[_] = getJavaClass
+
   /**
    * Returns a MutableValue instance for use in SpecificInternalRow.
    *
@@ -231,6 +243,8 @@ object TypeOps {
     if (!SQLConf.get.typesFrameworkEnabled) return None
     dt match {
       case tt: TimeType => Some(TimeTypeOps(tt))
+      case t: TimestampNTZNanosType => Some(TimestampNTZNanosTypeOps(t))
+      case t: TimestampLTZNanosType => Some(TimestampLTZNanosTypeOps(t))
       // Add new types here - single registration point
       case _ => None
     }
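The registration in `TypeOps.apply`, the `getBoxedJavaClass` default, and the `javaBoxedType` change in `EncoderUtils` share one shape: ask the registry first, fall back to the legacy match second. A rough standalone sketch of that shape follows. Every name in it (`Ops`, `Registry`, `LongBackedOps`, `ObjectBackedOps`, the string type names) is hypothetical and chosen only for illustration.

```scala
// Hypothetical stand-ins; not the Spark TypeOps trait or its registry.
trait Ops {
  def javaClass: Class[_]
  // Object-backed types can reuse javaClass; primitive-backed ones must override.
  def boxedJavaClass: Class[_] = javaClass
}

object LongBackedOps extends Ops {
  def javaClass: Class[_] = classOf[Long] // the primitive long class
  override def boxedJavaClass: Class[_] = classOf[java.lang.Long]
}

object ObjectBackedOps extends Ops {
  def javaClass: Class[_] = classOf[java.math.BigInteger]
}

object Registry {
  // Single registration point; returns None when the framework flag is off.
  def apply(typeName: String, enabled: Boolean): Option[Ops] =
    if (!enabled) None
    else typeName match {
      case "time" => Some(LongBackedOps)
      case "big" => Some(ObjectBackedOps)
      case _ => None
    }

  // Legacy fallback that no longer special-cases the registered types.
  def legacyBoxed(typeName: String): Class[_] = typeName match {
    case "string" => classOf[String]
    case other => throw new IllegalArgumentException(s"unsupported type: $other")
  }

  def boxedType(typeName: String, enabled: Boolean): Class[_] =
    Registry(typeName, enabled).map(_.boxedJavaClass).getOrElse(legacyBoxed(typeName))
}

object RegistrySketch {
  def main(args: Array[String]): Unit = {
    println(Registry.boxedType("time", enabled = true))
    println(Registry.boxedType("string", enabled = true))
  }
}
```

In this sketch a registered type becomes unsupported once the flag is off, because the fallback has no branch for it. That mirrors the statement in the commit that the nanosecond types are supported only through the framework.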
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a21653a011b3..30db7bbbd59d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -637,7 +637,8 @@ object SQLConf {
   val TYPES_FRAMEWORK_ENABLED =
     buildConf("spark.sql.types.framework.enabled")
       .internal()
-      .doc("When true, use the Types Framework for supported types (currently TimeType). " +
+      .doc("When true, use the Types Framework for supported types (currently TimeType and the " +
+        "nanosecond timestamp types TimestampNTZNanosType and TimestampLTZNanosType). " +
         "The framework centralizes type-specific operations in Ops classes instead of " +
         "scattered pattern matching. When false, use legacy scattered implementation.")
       .version("4.2.0")
@@ -655,11 +656,19 @@ object SQLConf {
         "Unparameterized TIMESTAMP, TIMESTAMP_NTZ, and TIMESTAMP_LTZ remain microsecond " +
         "types. Enabling this flag does not guarantee full SQL support: casts, Parquet read, " +
         "typed literals, and other operations may still fail until their respective features " +
-        "are implemented.")
+        "are implemented. The nanosecond timestamp types are implemented solely through the " +
+        "Types Framework, so this flag can only be set to true when " +
+        s"${TYPES_FRAMEWORK_ENABLED.key} is also true.")
       .version("4.3.0")
       .withBindingPolicy(ConfigBindingPolicy.SESSION)
       .booleanConf
-      .createWithDefault(Utils.isTesting)
+      .checkValue(
+        enabled => !enabled || SQLConf.get.typesFrameworkEnabled,
+        "REQUIREMENT",
+        _ => Map("confRequirement" ->
+          (s"'${TYPES_FRAMEWORK_ENABLED.key}' must be true to enable the nanosecond " +
+            "timestamp types.")))
+      .createWithDefaultFunction(() => Utils.isTesting)
 
   val EXTENDED_EXPLAIN_PROVIDERS = buildConf("spark.sql.extendedExplainProviders")
     .doc("A comma-separated list of classes that implement the" +
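The `checkValue` predicate added to the nanosecond flag in this file, `!enabled || SQLConf.get.typesFrameworkEnabled`, encodes an implication: the dependent flag may be true only while its prerequisite is true. A minimal sketch of that rule in isolation follows. `MiniConf` and its methods are hypothetical and are not part of Spark's `SQLConf` or config builder API.

```scala
// Hypothetical mini-config for illustration; not Spark's SQLConf or ConfigBuilder API.
final class MiniConf(val frameworkEnabled: Boolean) {
  private var nanosEnabled: Boolean = false

  // Same shape as `!enabled || prerequisite`: turning the flag off is always
  // allowed, turning it on requires the prerequisite flag.
  def setNanosEnabled(enabled: Boolean): Unit = {
    require(!enabled || frameworkEnabled,
      "'framework.enabled' must be true to enable the nanosecond timestamp types")
    nanosEnabled = enabled
  }

  def isNanosEnabled: Boolean = nanosEnabled
}

object MiniConfSketch {
  def main(args: Array[String]): Unit = {
    val ok = new MiniConf(frameworkEnabled = true)
    ok.setNanosEnabled(true)
    println(ok.isNanosEnabled)

    val off = new MiniConf(frameworkEnabled = false)
    off.setNanosEnabled(false) // allowed: disabling never needs the prerequisite
    println(scala.util.Try(off.setNanosEnabled(true)).isFailure)
  }
}
```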
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala
index d85350504f7f..02c967c0ffec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.GenericArrayData
@@ -184,6 +184,21 @@ class TimestampNanosRowSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(Literal.create(null, TimestampLTZNanosType(7)), null)
   }
 
+  // Fractional-second formatting is not implemented yet, so CAST(nanos AS STRING) raises the
+  // user-facing UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING error. Both the interpreted
+  // (ToStringBase.castToString -> TypeApiOps.format) and codegen (ToStringBase.castToStringCode)
+  // paths must fail the same way (SPARK-57207).
+  test("CAST nanos timestamp to STRING raises an unsupported-feature error in both eval modes") {
+    Seq(
+      Literal.create(ntzValue, TimestampNTZNanosType(9)),
+      Literal.create(ltzValue, TimestampLTZNanosType(7))).foreach { lit =>
+      checkErrorInExpression[SparkUnsupportedOperationException](
+        Cast(lit, StringType),
+        condition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING",
+        parameters = Map("dataType" -> ("\"" + lit.dataType.sql + "\"")))
+    }
+  }
+
   testBothCodegenAndInterpreted("UnsafeRow handles extreme epoch micros for nanos") {
     val fieldTypes: Array[DataType] = Array(TimestampNTZNanosType(9))
     val converter = UnsafeProjection.create(fieldTypes)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOpsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOpsSuite.scala
new file mode 100644
index 000000000000..6154dff97be5
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOpsSuite.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.types.ops
+
+import java.time.{Instant, LocalDateTime}
+
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkUnsupportedOperationException}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, ExpressionEncoder}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{InstantNanosEncoder, LocalDateTimeNanosEncoder, OptionEncoder}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal, MutableTimestampNanos, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalTimestampLTZNanosType, PhysicalTimestampNTZNanosType}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, TimestampLTZNanosType, TimestampNTZNanosType}
+import org.apache.spark.sql.types.ops.TypeApiOps
+import org.apache.spark.unsafe.types.TimestampNanosVal
+
+/**
+ * Tests for the Types Framework wiring of the nanosecond timestamp types (SPARK-57207).
+ *
+ * Verifies that TimestampNTZNanosType and TimestampLTZNanosType route physical representation,
+ * literals, row accessors, mutable values, codegen class selection, conversions, and encoders
+ * through TypeOps/TypeApiOps. The Types Framework is the sole integration path for these types, so
+ * the suite runs with spark.sql.types.framework.enabled = true (the default under tests).
+ */
+class TimestampNanosTypeOpsSuite extends SparkFunSuite with SQLHelper {
+
+  private val precisions = Seq(7, 8, 9)
+
+  private val ntzVal = TimestampNanosVal.fromParts(1234567890123L, 42.toShort)
+  private val ltzVal = TimestampNanosVal.fromParts(-98765L, 999.toShort)
+
+  // (dataType, expected physical type, sample value) tuples covering NTZ and LTZ for p in {7,8,9}.
+  private def ntzCases: Seq[(DataType, PhysicalDataType, TimestampNanosVal)] =
+    precisions.map(p => (TimestampNTZNanosType(p), PhysicalTimestampNTZNanosType, ntzVal))
+
+  private def ltzCases: Seq[(DataType, PhysicalDataType, TimestampNanosVal)] =
+    precisions.map(p => (TimestampLTZNanosType(p), PhysicalTimestampLTZNanosType, ltzVal))
+
+  private def allCases: Seq[(DataType, PhysicalDataType, TimestampNanosVal)] = ntzCases ++ ltzCases
+
+  private def checkPhysicalAndLiteralAndCodegen(
+      dt: DataType,
+      physical: PhysicalDataType): Unit = {
+    assert(PhysicalDataType(dt) === physical, s"physical type for $dt")
+    val default = Literal.default(dt)
+    assert(default.dataType === dt, s"default literal type for $dt")
+    assert(default.value === TimestampNanosVal.ZERO, s"default literal value 
for $dt")
+    assert(CodeGenerator.javaClass(dt) === classOf[TimestampNanosVal], 
s"javaClass for $dt")
+  }
+
+  private def checkRowRoundtrip(dt: DataType, value: TimestampNanosVal): Unit 
= {
+    val accessor = InternalRow.getAccessor(dt)
+    val writer = InternalRow.getWriter(0, dt)
+
+    val genericRow = new GenericInternalRow(Array[Any](null, null))
+    writer(genericRow, value)
+    assert(accessor(genericRow, 0) === value, s"GenericInternalRow roundtrip for $dt")
+    assert(accessor(new GenericInternalRow(Array[Any](null, null)), 0) === null)
+
+    val specificRow = new SpecificInternalRow(Seq(dt))
+    specificRow.update(0, value)
+    assert(accessor(specificRow, 0) === value, s"SpecificInternalRow roundtrip for $dt")
+    specificRow.update(0, null)
+    assert(accessor(specificRow, 0) === null)
+  }
+
+  test("TypeOps and TypeApiOps are registered when the framework is enabled") {
+    allCases.foreach { case (dt, _, _) =>
+      assert(TypeOps(dt).isDefined, s"TypeOps should be defined for $dt")
+      assert(TypeApiOps(dt).isDefined, s"TypeApiOps should be defined for $dt")
+    }
+  }
+
+  test("physical type, default literal, and codegen class (framework enabled)") {
+    allCases.foreach { case (dt, physical, _) =>
+      checkPhysicalAndLiteralAndCodegen(dt, physical)
+    }
+  }
+
+  test("InternalRow and SpecificInternalRow roundtrip (framework enabled)") {
+    allCases.foreach { case (dt, _, value) =>
+      checkRowRoundtrip(dt, value)
+    }
+  }
+
+  test("SpecificInternalRow uses a dedicated MutableTimestampNanos holder") {
+    allCases.foreach { case (dt, _, _) =>
+      val row = new SpecificInternalRow(Seq(dt))
+      assert(row.values(0).isInstanceOf[MutableTimestampNanos],
+        s"expected MutableTimestampNanos for $dt")
+    }
+  }
+
+  test("getEncoder returns the SPARK-57033 nanos encoder (matches the legacy RowEncoder path)") {
+    precisions.foreach { p =>
+      assert(TypeApiOps(TimestampNTZNanosType(p)).get.getEncoder === LocalDateTimeNanosEncoder(p))
+      assert(TypeApiOps(TimestampLTZNanosType(p)).get.getEncoder === InstantNanosEncoder(p))
+    }
+  }
+
+  test("getEncoder honors the timestampNanosTypes.enabled gate") {
+    withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "false") {
+      allCases.foreach { case (dt, _, _) =>
+        val e = intercept[org.apache.spark.SparkException](TypeApiOps(dt).get.getEncoder)
+        assert(e.getCondition === "FEATURE_NOT_ENABLED")
+      }
+    }
+  }
+
+  // A sample with sub-micro digits so precision truncation is exercised.
+  private val sampleLocalDateTime = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+  private val sampleInstant = Instant.parse("2019-02-26T16:56:00.123456789Z")
+
+  private def externalValue(dt: DataType): Any = dt match {
+    case _: TimestampNTZNanosType => sampleLocalDateTime
+    case _: TimestampLTZNanosType => sampleInstant
+  }
+
+  test("CatalystTypeConverters convert java.time values (matches the legacy converter path)") {
+    allCases.foreach { case (dt, _, _) =>
+      val external = externalValue(dt)
+      val expectedCatalyst = dt match {
+        case t: TimestampNTZNanosType =>
+          DateTimeUtils.localDateTimeToTimestampNanos(sampleLocalDateTime, t.precision)
+        case t: TimestampLTZNanosType =>
+          DateTimeUtils.instantToTimestampNanos(sampleInstant, t.precision)
+      }
+
+      // toScala over the truncated catalyst value, i.e. what a lossless roundtrip yields.
+      val expectedScala = dt match {
+        case _: TimestampNTZNanosType =>
+          DateTimeUtils.timestampNanosToLocalDateTime(expectedCatalyst)
+        case _: TimestampLTZNanosType =>
+          DateTimeUtils.timestampNanosToInstant(expectedCatalyst)
+      }
+
+      val catalyst = CatalystTypeConverters.createToCatalystConverter(dt)(external)
+      assert(catalyst === expectedCatalyst, s"toCatalyst for $dt")
+      assert(catalyst.isInstanceOf[TimestampNanosVal], s"toCatalyst must not be identity for $dt")
+
+      val scala = CatalystTypeConverters.createToScalaConverter(dt)(catalyst)
+      assert(scala === expectedScala, s"toScala roundtrip for $dt")
+    }
+  }
+
+  test("format and toSQLValue raise UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING") {
+    allCases.foreach { case (dt, _, value) =>
+      val ops = TypeApiOps(dt).get
+      Seq[() => Any](() => ops.format(value), () => ops.toSQLValue(value)).foreach { call =>
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException](call()),
+          condition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_TO_STRING",
+          parameters = Map("dataType" -> ("\"" + dt.sql + "\"")))
+      }
+    }
+  }
+
+  private def checkOptionRoundtrip[T](
+      enc: AgnosticEncoder[Option[T]],
+      values: Seq[Option[T]]): Unit = {
+    val encoder = ExpressionEncoder(enc).resolveAndBind()
+    val toRow = encoder.createSerializer()
+    val fromRow = encoder.createDeserializer()
+    values.foreach(v => assert(fromRow(toRow(v)) === v, s"roundtrip for $enc with $v"))
+  }
+
+  test("Option-wrapped nanos encoders round-trip (wrapper unwrapped before framework dispatch)") {
+    // Regression for the framework serde dispatch keying off enc.dataType: OptionEncoder proxies
+    // dataType to the wrapped nanos leaf, so the wrapper must be handled (UnwrapOption/WrapOption)
+    // before the TypeOps leaf dispatch. Precision 9 keeps the roundtrip lossless (SPARK-57207).
+    checkOptionRoundtrip(
+      OptionEncoder(LocalDateTimeNanosEncoder(9)),
+      Seq(Some(LocalDateTime.parse("2019-02-26T16:56:00.123456789")), None))
+    checkOptionRoundtrip(
+      OptionEncoder(InstantNanosEncoder(9)),
+      Seq(Some(Instant.parse("2019-02-26T16:56:00.123456789Z")), None))
+  }
+
+  test("framework disabled leaves the nanos types unsupported (no legacy fallback)") {
+    withSQLConf(SQLConf.TYPES_FRAMEWORK_ENABLED.key -> "false") {
+      allCases.foreach { case (dt, _, _) =>
+        assert(TypeOps(dt).isEmpty, s"TypeOps should be empty for $dt when disabled")
+        assert(TypeApiOps(dt).isEmpty, s"TypeApiOps should be empty for $dt when disabled")
+      }
+    }
+  }
+
+  test("enabling the nanos types requires the Types Framework to be enabled") {
+    withSQLConf(SQLConf.TYPES_FRAMEWORK_ENABLED.key -> "false") {
+      checkError(
+        exception = intercept[SparkIllegalArgumentException] {
+          SQLConf.get.setConfString(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key, "true")
+        },
+        condition = "INVALID_CONF_VALUE.REQUIREMENT",
+        parameters = Map(
+          "confName" -> SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key,
+          "confValue" -> "true",
+          "confRequirement" ->
+            (s"'${SQLConf.TYPES_FRAMEWORK_ENABLED.key}' must be true to enable the nanosecond " +
+              "timestamp types.")))
+    }
+  }
+}

