[ https://issues.apache.org/jira/browse/SPARK-57033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Max Gekk updated SPARK-57033:
-----------------------------
    Description: 
h2. Summary

Add conversion between {{java.time.LocalDateTime}} / {{java.time.Instant}} and 
the SPIP composite internal representation {{(epochMicros, nanosWithinMicro)}}, 
and wire Spark's encoder/converter stack so *Dataset* create/collect roundtrips 
preserve *nanosecond* precision end-to-end.

h2. Motivation

Spark's encoders today truncate sub-microsecond fractional digits when a {{java.time}} value flows through a Dataset. With the new {{TimestampNTZNanosType}} / {{TimestampLTZNanosType}} (SPARK-56981), the internal row can carry nanoseconds, but no converter exists yet between Java's external types and that representation. As a result, a {{LocalDateTime}} can only flow through the existing micro-precision encoder, which drops the last three fractional digits:

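{code:scala}
import java.time.LocalDateTime
import org.apache.spark.sql.Encoders

val ts = LocalDateTime.parse("2025-01-01T00:00:00.123456789")
// Encoders.LOCALDATETIME maps to the micro-precision TimestampNTZType
val ds = spark.createDataset(Seq(ts))(Encoders.LOCALDATETIME)
ds.collect().head  // 2025-01-01T00:00:00.123456 (last 3 digits lost)
{code}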


This ticket adds the conversion layer so that create → internal row → collect is lossless for nanosecond inputs.

h2. Background

Microsecond timestamps already roundtrip through:

* {{CatalystTypeConverters}} -- {{Instant}} ↔ micro {{Long}} 
({{TimestampType}}), {{LocalDateTime}} ↔ micro {{Long}} ({{TimestampNTZType}})
* {{ExpressionEncoder}} / {{RowEncoderSuite}} -- encode-decode tests for 
{{Instant}} and {{LocalDateTime}}
* {{Encoders.INSTANT()}}, {{Encoders.LOCALDATETIME()}}, 
{{JavaDatasetSuite.testLocalDateTimeEncoder}} -- Dataset create/collect

Nanosecond-capable types store values as {{TimestampNTZNanos}} / 
{{TimestampLTZNanos}} (composite layout from 
[SPARK-56981|https://github.com/apache/spark/pull/56059]). There is *no* 
{{CatalystTypeConverters}} path yet for {{TimestampNTZNanosType(p)}} / 
{{TimestampLTZNanosType(p)}}, and no end-to-end Dataset test proving sub-micro 
precision survives create → internal row → collect.

h2. Scope

h3. 1. Composite conversion helpers

Add package-private helpers (extend {{SparkDateTimeUtils}} / {{DateTimeUtils}} 
or a small dedicated module), e.g.:

* {{localDateTimeToTimestampNTZNanos(ldt: LocalDateTime): TimestampNTZNanos}}
* {{timestampNTZNanosToLocalDateTime(v: TimestampNTZNanos): LocalDateTime}}
* {{instantToTimestampLTZNanos(instant: Instant): TimestampLTZNanos}} (absolute 
instant → SPIP pair; LTZ semantics via zone rules where applicable)
* {{timestampLTZNanosToInstant(v: TimestampLTZNanos): Instant}}

*Normalization invariant:* {{nanosWithinMicro}} always in \[0, 999\]; use 
{{Math.addExact}} for carry from sub-micro remainder into {{epochMicros}}.

*Precision:* helpers produce full nanosecond resolution; truncation to declared 
type precision *p* ∈ \[7, 9\] is applied at cast/schema boundaries (future cast 
ticket), not silently in the base conversion unless documented otherwise.
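A minimal Scala sketch of the four helpers, assuming the composite values can be modeled as case classes holding a {{Long}} {{epochMicros}} and a {{Short}} {{nanosWithinMicro}}. The case classes are hypothetical stand-ins, not the SPARK-56981 definitions; only the arithmetic is meant to carry over. NTZ values are mapped to the epoch at {{ZoneOffset.UTC}}, so no session time zone is involved.

{code:scala}
import java.time.{Instant, LocalDateTime, ZoneOffset}

// Hypothetical stand-ins for the SPARK-56981 composite values.
final case class TimestampNTZNanos(epochMicros: Long, nanosWithinMicro: Short)
final case class TimestampLTZNanos(epochMicros: Long, nanosWithinMicro: Short)

object NanosConversions {
  private val MicrosPerSecond = 1000000L
  private val NanosPerMicro = 1000L
  // Lowest epoch second whose microseconds still fit in a Long.
  private val MinSeconds = Math.floorDiv(Long.MinValue, MicrosPerSecond)

  // (epochSecond, nanoOfSecond) -> (epochMicros, nanosWithinMicro). java.time keeps
  // nanoOfSecond in [0, 999999999], so the remainder is in [0, 999] even before 1970.
  private def split(epochSecond: Long, nanoOfSecond: Int): (Long, Short) = {
    val microOfSecond = nanoOfSecond / NanosPerMicro
    // At MinSeconds, epochSecond * 10^6 alone underflows even when the final sum
    // fits, so borrow one second; elsewhere the exact ops reject out-of-range input.
    val micros =
      if (epochSecond == MinSeconds) {
        Math.addExact(Math.multiplyExact(epochSecond + 1, MicrosPerSecond), microOfSecond - MicrosPerSecond)
      } else {
        Math.addExact(Math.multiplyExact(epochSecond, MicrosPerSecond), microOfSecond)
      }
    (micros, (nanoOfSecond % NanosPerMicro).toShort)
  }

  // (epochMicros, nanosWithinMicro) -> (epochSecond, nanoOfSecond), flooring for pre-1970 values.
  private def join(epochMicros: Long, nanosWithinMicro: Short): (Long, Long) = {
    require(nanosWithinMicro >= 0 && nanosWithinMicro <= 999, s"not normalized: $nanosWithinMicro")
    (Math.floorDiv(epochMicros, MicrosPerSecond),
      Math.floorMod(epochMicros, MicrosPerSecond) * NanosPerMicro + nanosWithinMicro)
  }

  def localDateTimeToTimestampNTZNanos(ldt: LocalDateTime): TimestampNTZNanos = {
    val (micros, nanos) = split(ldt.toEpochSecond(ZoneOffset.UTC), ldt.getNano)
    TimestampNTZNanos(micros, nanos)
  }

  def timestampNTZNanosToLocalDateTime(v: TimestampNTZNanos): LocalDateTime = {
    val (seconds, nano) = join(v.epochMicros, v.nanosWithinMicro)
    LocalDateTime.ofEpochSecond(seconds, nano.toInt, ZoneOffset.UTC)
  }

  def instantToTimestampLTZNanos(instant: Instant): TimestampLTZNanos = {
    val (micros, nanos) = split(instant.getEpochSecond, instant.getNano)
    TimestampLTZNanos(micros, nanos)
  }

  def timestampLTZNanosToInstant(v: TimestampLTZNanos): Instant = {
    val (seconds, nano) = join(v.epochMicros, v.nanosWithinMicro)
    Instant.ofEpochSecond(seconds, nano)
  }
}
{code}

Because both the scaling and the carry go through {{Math.multiplyExact}} / {{Math.addExact}}, values outside the {{Long}} microsecond range (for example {{Instant.MIN}}) fail with an {{ArithmeticException}} instead of wrapping.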

h3. 2. CatalystTypeConverters

Register converters for nanos logical types (mirror {{TimestampNTZConverter}} / 
{{InstantConverter}}):

* {{TimestampNTZNanosType(_)}} + {{LocalDateTime}} (NTZ wall-clock)
* {{TimestampLTZNanosType(_)}} + {{Instant}} when 
{{spark.sql.datetime.java8API.enabled=true}} (LTZ instant timeline)

Wire into {{createToCatalystConverter}}, {{createToScalaConverter}}, and 
{{convertToCatalyst}} special cases for {{LocalDateTime}} / {{Instant}} when 
the target schema column is a nanos timestamp type.
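A hedged sketch of how the registration could dispatch on the logical type, mirroring the shape of {{createToCatalystConverter}} (one {{Any => Any}} converter chosen per column type, then applied per value). The {{DataType}} hierarchy, {{NanosPair}}, and the {{java8Api}} parameter standing in for {{spark.sql.datetime.java8API.enabled}} are simplified assumptions for illustration, not Spark's actual classes.

{code:scala}
import java.time.{Instant, LocalDateTime, ZoneOffset}

// Hypothetical, heavily simplified stand-ins: Spark's real DataType hierarchy
// and CatalystTypeConverters are much richer than this.
sealed trait DataType
final case class TimestampNTZNanosType(precision: Int) extends DataType
final case class TimestampLTZNanosType(precision: Int) extends DataType
case object TimestampNTZType extends DataType
final case class NanosPair(epochMicros: Long, nanosWithinMicro: Short)

object ConverterDispatch {
  private val MinSeconds = Math.floorDiv(Long.MinValue, 1000000L)

  private def toPair(sec: Long, nano: Int): NanosPair = {
    // Borrow one second at the lowest representable second so sec * 10^6 cannot underflow.
    val (s, us) = if (sec == MinSeconds) (sec + 1, nano / 1000L - 1000000L) else (sec, nano / 1000L)
    NanosPair(Math.addExact(Math.multiplyExact(s, 1000000L), us), (nano % 1000).toShort)
  }

  // Pick the converter once per column type, then apply it per value;
  // nulls pass through unchanged.
  def createToCatalystConverter(dt: DataType, java8Api: Boolean): Any => Any = dt match {
    case TimestampNTZNanosType(_) =>
      (v: Any) => v match {
        case null => null
        case ldt: LocalDateTime => toPair(ldt.toEpochSecond(ZoneOffset.UTC), ldt.getNano)
      }
    case TimestampLTZNanosType(_) if java8Api =>
      (v: Any) => v match {
        case null => null
        case i: Instant => toPair(i.getEpochSecond, i.getNano)
      }
    // Micro types keep their existing converters, and the LTZ path with
    // java8API disabled is not specified by this ticket; both are out of scope here.
    case other =>
      throw new UnsupportedOperationException(s"not covered by this sketch: $other")
  }
}
{code}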

h3. 3. Encoder / deserializer plumbing

Ensure {{ExpressionEncoder}} over a {{StructType}} with 
{{TimestampNTZNanosType(p)}} / {{TimestampLTZNanosType(p)}} columns can 
serialize and deserialize {{Row}} values holding {{LocalDateTime}} / 
{{Instant}}:

* {{SerializerBuildHelper}} / {{DeserializerBuildHelper}} as needed (follow 
{{LocalDateTimeEncoder}} / {{InstantEncoder}} patterns for micro types)
* {{Row}} / {{GenericInternalRow}} paths use {{getTimestampNTZNanos}} / 
{{setTimestampNTZNanos}} (physical accessors from SPARK-56981)
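The serializer/deserializer pair can be sketched at the row level, assuming a nanos column's physical value is an {{(epochMicros, nanosWithinMicro)}} pair. {{NanosColumn}}, {{NanosPair}} and {{RowPlumbing}} are hypothetical; the real path goes through {{SerializerBuildHelper}} / {{DeserializerBuildHelper}} expressions and the SPARK-56981 accessors. The sketch only shows the per-column mapping and that nulls survive both directions.

{code:scala}
import java.time.{Instant, LocalDateTime, ZoneOffset}

// Hypothetical stand-ins for a nanos column's type and its physical value.
// Spark's ExpressionEncoder generates code instead of using closures like these.
sealed trait NanosColumn
case object NTZNanos extends NanosColumn
case object LTZNanos extends NanosColumn
final case class NanosPair(epochMicros: Long, nanosWithinMicro: Short)

object RowPlumbing {
  private val MinSeconds = Math.floorDiv(Long.MinValue, 1000000L)

  private def pair(sec: Long, nano: Int): NanosPair = {
    // Borrow one second at the lowest representable second so sec * 10^6 cannot underflow.
    val (s, us) = if (sec == MinSeconds) (sec + 1, nano / 1000L - 1000000L) else (sec, nano / 1000L)
    NanosPair(Math.addExact(Math.multiplyExact(s, 1000000L), us), (nano % 1000).toShort)
  }

  private def secondsAndNanos(p: NanosPair): (Long, Long) =
    (Math.floorDiv(p.epochMicros, 1000000L),
      Math.floorMod(p.epochMicros, 1000000L) * 1000L + p.nanosWithinMicro)

  // Serializer side: external Row values -> internal composite values, column by column.
  def serialize(schema: Seq[NanosColumn], row: Seq[Any]): Seq[Any] =
    schema.zip(row).map {
      case (_, null) => null
      case (NTZNanos, ldt: LocalDateTime) => pair(ldt.toEpochSecond(ZoneOffset.UTC), ldt.getNano)
      case (LTZNanos, i: Instant) => pair(i.getEpochSecond, i.getNano)
      case (col, v) => throw new IllegalArgumentException(s"$v does not match $col")
    }

  // Deserializer side: internal composite values -> external java.time values.
  def deserialize(schema: Seq[NanosColumn], internal: Seq[Any]): Seq[Any] =
    schema.zip(internal).map {
      case (_, null) => null
      case (NTZNanos, p: NanosPair) =>
        val (s, n) = secondsAndNanos(p)
        LocalDateTime.ofEpochSecond(s, n.toInt, ZoneOffset.UTC)
      case (LTZNanos, p: NanosPair) =>
        val (s, n) = secondsAndNanos(p)
        Instant.ofEpochSecond(s, n)
      case (col, v) => throw new IllegalArgumentException(s"$v does not match $col")
    }
}
{code}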

h3. 4. End-to-end Dataset tests (required)

Add integration tests that *create Datasets* from {{java.time}} values with 
*nanosecond* fractional parts and {{collectAsList()}} / {{collect()}} back with 
*exact* nanosecond equality:

* *Scala* ({{sql/core}}): {{Dataset[Row]}} or case-class rows with schema 
{{TimestampNTZNanosType(9)}} / {{TimestampLTZNanosType(9)}}; values like 
{{LocalDateTime.of(2019, 2, 26, 16, 56, 0, 123456789)}} and 
{{Instant.parse("2019-02-26T16:56:00.123456789Z")}}
* *Java* ({{JavaDatasetSuite}} or new suite): same roundtrip via 
{{spark.createDataset(..., encoder)}}
* Assert sub-micro digits preserved (not truncated to micros like today's 
{{TimestampNTZType}} path)
* Include null column, multiple rows, and at least one edge instant from the 
datetime corpus (epoch, pre-1900, max range)
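These Dataset tests only prove anything if every non-null input has non-zero digits below the microsecond; otherwise a micro-truncating path passes unnoticed. A sketch of such a corpus, where the "max range" entry assumes {{epochMicros}} is a signed 64-bit microsecond count (its largest value lands in year 294247). The object name and the chosen values are illustrative.

{code:scala}
import java.time.{LocalDateTime, ZoneOffset}
import java.time.temporal.ChronoUnit

object NanosTestCorpus {
  // Largest wall-clock value for (epochMicros = Long.MaxValue, nanosWithinMicro = 999),
  // assuming epochMicros is a signed 64-bit microsecond count from 1970-01-01T00:00.
  val maxNtz: LocalDateTime = LocalDateTime.ofEpochSecond(
    Math.floorDiv(Long.MaxValue, 1000000L),
    (Math.floorMod(Long.MaxValue, 1000000L) * 1000L + 999L).toInt,
    ZoneOffset.UTC)

  // Hypothetical corpus: multiple rows, a null, and edge values. Every non-null
  // value has non-zero sub-microsecond digits.
  val values: Seq[LocalDateTime] = Seq(
    LocalDateTime.of(2019, 2, 26, 16, 56, 0, 123456789),
    LocalDateTime.of(1970, 1, 1, 0, 0, 0, 1),              // 1 ns after the epoch
    LocalDateTime.of(1969, 12, 31, 23, 59, 59, 999999999), // 1 ns before the epoch
    LocalDateTime.of(1899, 12, 31, 23, 59, 59, 123456789), // pre-1900
    maxNtz,                                                // max range
    null)

  // A corpus only catches a micro-truncating path if truncation changes every
  // non-null value; then assert(collected == original) fails on that path.
  def detectsTruncation(vs: Seq[LocalDateTime]): Boolean =
    vs.filter(_ != null).forall(v => v.truncatedTo(ChronoUnit.MICROS) != v)
}
{code}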

*Unit tests* (catalyst, faster feedback):

* {{CatalystTypeConvertersSuite}} -- {{LocalDateTime}} / {{Instant}} ↔ 
{{TimestampNTZNanos}} / {{TimestampLTZNanos}} roundtrip
* {{RowEncoderSuite}} -- encode/decode rows with nanos timestamp columns 
(mirror existing SPARK-35664 micro tests)

h2. Implementation notes

* Reuse {{LocalDateTime.getNano()}} / {{Instant.getNano()}} decomposition: whole seconds × 1,000,000 → {{epochMicros}}; nano-of-second / 1000 is carried into {{epochMicros}}, and nano-of-second % 1000 becomes {{nanosWithinMicro}}.
* Do *not* route through micro {{Long}} internally (that would lose sub-micro 
digits); convert directly between {{java.time}} and {{TimestampNTZNanos}} / 
{{TimestampLTZNanos}}.
* Existing {{TimestampNTZType}} / {{TimestampType}} converter behavior must 
remain unchanged.
* Follow {{TimeType}} precedent: nanosecond precision preserved in catalyst for 
{{LocalTime}}; same expectation for nanos timestamps.
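A worked check of the note about not routing through a micro {{Long}}, using the example value {{LocalDateTime.of(2019, 2, 26, 16, 56, 0, 123456789)}} from the Dataset tests. Its epoch second at UTC is 1551200160, so the direct split gives {{epochMicros = 1551200160123456}} and {{nanosWithinMicro = 789}}; a lone micro {{Long}} holds the same 1551200160123456 but has nowhere to keep 789. Names in the sketch are illustrative.

{code:scala}
import java.time.{LocalDateTime, ZoneOffset}

// Compares the two routes for one value; names here are illustrative only.
object RouteComparison {
  val ldt: LocalDateTime = LocalDateTime.of(2019, 2, 26, 16, 56, 0, 123456789)
  val seconds: Long = ldt.toEpochSecond(ZoneOffset.UTC) // 1551200160

  // Rejected route: a single micro Long, which is all a micro-precision type keeps.
  val microsOnly: Long = seconds * 1000000L + ldt.getNano / 1000 // 1551200160123456
  val viaMicros: LocalDateTime =
    LocalDateTime.ofEpochSecond(Math.floorDiv(microsOnly, 1000000L),
      (Math.floorMod(microsOnly, 1000000L) * 1000L).toInt, ZoneOffset.UTC)

  // Direct route: keep the sub-micro remainder next to epochMicros.
  val epochMicros: Long = microsOnly
  val nanosWithinMicro: Int = ldt.getNano % 1000 // 789
  val direct: LocalDateTime =
    LocalDateTime.ofEpochSecond(Math.floorDiv(epochMicros, 1000000L),
      (Math.floorMod(epochMicros, 1000000L) * 1000L + nanosWithinMicro).toInt, ZoneOffset.UTC)
}
{code}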

h2. Acceptance criteria

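* Roundtrip property tests in {{DateTimeUtilsSuite}} covering epoch, negative epoch, and random instants for each precision *p* ∈ \[7, 9\]. {{Instant.MIN}} / {{Instant.MAX}} lie outside the {{Long}} {{epochMicros}} range, so the tests assert they are rejected (an {{ArithmeticException}} from {{Math.multiplyExact}} / {{Math.addExact}}) rather than silently wrapped.
* {{CatalystTypeConvertersSuite}} confirms the new converters are picked up for the nanos logical types.
* {{ExpressionEncoderSuite}} tests for {{Dataset[LocalDateTime]}} and {{Dataset[Instant]}} show that create → collect preserves nanoseconds.
* Existing micro-precision encoder/converter tests pass unchanged (no regressions).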
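A sketch of the property-style roundtrip tests, written as plain functions so it stands alone; a real suite would call the section 1 helpers instead of the local {{toPair}} / {{fromPair}} re-declarations, which are hypothetical. Random inputs span the whole {{Long}} microsecond range with the fraction rounded down to *p* digits, and {{Instant.MIN}} / {{Instant.MAX}} are expected to be rejected.

{code:scala}
import java.time.Instant
import scala.util.Random

// Hypothetical property check; the conversion is re-declared so the sketch stands alone.
object RoundtripProperty {
  private val MinSeconds = Math.floorDiv(Long.MinValue, 1000000L)

  def toPair(i: Instant): (Long, Int) = {
    val sec = i.getEpochSecond
    // Borrow one second at the lowest representable second so sec * 10^6 cannot underflow.
    val (s, us) =
      if (sec == MinSeconds) (sec + 1, i.getNano / 1000L - 1000000L) else (sec, i.getNano / 1000L)
    (Math.addExact(Math.multiplyExact(s, 1000000L), us), i.getNano % 1000)
  }

  def fromPair(micros: Long, nanos: Int): Instant =
    Instant.ofEpochSecond(Math.floorDiv(micros, 1000000L), Math.floorMod(micros, 1000000L) * 1000L + nanos)

  // Random instant across the whole Long micro range whose fraction uses at most
  // p digits (p in [7, 9]).
  def randomInstant(rnd: Random, p: Int): Instant = {
    val step = math.pow(10, 9 - p).toInt // 100, 10 or 1 ns
    fromPair(rnd.nextLong(), (rnd.nextInt(1000) / step) * step)
  }

  // The roundtrip holds and the remainder stays normalized to [0, 999].
  def roundtrips(i: Instant): Boolean = {
    val (m, n) = toPair(i)
    n >= 0 && n <= 999 && fromPair(m, n) == i
  }

  // Instant.MIN / Instant.MAX lie far outside the Long microsecond range, so the
  // conversion must throw instead of wrapping around.
  def rejectsOverflow(i: Instant): Boolean =
    try { toPair(i); false } catch { case _: ArithmeticException => true }
}
{code}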

  was:
h2. Summary

Add conversion between {{java.time.LocalDateTime}} / {{java.time.Instant}} and 
the SPIP composite internal representation {{(epochMicros, nanosWithinMicro)}}, 
and wire Spark's encoder/converter stack so *Dataset* create/collect roundtrips 
preserve *nanosecond* precision end-to-end.

h2. Background

Microsecond timestamps already roundtrip through:

* {{CatalystTypeConverters}} -- {{Instant}} ↔ micro {{Long}} 
({{TimestampType}}), {{LocalDateTime}} ↔ micro {{Long}} ({{TimestampNTZType}})
* {{ExpressionEncoder}} / {{RowEncoderSuite}} -- encode-decode tests for 
{{Instant}} and {{LocalDateTime}}
* {{Encoders.INSTANT()}}, {{Encoders.LOCALDATETIME()}}, 
{{JavaDatasetSuite.testLocalDateTimeEncoder}} -- Dataset create/collect

Nanosecond-capable types store values as {{TimestampNTZNanos}} / 
{{TimestampLTZNanos}} (composite layout from 
[SPARK-56981|https://github.com/apache/spark/pull/56059]). There is *no* 
{{CatalystTypeConverters}} path yet for {{TimestampNTZNanosType(p)}} / 
{{TimestampLTZNanosType(p)}}, and no end-to-end Dataset test proving sub-micro 
precision survives create → internal row → collect.

Sub-task *1a* (string parsing) is a separate ticket; this ticket uses 
{{java.time}} objects as the primary input/output surface for conversion and 
Dataset tests.

h2. Scope

h3. 1. Composite conversion helpers

Add package-private helpers (extend {{SparkDateTimeUtils}} / {{DateTimeUtils}} 
or a small dedicated module), e.g.:

* {{localDateTimeToTimestampNTZNanos(ldt: LocalDateTime): TimestampNTZNanos}}
* {{timestampNTZNanosToLocalDateTime(v: TimestampNTZNanos): LocalDateTime}}
* {{instantToTimestampLTZNanos(instant: Instant): TimestampLTZNanos}} (absolute 
instant → SPIP pair; LTZ semantics via zone rules where applicable)
* {{timestampLTZNanosToInstant(v: TimestampLTZNanos): Instant}}

*Normalization invariant:* {{nanosWithinMicro}} always in \[0, 999\]; use 
{{Math.addExact}} for carry from sub-micro remainder into {{epochMicros}}.

*Precision:* helpers produce full nanosecond resolution; truncation to declared 
type precision *p* ∈ \[7, 9\] is applied at cast/schema boundaries (future cast 
ticket), not silently in the base conversion unless documented otherwise.

h3. 2. CatalystTypeConverters

Register converters for nanos logical types (mirror {{TimestampNTZConverter}} / 
{{InstantConverter}}):

* {{TimestampNTZNanosType(_)}} + {{LocalDateTime}} (NTZ wall-clock)
* {{TimestampLTZNanosType(_)}} + {{Instant}} when 
{{spark.sql.datetime.java8API.enabled=true}} (LTZ instant timeline)

Wire into {{createToCatalystConverter}}, {{createToScalaConverter}}, and 
{{convertToCatalyst}} special cases for {{LocalDateTime}} / {{Instant}} when 
the target schema column is a nanos timestamp type.

h3. 3. Encoder / deserializer plumbing

Ensure {{ExpressionEncoder}} over a {{StructType}} with 
{{TimestampNTZNanosType(p)}} / {{TimestampLTZNanosType(p)}} columns can 
serialize and deserialize {{Row}} values holding {{LocalDateTime}} / 
{{Instant}}:

* {{SerializerBuildHelper}} / {{DeserializerBuildHelper}} as needed (follow 
{{LocalDateTimeEncoder}} / {{InstantEncoder}} patterns for micro types)
* {{Row}} / {{GenericInternalRow}} paths use {{getTimestampNTZNanos}} / 
{{setTimestampNTZNanos}} (physical accessors from SPARK-56981)

h3. 4. End-to-end Dataset tests (required)

Add integration tests that *create Datasets* from {{java.time}} values with 
*nanosecond* fractional parts and {{collectAsList()}} / {{collect()}} back with 
*exact* nanosecond equality:

* *Scala* ({{sql/core}}): {{Dataset[Row]}} or case-class rows with schema 
{{TimestampNTZNanosType(9)}} / {{TimestampLTZNanosType(9)}}; values like 
{{LocalDateTime.of(2019, 2, 26, 16, 56, 0, 123456789)}} and 
{{Instant.parse("2019-02-26T16:56:00.123456789Z")}}
* *Java* ({{JavaDatasetSuite}} or new suite): same roundtrip via 
{{spark.createDataset(..., encoder)}}
* Assert sub-micro digits preserved (not truncated to micros like today's 
{{TimestampNTZType}} path)
* Include null column, multiple rows, and at least one edge instant from the 
datetime corpus (epoch, pre-1900, max range)

*Unit tests* (catalyst, faster feedback):

* {{CatalystTypeConvertersSuite}} -- {{LocalDateTime}} / {{Instant}} ↔ 
{{TimestampNTZNanos}} / {{TimestampLTZNanos}} roundtrip
* {{RowEncoderSuite}} -- encode/decode rows with nanos timestamp columns 
(mirror existing SPARK-35664 micro tests)

h2. Implementation notes

* Reuse {{LocalDateTime.getNano()}} / {{Instant.getNano()}} decomposition: 
whole seconds → {{epochMicros}} grid, nano-of-second → {{nanosWithinMicro}} + 
micro carry.
* Do *not* route through micro {{Long}} internally (that would lose sub-micro 
digits); convert directly between {{java.time}} and {{TimestampNTZNanos}} / 
{{TimestampLTZNanos}}.
* Existing {{TimestampNTZType}} / {{TimestampType}} converter behavior must 
remain unchanged.
* Follow {{TimeType}} precedent: nanosecond precision preserved in catalyst for 
{{LocalTime}}; same expectation for nanos timestamps.

h2. Acceptance criteria

* Conversion helpers roundtrip {{java.time}} ↔ composite values without 
sub-micro loss for valid instants in supported range.
* {{CatalystTypeConvertersSuite}} and {{RowEncoderSuite}} pass with new 
nanosecond cases.
* End-to-end Dataset tests (Scala + Java) create from {{LocalDateTime}} / 
{{Instant}} with nanosecond fractions and collect back with {{equals}} match on 
full nanosecond field.
* No regression in existing micro timestamp encoder/converter tests.


> Add java.time LocalDateTime/Instant conversion and Dataset roundtrip for 
> nanosecond timestamps
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-57033
>                 URL: https://issues.apache.org/jira/browse/SPARK-57033
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 4.3.0
>            Reporter: Max Gekk
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
