[ https://issues.apache.org/jira/browse/SPARK-57033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Max Gekk updated SPARK-57033:
-----------------------------
Description:
h2. Summary
Add conversion between {{java.time.LocalDateTime}} / {{java.time.Instant}} and
the SPIP composite internal representation {{(epochMicros, nanosWithinMicro)}},
and wire Spark's encoder/converter stack so *Dataset* create/collect roundtrips
preserve *nanosecond* precision end-to-end.
h2. Motivation
Spark's encoders today truncate sub-microsecond fractional digits when a
{{java.time}} value flows through a Dataset. With the new
{{TimestampNTZNanosType}} / {{TimestampLTZNanosType}} (SPARK-56981), the internal
row can carry nanoseconds, but no converter exists between Java's external types
and that representation. As a result:
{code:scala}
import java.time.LocalDateTime

val ts = LocalDateTime.parse("2025-01-01T00:00:00.123456789")
val ds = spark.createDataset(Seq(ts))(/* nanos NTZ encoder */)
ds.collect().head // -> 2025-01-01T00:00:00.123456 (sub-micro digits "789" lost)
{code}
This ticket adds the conversion layer so that create → internal row → collect
is lossless for nanosecond inputs.
h2. Background
Microsecond timestamps already roundtrip through:
* {{CatalystTypeConverters}} -- {{Instant}} ↔ micro {{Long}}
({{TimestampType}}), {{LocalDateTime}} ↔ micro {{Long}} ({{TimestampNTZType}})
* {{ExpressionEncoder}} / {{RowEncoderSuite}} -- encode-decode tests for
{{Instant}} and {{LocalDateTime}}
* {{Encoders.INSTANT()}}, {{Encoders.LOCALDATETIME()}},
{{JavaDatasetSuite.testLocalDateTimeEncoder}} -- Dataset create/collect
Nanosecond-capable types store values as {{TimestampNTZNanos}} /
{{TimestampLTZNanos}} (composite layout from
[SPARK-56981|https://github.com/apache/spark/pull/56059]). There is *no*
{{CatalystTypeConverters}} path yet for {{TimestampNTZNanosType(p)}} /
{{TimestampLTZNanosType(p)}}, and no end-to-end Dataset test proving sub-micro
precision survives create → internal row → collect.
h2. Scope
h3. 1. Composite conversion helpers
Add package-private helpers (extend {{SparkDateTimeUtils}} / {{DateTimeUtils}}
or a small dedicated module), e.g.:
* {{localDateTimeToTimestampNTZNanos(ldt: LocalDateTime): TimestampNTZNanos}}
* {{timestampNTZNanosToLocalDateTime(v: TimestampNTZNanos): LocalDateTime}}
* {{instantToTimestampLTZNanos(instant: Instant): TimestampLTZNanos}} (absolute
instant → SPIP pair; LTZ semantics via zone rules where applicable)
* {{timestampLTZNanosToInstant(v: TimestampLTZNanos): Instant}}
*Normalization invariant:* {{nanosWithinMicro}} is always in \[0, 999\]; use
{{Math.addExact}} when carrying whole microseconds out of the sub-micro remainder
into {{epochMicros}}, so that overflow throws instead of wrapping.
*Precision:* helpers produce full nanosecond resolution; truncation to declared
type precision *p* ∈ \[7, 9\] is applied at cast/schema boundaries (future cast
ticket), not silently in the base conversion unless documented otherwise.
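A minimal sketch of the NTZ helpers and the normalization invariant, for illustration only. {{NanosPair}} is a hypothetical stand-in for the SPARK-56981 {{TimestampNTZNanos}} value (its real shape may differ), and {{normalize}} is a hypothetical helper. The wall-clock fields are read at {{ZoneOffset.UTC}}, assumed to match how the micro {{TimestampNTZType}} path maps {{LocalDateTime}} today. Floor division keeps {{nanosWithinMicro}} in \[0, 999\] even for negative remainders, and {{Math.multiplyExact}} / {{Math.addExact}} make out-of-range values throw instead of wrapping.

{code:scala}
import java.time.{LocalDateTime, ZoneOffset}

object NtzNanosHelpersSketch {
  private val MicrosPerSecond = 1000000L

  // Hypothetical stand-in for the SPARK-56981 TimestampNTZNanos value.
  final case class NanosPair(epochMicros: Long, nanosWithinMicro: Int) {
    require(nanosWithinMicro >= 0 && nanosWithinMicro <= 999,
      s"nanosWithinMicro out of [0, 999]: $nanosWithinMicro")
  }

  // Hypothetical helper: folds any nanosecond remainder (negative or >= 1000)
  // into the micro count so that nanosWithinMicro ends up in [0, 999].
  def normalize(epochMicros: Long, nanos: Long): NanosPair =
    NanosPair(
      Math.addExact(epochMicros, Math.floorDiv(nanos, 1000L)),
      Math.floorMod(nanos, 1000L).toInt)

  // Wall-clock fields are read at UTC (no session time zone involved).
  def localDateTimeToTimestampNTZNanos(ldt: LocalDateTime): NanosPair =
    normalize(
      Math.multiplyExact(ldt.toEpochSecond(ZoneOffset.UTC), MicrosPerSecond),
      ldt.getNano.toLong)

  // Inverse: floor semantics split negative micro counts into seconds + micro-of-second.
  def timestampNTZNanosToLocalDateTime(v: NanosPair): LocalDateTime =
    LocalDateTime.ofEpochSecond(
      Math.floorDiv(v.epochMicros, MicrosPerSecond),
      (Math.floorMod(v.epochMicros, MicrosPerSecond) * 1000L + v.nanosWithinMicro).toInt,
      ZoneOffset.UTC)
}
{code}

For example, {{normalize(10L, -1L)}} yields {{NanosPair(9, 999)}}: one nanosecond before micro 10 is micro 9 plus 999 ns.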
h3. 2. CatalystTypeConverters
Register converters for nanos logical types (mirror {{TimestampNTZConverter}} /
{{InstantConverter}}):
* {{TimestampNTZNanosType(_)}} + {{LocalDateTime}} (NTZ wall-clock)
* {{TimestampLTZNanosType(_)}} + {{Instant}} when
{{spark.sql.datetime.java8API.enabled=true}} (LTZ instant timeline)
Wire into {{createToCatalystConverter}}, {{createToScalaConverter}}, and
{{convertToCatalyst}} special cases for {{LocalDateTime}} / {{Instant}} when
the target schema column is a nanos timestamp type.
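The dispatch on the target column type can be modeled in isolation. This is a simplified sketch, not Spark's {{CatalystTypeConverters}} code: the {{DataType}} ADT, {{toCatalystConverter}}, the {{java8Api}} parameter and the {{(Long, Int)}} pair encoding are all hypothetical stand-ins. It shows the pattern match on {{TimestampNTZNanosType(_)}} / {{TimestampLTZNanosType(_)}} regardless of precision, the flag guard on the LTZ case, and {{null}} passing through unchanged as in the existing converters.

{code:scala}
import java.time.{Instant, LocalDateTime, ZoneOffset}

object ConverterDispatchSketch {
  // Hypothetical stand-ins for the logical types; p is the declared precision.
  sealed trait DataType
  final case class TimestampNTZNanosType(p: Int) extends DataType
  final case class TimestampLTZNanosType(p: Int) extends DataType

  // Internal value modeled as (epochMicros, nanosWithinMicro in [0, 999]).
  type NanosPair = (Long, Int)

  // java.time guarantees nanoOfSecond in [0, 999999999], so plain / and % suffice.
  private def split(epochSecond: Long, nanoOfSecond: Int): NanosPair =
    (Math.addExact(Math.multiplyExact(epochSecond, 1000000L), (nanoOfSecond / 1000).toLong),
      nanoOfSecond % 1000)

  // Picks an external -> internal converter for the target column type.
  // p is ignored: the sketch keeps full nanosecond resolution.
  def toCatalystConverter(dt: DataType, java8Api: Boolean): Any => Any = dt match {
    case TimestampNTZNanosType(_) =>
      (v: Any) => v match {
        case null => null
        case ldt: LocalDateTime => split(ldt.toEpochSecond(ZoneOffset.UTC), ldt.getNano)
      }
    case TimestampLTZNanosType(_) if java8Api =>
      (v: Any) => v match {
        case null => null
        case i: Instant => split(i.getEpochSecond, i.getNano)
      }
    case other =>
      // Anything else (e.g. LTZ nanos with java8Api = false) is out of scope here.
      throw new UnsupportedOperationException(s"not covered by this sketch: $other")
  }
}
{code}

Matching on {{TimestampNTZNanosType(_)}} rather than on a specific precision means one converter serves p = 7, 8 and 9.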
h3. 3. Encoder / deserializer plumbing
Ensure {{ExpressionEncoder}} over a {{StructType}} with
{{TimestampNTZNanosType(p)}} / {{TimestampLTZNanosType(p)}} columns can
serialize and deserialize {{Row}} values holding {{LocalDateTime}} /
{{Instant}}:
* {{SerializerBuildHelper}} / {{DeserializerBuildHelper}} as needed (follow
{{LocalDateTimeEncoder}} / {{InstantEncoder}} patterns for micro types)
* {{Row}} / {{GenericInternalRow}} paths use {{getTimestampNTZNanos}} /
{{setTimestampNTZNanos}} (physical accessors from SPARK-56981)
h3. 4. End-to-end Dataset tests (required)
Add integration tests that *create Datasets* from {{java.time}} values with
*nanosecond* fractional parts and {{collectAsList()}} / {{collect()}} back with
*exact* nanosecond equality:
* *Scala* ({{sql/core}}): {{Dataset[Row]}} or case-class rows with schema
{{TimestampNTZNanosType(9)}} / {{TimestampLTZNanosType(9)}}; values like
{{LocalDateTime.of(2019, 2, 26, 16, 56, 0, 123456789)}} and
{{Instant.parse("2019-02-26T16:56:00.123456789Z")}}
* *Java* ({{JavaDatasetSuite}} or new suite): same roundtrip via
{{spark.createDataset(..., encoder)}}
* Assert sub-micro digits preserved (not truncated to micros like today's
{{TimestampNTZType}} path)
* Include null values, multiple rows, and at least one edge instant from the
datetime corpus (epoch, pre-1900, maximum supported range)
*Unit tests* (catalyst, faster feedback):
* {{CatalystTypeConvertersSuite}} -- {{LocalDateTime}} / {{Instant}} ↔
{{TimestampNTZNanos}} / {{TimestampLTZNanos}} roundtrip
* {{RowEncoderSuite}} -- encode/decode rows with nanos timestamp columns
(mirror existing SPARK-35664 micro tests)
h2. Implementation notes
* Reuse {{LocalDateTime.getNano()}} / {{Instant.getNano()}} decomposition:
whole seconds → {{epochMicros}} grid, nano-of-second → {{nanosWithinMicro}} +
micro carry.
* Do *not* route through micro {{Long}} internally (that would lose sub-micro
digits); convert directly between {{java.time}} and {{TimestampNTZNanos}} /
{{TimestampLTZNanos}}.
* Existing {{TimestampNTZType}} / {{TimestampType}} converter behavior must
remain unchanged.
* Follow {{TimeType}} precedent: nanosecond precision preserved in catalyst for
{{LocalTime}}; same expectation for nanos timestamps.
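The implementation notes above can be checked with plain {{java.time}}. In this sketch (the object and method names are hypothetical) both routes compute the same {{epochMicros}} from whole seconds plus the micro part of {{getNano}}; the micro-{{Long}} route then has nowhere to keep {{getNano % 1000}}, while the direct route stores it as {{nanosWithinMicro}}. {{Instant.getNano}} is always in \[0, 999999999\], so the remainders are non-negative even before the epoch.

{code:scala}
import java.time.Instant

object DecompositionSketch {
  private val MicrosPerSecond = 1000000L

  // Whole seconds onto the microsecond grid plus the micro part of nano-of-second.
  // multiplyExact / addExact throw on overflow (e.g. for Instant.MIN).
  private def microsOf(i: Instant): Long =
    Math.addExact(
      Math.multiplyExact(i.getEpochSecond, MicrosPerSecond),
      (i.getNano / 1000).toLong)

  // Micro-Long route: only epochMicros survives, the sub-micro digits are dropped.
  def viaMicroLong(i: Instant): Instant = fromPair(microsOf(i), 0)

  // Direct route: the sub-micro remainder is kept as nanosWithinMicro in [0, 999].
  def toPair(i: Instant): (Long, Int) = (microsOf(i), i.getNano % 1000)

  // Inverse: floor semantics split negative micro counts into seconds + micro-of-second.
  def fromPair(epochMicros: Long, nanosWithinMicro: Int): Instant =
    Instant.ofEpochSecond(
      Math.floorDiv(epochMicros, MicrosPerSecond),
      Math.floorMod(epochMicros, MicrosPerSecond) * 1000L + nanosWithinMicro)
}
{code}

For {{Instant.parse("2019-02-26T16:56:00.123456789Z")}}, {{viaMicroLong}} returns {{2019-02-26T16:56:00.123456Z}}, while {{toPair}} followed by {{fromPair}} restores the original instant. One nanosecond before the epoch maps to {{(-1, 999)}}.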
h2. Acceptance criteria
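* Roundtrip property tests in {{DateTimeUtilsSuite}} covering epoch, negative
epoch, the minimum and maximum of the supported range, and random instants for
each precision *p* ∈ \[7, 9\]. {{Instant.MIN}} / {{Instant.MAX}} fall outside
the range of a {{Long}} microsecond count (about ±292,000 years around the
epoch), so for them the tests should assert an overflow error rather than a
roundtrip.
* {{CatalystTypeConvertersSuite}} confirms the new converters are picked up for
the nanos logical types.
* {{ExpressionEncoderSuite}} tests for {{Dataset[LocalDateTime]}} and
{{Dataset[Instant]}} showing that create → collect preserves nanoseconds.
* Existing micro-precision encoder/converter tests pass unchanged (no
regressions).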
was:
h2. Summary
Add conversion between {{java.time.LocalDateTime}} / {{java.time.Instant}} and
the SPIP composite internal representation {{(epochMicros, nanosWithinMicro)}},
and wire Spark's encoder/converter stack so *Dataset* create/collect roundtrips
preserve *nanosecond* precision end-to-end.
h2. Background
Microsecond timestamps already roundtrip through:
* {{CatalystTypeConverters}} -- {{Instant}} ↔ micro {{Long}}
({{TimestampType}}), {{LocalDateTime}} ↔ micro {{Long}} ({{TimestampNTZType}})
* {{ExpressionEncoder}} / {{RowEncoderSuite}} -- encode-decode tests for
{{Instant}} and {{LocalDateTime}}
* {{Encoders.INSTANT()}}, {{Encoders.LOCALDATETIME()}},
{{JavaDatasetSuite.testLocalDateTimeEncoder}} -- Dataset create/collect
Nanosecond-capable types store values as {{TimestampNTZNanos}} /
{{TimestampLTZNanos}} (composite layout from
[SPARK-56981|https://github.com/apache/spark/pull/56059]). There is *no*
{{CatalystTypeConverters}} path yet for {{TimestampNTZNanosType(p)}} /
{{TimestampLTZNanosType(p)}}, and no end-to-end Dataset test proving sub-micro
precision survives create → internal row → collect.
Sub-task *1a* (string parsing) is a separate ticket; this ticket uses
{{java.time}} objects as the primary input/output surface for conversion and
Dataset tests.
h2. Scope
h3. 1. Composite conversion helpers
Add package-private helpers (extend {{SparkDateTimeUtils}} / {{DateTimeUtils}}
or a small dedicated module), e.g.:
* {{localDateTimeToTimestampNTZNanos(ldt: LocalDateTime): TimestampNTZNanos}}
* {{timestampNTZNanosToLocalDateTime(v: TimestampNTZNanos): LocalDateTime}}
* {{instantToTimestampLTZNanos(instant: Instant): TimestampLTZNanos}} (absolute
instant → SPIP pair; LTZ semantics via zone rules where applicable)
* {{timestampLTZNanosToInstant(v: TimestampLTZNanos): Instant}}
*Normalization invariant:* {{nanosWithinMicro}} is always in \[0, 999\]; use
{{Math.addExact}} when carrying whole microseconds out of the sub-micro remainder
into {{epochMicros}}, so that overflow throws instead of wrapping.
*Precision:* helpers produce full nanosecond resolution; truncation to declared
type precision *p* ∈ \[7, 9\] is applied at cast/schema boundaries (future cast
ticket), not silently in the base conversion unless documented otherwise.
h3. 2. CatalystTypeConverters
Register converters for nanos logical types (mirror {{TimestampNTZConverter}} /
{{InstantConverter}}):
* {{TimestampNTZNanosType(_)}} + {{LocalDateTime}} (NTZ wall-clock)
* {{TimestampLTZNanosType(_)}} + {{Instant}} when
{{spark.sql.datetime.java8API.enabled=true}} (LTZ instant timeline)
Wire into {{createToCatalystConverter}}, {{createToScalaConverter}}, and
{{convertToCatalyst}} special cases for {{LocalDateTime}} / {{Instant}} when
the target schema column is a nanos timestamp type.
h3. 3. Encoder / deserializer plumbing
Ensure {{ExpressionEncoder}} over a {{StructType}} with
{{TimestampNTZNanosType(p)}} / {{TimestampLTZNanosType(p)}} columns can
serialize and deserialize {{Row}} values holding {{LocalDateTime}} /
{{Instant}}:
* {{SerializerBuildHelper}} / {{DeserializerBuildHelper}} as needed (follow
{{LocalDateTimeEncoder}} / {{InstantEncoder}} patterns for micro types)
* {{Row}} / {{GenericInternalRow}} paths use {{getTimestampNTZNanos}} /
{{setTimestampNTZNanos}} (physical accessors from SPARK-56981)
h3. 4. End-to-end Dataset tests (required)
Add integration tests that *create Datasets* from {{java.time}} values with
*nanosecond* fractional parts and {{collectAsList()}} / {{collect()}} back with
*exact* nanosecond equality:
* *Scala* ({{sql/core}}): {{Dataset[Row]}} or case-class rows with schema
{{TimestampNTZNanosType(9)}} / {{TimestampLTZNanosType(9)}}; values like
{{LocalDateTime.of(2019, 2, 26, 16, 56, 0, 123456789)}} and
{{Instant.parse("2019-02-26T16:56:00.123456789Z")}}
* *Java* ({{JavaDatasetSuite}} or new suite): same roundtrip via
{{spark.createDataset(..., encoder)}}
* Assert sub-micro digits preserved (not truncated to micros like today's
{{TimestampNTZType}} path)
* Include null values, multiple rows, and at least one edge instant from the
datetime corpus (epoch, pre-1900, maximum supported range)
*Unit tests* (catalyst, faster feedback):
* {{CatalystTypeConvertersSuite}} -- {{LocalDateTime}} / {{Instant}} ↔
{{TimestampNTZNanos}} / {{TimestampLTZNanos}} roundtrip
* {{RowEncoderSuite}} -- encode/decode rows with nanos timestamp columns
(mirror existing SPARK-35664 micro tests)
h2. Implementation notes
* Reuse {{LocalDateTime.getNano()}} / {{Instant.getNano()}} decomposition:
whole seconds → {{epochMicros}} grid, nano-of-second → {{nanosWithinMicro}} +
micro carry.
* Do *not* route through micro {{Long}} internally (that would lose sub-micro
digits); convert directly between {{java.time}} and {{TimestampNTZNanos}} /
{{TimestampLTZNanos}}.
* Existing {{TimestampNTZType}} / {{TimestampType}} converter behavior must
remain unchanged.
* Follow {{TimeType}} precedent: nanosecond precision preserved in catalyst for
{{LocalTime}}; same expectation for nanos timestamps.
h2. Acceptance criteria
* Conversion helpers roundtrip {{java.time}} ↔ composite values without
sub-micro loss for valid instants in supported range.
* {{CatalystTypeConvertersSuite}} and {{RowEncoderSuite}} pass with new
nanosecond cases.
* End-to-end Dataset tests (Scala + Java) create from {{LocalDateTime}} /
{{Instant}} with nanosecond fractions and collect back with {{equals}} match on
full nanosecond field.
* No regression in existing micro timestamp encoder/converter tests.
> Add java.time LocalDateTime/Instant conversion and Dataset roundtrip for
> nanosecond timestamps
> ----------------------------------------------------------------------------------------------
>
> Key: SPARK-57033
> URL: https://issues.apache.org/jira/browse/SPARK-57033
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 4.3.0
> Reporter: Max Gekk
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)