This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a7f3b5f2db08 [SPARK-52460][SQL] Store internal TIME values as nanoseconds
a7f3b5f2db08 is described below

commit a7f3b5f2db087ed8d7b5f35c5c8111d528d24da3
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Sat Jun 14 23:23:57 2025 +0300

    [SPARK-52460][SQL] Store internal TIME values as nanoseconds

    ### What changes were proposed in this pull request?
    In the PR, I propose to store internal TIME values as nanoseconds in `Long` since midnight instead of microseconds in `Long`.

    ### Why are the changes needed?
    A `Long` with nanosecond precision can keep the full range of TIME values, from 0 to 24 * 60 * 60 * 1000 * 1000 * 1000 - 1, which is less than the maximum of `Long`. This will simplify support of `TIME(9)`, for example. (See the two short conversion sketches after the diff below.)

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
    $ build/sbt "test:testOnly *TimeExpressionsSuite"
    $ build/sbt "test:testOnly *TimeFormatterSuite"
    ```

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #51156 from MaxGekk/time-nanos.

    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../sql/catalyst/util/SparkDateTimeUtils.scala | 27 ++++++----
 .../spark/sql/catalyst/util/TimeFormatter.scala | 12 ++---
 .../sql/catalyst/CatalystTypeConverters.scala | 6 +--
 .../sql/catalyst/DeserializerBuildHelper.scala | 2 +-
 .../spark/sql/catalyst/SerializerBuildHelper.scala | 2 +-
 .../spark/sql/catalyst/expressions/literals.scala | 4 +-
 .../spark/sql/catalyst/util/DateTimeUtils.scala | 18 +++---
 .../org/apache/spark/sql/RandomDataGenerator.scala | 2 +-
 .../sql/catalyst/CatalystTypeConvertersSuite.scala | 4 +-
 .../sql/catalyst/encoders/RowEncoderSuite.scala | 2 +-
 .../expressions/HashExpressionsSuite.scala | 6 +--
 .../catalyst/expressions/LiteralGenerator.scala | 4 +-
 .../catalyst/expressions/ToPrettyStringSuite.scala | 6 +--
 .../sql/catalyst/util/DateTimeTestUtils.scala | 6 +--
 .../sql/catalyst/util/DateTimeUtilsSuite.scala | 24 ++++-----
 .../sql/catalyst/util/TimeFormatterSuite.scala | 57 ++++++++++++----------
 .../parquet/ParquetVectorUpdaterFactory.java | 44 ++++++++++++++++-
 .../parquet/VectorizedColumnReader.java | 3 +-
 .../datasources/parquet/ParquetFilters.scala | 23 ++++-----
 .../datasources/parquet/ParquetRowConverter.scala | 3 +-
 .../datasources/parquet/ParquetWriteSupport.scala | 6 ++-
 .../datasources/parquet/ParquetIOSuite.scala | 4 +-
 22 files changed, 165 insertions(+), 100 deletions(-)

diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
index 6a51799e1132..b88600b8b8af 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util
 import java.lang.invoke.{MethodHandles, MethodType}
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZonedDateTime, ZoneId, ZoneOffset}
-import java.time.temporal.ChronoField.MICRO_OF_DAY
+import java.time.temporal.ChronoField.{MICRO_OF_DAY, NANO_OF_DAY}
 import java.util.TimeZone
 import java.util.concurrent.TimeUnit.{MICROSECONDS, NANOSECONDS}
 import java.util.regex.Pattern
@@ -83,6 +83,12 @@ trait
SparkDateTimeUtils { case ldt: LocalDateTime => localDateTimeToMicros(ldt) } + /** + * Converts the time to microseconds since midnight. In Spark time values have nanoseconds + * precision, so this conversion is lossy. + */ + def nanosToMicros(nanos: Long): Long = Math.floorDiv(nanos, MICROS_PER_MILLIS) + /** * Converts the timestamp to milliseconds since epoch. In Spark timestamp values have * microseconds precision, so this conversion is lossy. @@ -101,6 +107,11 @@ trait SparkDateTimeUtils { Math.multiplyExact(millis, MICROS_PER_MILLIS) } + /** + * Converts microseconds since the midnight to nanoseconds. + */ + def microsToNanos(micros: Long): Long = Math.multiplyExact(micros, NANOS_PER_MICROS) + // See issue SPARK-35679 // min second cause overflow in instant to micro private val MIN_SECONDS = Math.floorDiv(Long.MinValue, MICROS_PER_SECOND) @@ -225,17 +236,15 @@ trait SparkDateTimeUtils { } /** - * Converts the local time to the number of microseconds within the day, from 0 to (24 * 60 * 60 - * * 1000000) - 1. + * Converts the local time to the number of nanoseconds within the day, from 0 to (24 * 60 * 60 + * * 1000 * 1000 * 1000) - 1. */ - def localTimeToMicros(localTime: LocalTime): Long = localTime.getLong(MICRO_OF_DAY) + def localTimeToNanos(localTime: LocalTime): Long = localTime.getLong(NANO_OF_DAY) /** - * Converts the number of microseconds within the day to the local time. + * Converts the number of nanoseconds within the day to the local time. */ - def microsToLocalTime(micros: Long): LocalTime = { - LocalTime.ofNanoOfDay(Math.multiplyExact(micros, NANOS_PER_MICROS)) - } + def nanosToLocalTime(nanos: Long): LocalTime = LocalTime.ofNanoOfDay(nanos) /** * Converts a local date at the default JVM time zone to the number of days since 1970-01-01 in @@ -716,7 +725,7 @@ trait SparkDateTimeUtils { } val nanoseconds = MICROSECONDS.toNanos(segments(6)) val localTime = LocalTime.of(segments(3), segments(4), segments(5), nanoseconds.toInt) - Some(localTimeToMicros(localTime)) + Some(localTimeToNanos(localTime)) } catch { case NonFatal(_) => None } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala index 46afbc8aca19..a159f74572f1 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala @@ -25,11 +25,11 @@ import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils._ import org.apache.spark.unsafe.types.UTF8String sealed trait TimeFormatter extends Serializable { - def parse(s: String): Long // returns microseconds since midnight + def parse(s: String): Long // returns nanoseconds since midnight def format(localTime: LocalTime): String - // Converts microseconds since the midnight to time string - def format(micros: Long): String + // Converts nanoseconds since the midnight to time string + def format(nanos: Long): String def validatePatternString(): Unit } @@ -47,15 +47,15 @@ class Iso8601TimeFormatter(pattern: String, locale: Locale, isParsing: Boolean) override def parse(s: String): Long = { val localTime = toLocalTime(formatter.parse(s)) - localTimeToMicros(localTime) + localTimeToNanos(localTime) } override def format(localTime: LocalTime): String = { localTime.format(formatter) } - override def format(micros: Long): String = { - format(microsToLocalTime(micros)) + override def format(nanos: Long): String = { + format(nanosToLocalTime(nanos)) } 
override def validatePatternString(): Unit = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index bb6afb3b13fa..440cfb713242 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -375,14 +375,14 @@ object CatalystTypeConverters { private object TimeConverter extends CatalystTypeConverter[LocalTime, LocalTime, Any] { override def toCatalystImpl(scalaValue: LocalTime): Long = { - DateTimeUtils.localTimeToMicros(scalaValue) + DateTimeUtils.localTimeToNanos(scalaValue) } override def toScala(catalystValue: Any): LocalTime = { if (catalystValue == null) null - else DateTimeUtils.microsToLocalTime(catalystValue.asInstanceOf[Long]) + else DateTimeUtils.nanosToLocalTime(catalystValue.asInstanceOf[Long]) } override def toScalaImpl(row: InternalRow, column: Int): LocalTime = - DateTimeUtils.microsToLocalTime(row.getLong(column)) + DateTimeUtils.nanosToLocalTime(row.getLong(column)) } private object TimestampConverter extends CatalystTypeConverter[Any, Timestamp, Any] { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala index 9b22f28ed12d..15de70e35a45 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala @@ -160,7 +160,7 @@ object DeserializerBuildHelper { StaticInvoke( DateTimeUtils.getClass, ObjectType(classOf[java.time.LocalTime]), - "microsToLocalTime", + "nanosToLocalTime", path :: Nil, returnNullable = false) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala index c8bf1f523799..82b3cdc508bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala @@ -103,7 +103,7 @@ object SerializerBuildHelper { StaticInvoke( DateTimeUtils.getClass, TimeType(), - "localTimeToMicros", + "localTimeToNanos", inputObject :: Nil, returnNullable = false) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index e3ed2c4a0b0b..925735654b73 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern import org.apache.spark.sql.catalyst.trees.TreePattern.{LITERAL, NULL_LITERAL, TRUE_OR_FALSE_LITERAL} import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localTimeToMicros} +import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localTimeToNanos} import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE import org.apache.spark.sql.catalyst.util.IntervalUtils.{durationToMicros, periodToMonths, toDayTimeIntervalString, 
toYearMonthIntervalString} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -89,7 +89,7 @@ object Literal { case l: LocalDateTime => Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType) case ld: LocalDate => Literal(ld.toEpochDay.toInt, DateType) case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType) - case lt: LocalTime => Literal(localTimeToMicros(lt), TimeType()) + case lt: LocalTime => Literal(localTimeToNanos(lt), TimeType()) case d: Duration => Literal(durationToMicros(d), DayTimeIntervalType()) case p: Period => Literal(periodToMonths(p), YearMonthIntervalType()) case a: Array[Byte] => Literal(a, BinaryType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index d7cbb9886ba1..cd811bc9749f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -109,7 +109,7 @@ object DateTimeUtils extends SparkDateTimeUtils { * Returns the hour value of a given TIME (TimeType) value. */ def getHoursOfTime(micros: Long): Int = { - microsToLocalTime(micros).getHour + nanosToLocalTime(micros).getHour } /** @@ -124,7 +124,7 @@ object DateTimeUtils extends SparkDateTimeUtils { * Returns the minute value of a given TIME (TimeType) value. */ def getMinutesOfTime(micros: Long): Int = { - microsToLocalTime(micros).getMinute + nanosToLocalTime(micros).getMinute } /** @@ -139,7 +139,7 @@ object DateTimeUtils extends SparkDateTimeUtils { * Returns the second value of a given TIME (TimeType) value. */ def getSecondsOfTime(micros: Long): Int = { - microsToLocalTime(micros).getSecond + nanosToLocalTime(micros).getSecond } /** * Returns the seconds part and its fractional part with microseconds. @@ -151,16 +151,16 @@ object DateTimeUtils extends SparkDateTimeUtils { /** * Returns the second value with fraction from a given TIME (TimeType) value. - * @param micros - * The number of microseconds since the epoch. + * @param nanos + * The number of nanoseconds since the epoch. * @param precision * The time fractional seconds precision, which indicates the number of decimal digits * maintained. 
*/ - def getSecondsOfTimeWithFraction(micros: Long, precision: Int): Decimal = { - val seconds = (micros / MICROS_PER_SECOND) % SECONDS_PER_MINUTE + def getSecondsOfTimeWithFraction(nanos: Long, precision: Int): Decimal = { + val seconds = (nanos / NANOS_PER_SECOND) % SECONDS_PER_MINUTE val scaleFactor = math.pow(10, precision).toLong - val scaledFraction = (micros % MICROS_PER_SECOND) * scaleFactor / MICROS_PER_SECOND + val scaledFraction = (nanos % NANOS_PER_SECOND) * scaleFactor / NANOS_PER_SECOND val fraction = scaledFraction.toDouble / scaleFactor Decimal(seconds + fraction, 8, 6) } @@ -816,7 +816,7 @@ object DateTimeUtils extends SparkDateTimeUtils { val nanos = Math.floorMod(unscaledSecFrac, MICROS_PER_SECOND) * NANOS_PER_MICROS val lt = LocalTime.of(hours, minutes, fullSecs.toInt, nanos.toInt) - localTimeToMicros(lt) + localTimeToNanos(lt) } catch { case e @ (_: DateTimeException | _: ArithmeticException) => throw QueryExecutionErrors.ansiDateTimeArgumentOutOfRangeWithoutSuggestion(e) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala index 2f92fe3d083d..de916d13b1ba 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala @@ -292,7 +292,7 @@ object RandomDataGenerator { randomNumeric[LocalTime]( rand, (rand: Random) => { - DateTimeUtils.microsToLocalTime(rand.between(0, 24 * 60 * 60 * 1000 * 1000L)) + DateTimeUtils.nanosToLocalTime(rand.between(0, 24 * 60 * 60 * 1000 * 1000L)) }, specialTimes.map(LocalTime.parse) ) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index e4c48f7467f9..00d8bd6633a9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -435,7 +435,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { "23:59:59.999999").foreach { time => val input = LocalTime.parse(time) val result = CatalystTypeConverters.convertToCatalyst(input) - val expected = DateTimeUtils.localTimeToMicros(input) + val expected = DateTimeUtils.localTimeToNanos(input) assert(result === expected) } } @@ -449,7 +449,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { 43200999999L, 86399000000L, 86399999999L).foreach { us => - val localTime = DateTimeUtils.microsToLocalTime(us) + val localTime = DateTimeUtils.nanosToLocalTime(us) assert(CatalystTypeConverters.createToScalaConverter(TimeType())(us) === localTime) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index 1609e1a4e113..05ccaf2cbda0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -377,7 +377,7 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { val encoder = ExpressionEncoder(schema).resolveAndBind() val localTime = java.time.LocalTime.parse("20:38:45.123456") val row = toRow(encoder, Row(localTime)) - assert(row.getLong(0) === 
DateTimeUtils.localTimeToMicros(localTime)) + assert(row.getLong(0) === DateTimeUtils.localTimeToNanos(localTime)) val readback = fromRow(encoder, row) assert(readback.get(0).equals(localTime)) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala index dddc33aa4358..c64b94703288 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala @@ -756,9 +756,9 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("Support TimeType") { val time = Literal.create(LocalTime.of(23, 50, 59, 123456000), TimeType()) - checkEvaluation(Murmur3Hash(Seq(time), 10), 258472763) - checkEvaluation(XxHash64(Seq(time), 10), -9197489935839400467L) - checkEvaluation(HiveHash(Seq(time)), -40222445) + checkEvaluation(Murmur3Hash(Seq(time), 10), 545499634) + checkEvaluation(XxHash64(Seq(time), 10), -3550518982366774761L) + checkEvaluation(HiveHash(Seq(time)), -1567775210) } private def testHash(inputSchema: StructType): Unit = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala index ed5843478c00..bada54135653 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala @@ -125,8 +125,8 @@ object LiteralGenerator { lazy val timeLiteralGen: Gen[Literal] = { // Valid range for TimeType is [00:00:00, 23:59:59.999999] - val minTime = DateTimeUtils.localTimeToMicros(LocalTime.MIN) - val maxTime = DateTimeUtils.localTimeToMicros(LocalTime.MAX) + val minTime = DateTimeUtils.localTimeToNanos(LocalTime.MIN) + val maxTime = DateTimeUtils.localTimeToNanos(LocalTime.MAX) for { t <- Gen.choose(minTime, maxTime) } yield Literal(t, TimeType()) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala index 5c297c00acc0..6a2651edd9ab 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala @@ -136,9 +136,9 @@ class ToPrettyStringSuite extends SparkFunSuite with ExpressionEvalHelper { } test("Time as pretty strings") { - checkEvaluation(ToPrettyString(Literal(1000L, TimeType())), "00:00:00.001") - checkEvaluation(ToPrettyString(Literal(1L, TimeType())), "00:00:00.000001") + checkEvaluation(ToPrettyString(Literal(1000 * 1000L, TimeType())), "00:00:00.001") + checkEvaluation(ToPrettyString(Literal(1000L, TimeType())), "00:00:00.000001") checkEvaluation(ToPrettyString(Literal( - (23 * 3600 + 59 * 60 + 59) * 1000000L, TimeType())), "23:59:59") + (23 * 3600 + 59 * 60 + 59) * 1000000000L, TimeType())), "23:59:59") } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala index f281c42bbe71..b17a22778801 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.util.DateTimeConstants._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, localTimeToMicros} +import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, localTimeToNanos} /** * Helper functions for testing date and time functionality. @@ -113,7 +113,7 @@ object DateTimeTestUtils { result } - // Returns microseconds since midnight + // Returns nanoseconds since midnight def localTime( hour: Byte = 0, minute: Byte = 0, @@ -121,6 +121,6 @@ object DateTimeTestUtils { micros: Int = 0): Long = { val nanos = TimeUnit.MICROSECONDS.toNanos(micros).toInt val localTime = LocalTime.of(hour, minute, sec, nanos) - localTimeToMicros(localTime) + localTimeToNanos(localTime) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 24258a2268ba..0307b6d944fb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -1107,25 +1107,27 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { "invalidValue" -> "'SECS'")) } - test("localTimeToMicros and microsToLocalTime") { - assert(microsToLocalTime(0) === LocalTime.of(0, 0)) - assert(localTimeToMicros(LocalTime.of(0, 0)) === 0) + test("localTimeToNanos and nanosToLocalTime") { + assert(nanosToLocalTime(0) === LocalTime.of(0, 0)) + assert(localTimeToNanos(LocalTime.of(0, 0)) === 0) - assert(localTimeToMicros(microsToLocalTime(123456789)) === 123456789) + assert(localTimeToNanos(nanosToLocalTime(123456789123L)) === 123456789123L) - assert(localTimeToMicros(LocalTime.parse("23:59:59.999999")) === (24L * 60 * 60 * 1000000 - 1)) - assert(microsToLocalTime(24L * 60 * 60 * 1000000 - 1) === LocalTime.of(23, 59, 59, 999999000)) + assert(localTimeToNanos(LocalTime.parse("23:59:59.999999999")) === + (24L * 60 * 60 * 1000 * 1000 * 1000 - 1)) + assert(nanosToLocalTime(24L * 60 * 60 * 1000 * 1000 * 1000 - 1) === + LocalTime.of(23, 59, 59, 999999999)) - Seq(-1, 24L * 60 * 60 * 1000000).foreach { invalidMicros => + Seq(-1, 24L * 60 * 60 * 1000 * 1000 * 1000L).foreach { invalidMicros => val msg = intercept[DateTimeException] { - microsToLocalTime(invalidMicros) + nanosToLocalTime(invalidMicros) }.getMessage assert(msg.contains("Invalid value")) } - val msg = intercept[ArithmeticException] { - microsToLocalTime(Long.MaxValue) + val msg = intercept[DateTimeException] { + nanosToLocalTime(Long.MaxValue) }.getMessage - assert(msg == "long overflow") + assert(msg.contains("Invalid value")) } test("stringToTime") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala index d99ea2bd1042..9a707d80d248 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala @@ -24,22 +24,24 @@ import scala.util.Random import org.apache.spark.{SPARK_DOC_ROOT, SparkDateTimeException, 
SparkFunSuite, SparkRuntimeException} import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToLocalTime +import org.apache.spark.sql.catalyst.util.DateTimeUtils.nanosToLocalTime class TimeFormatterSuite extends SparkFunSuite with SQLHelper { test("time parsing") { Seq( - ("12", "HH") -> 12 * 3600 * 1000000L, - ("01:02", "HH:mm") -> (1 * 3600 + 2 * 60) * 1000000L, - ("10:20", "HH:mm") -> (10 * 3600 + 20 * 60) * 1000000L, + ("12", "HH") -> 12 * 3600 * 1000000000L, + ("01:02", "HH:mm") -> (1 * 3600 + 2 * 60) * 1000000000L, + ("10:20", "HH:mm") -> (10 * 3600 + 20 * 60) * 1000000000L, ("00:00:00", "HH:mm:ss") -> 0L, - ("01:02:03", "HH:mm:ss") -> (1 * 3600 + 2 * 60 + 3) * 1000000L, - ("23:59:59", "HH:mm:ss") -> (23 * 3600 + 59 * 60 + 59) * 1000000L, + ("01:02:03", "HH:mm:ss") -> (1 * 3600 + 2 * 60 + 3) * 1000000000L, + ("23:59:59", "HH:mm:ss") -> (23 * 3600 + 59 * 60 + 59) * 1000000000L, ("00:00:00.000000", "HH:mm:ss.SSSSSS") -> 0L, - ("12:34:56.789012", "HH:mm:ss.SSSSSS") -> ((12 * 3600 + 34 * 60 + 56) * 1000000L + 789012), - ("23:59:59.000000", "HH:mm:ss.SSSSSS") -> (23 * 3600 + 59 * 60 + 59) * 1000000L, - ("23:59:59.999999", "HH:mm:ss.SSSSSS") -> ((23 * 3600 + 59 * 60 + 59) * 1000000L + 999999) + ("12:34:56.789012", "HH:mm:ss.SSSSSS") -> + ((12 * 3600 + 34 * 60 + 56) * 1000000000L + 789012000), + ("23:59:59.000000", "HH:mm:ss.SSSSSS") -> (23 * 3600 + 59 * 60 + 59) * 1000000000L, + ("23:59:59.999999", "HH:mm:ss.SSSSSS") -> + ((23 * 3600 + 59 * 60 + 59) * 1000000000L + 999999000) ).foreach { case ((inputStr, pattern), expectedMicros) => val formatter = TimeFormatter(format = pattern, isParsing = true) assert(formatter.parse(inputStr) === expectedMicros) @@ -60,16 +62,18 @@ class TimeFormatterSuite extends SparkFunSuite with SQLHelper { test("time formatting") { Seq( - (12 * 3600 * 1000000L, "HH") -> "12", - ((1 * 3600 + 2 * 60) * 1000000L, "HH:mm") -> "01:02", - ((10 * 3600 + 20 * 60) * 1000000L, "HH:mm") -> "10:20", + (12 * 3600 * 1000000000L, "HH") -> "12", + ((1 * 3600 + 2 * 60) * 1000000000L, "HH:mm") -> "01:02", + ((10 * 3600 + 20 * 60) * 1000000000L, "HH:mm") -> "10:20", (0L, "HH:mm:ss") -> "00:00:00", - ((1 * 3600 + 2 * 60 + 3) * 1000000L, "HH:mm:ss") -> "01:02:03", - ((23 * 3600 + 59 * 60 + 59) * 1000000L, "HH:mm:ss") -> "23:59:59", + ((1 * 3600 + 2 * 60 + 3) * 1000000000L, "HH:mm:ss") -> "01:02:03", + ((23 * 3600 + 59 * 60 + 59) * 1000000000L, "HH:mm:ss") -> "23:59:59", (0L, "HH:mm:ss.SSSSSS") -> "00:00:00.000000", - ((12 * 3600 + 34 * 60 + 56) * 1000000L + 789012, "HH:mm:ss.SSSSSS") -> "12:34:56.789012", - ((23 * 3600 + 59 * 60 + 59) * 1000000L, "HH:mm:ss.SSSSSS") -> "23:59:59.000000", - ((23 * 3600 + 59 * 60 + 59) * 1000000L + 999999, "HH:mm:ss.SSSSSS") -> "23:59:59.999999" + ((12 * 3600 + 34 * 60 + 56) * 1000000000L + 789012000, "HH:mm:ss.SSSSSS") -> + "12:34:56.789012", + ((23 * 3600 + 59 * 60 + 59) * 1000000000L, "HH:mm:ss.SSSSSS") -> "23:59:59.000000", + ((23 * 3600 + 59 * 60 + 59) * 1000000000L + 999999000, "HH:mm:ss.SSSSSS") -> + "23:59:59.999999" ).foreach { case ((micros, pattern), expectedStr) => val formatter = TimeFormatter(format = pattern) assert(formatter.format(micros) === expectedStr) @@ -82,8 +86,8 @@ class TimeFormatterSuite extends SparkFunSuite with SQLHelper { assert(e.getMessage.contains(expectedMsg)) } - assertError(-1, "Invalid value for NanoOfDay (valid values 0 - 86399999999999): -1000") - assertError(25L * 3600 * 1000 * 1000, + 
assertError(-1000, "Invalid value for NanoOfDay (valid values 0 - 86399999999999): -1000") + assertError(25L * 3600 * 1000 * 1000 * 1000, "Invalid value for NanoOfDay (valid values 0 - 86399999999999): 90000000000000") } @@ -101,14 +105,14 @@ class TimeFormatterSuite extends SparkFunSuite with SQLHelper { } test("round trip with the default pattern: format -> parse") { - val data = Seq.tabulate(10) { _ => Random.between(0, 24 * 60 * 60 * 1000000L) } + val data = Seq.tabulate(10) { _ => Random.between(0, 24 * 60 * 60 * 1000000L) * 1000L } val pattern = "HH:mm:ss.SSSSSS" val (formatter, parser) = (TimeFormatter(pattern, isParsing = false), TimeFormatter(pattern, isParsing = true)) - data.foreach { micros => - val str = formatter.format(micros) - assert(parser.parse(str) === micros, s"micros = $micros") - assert(formatter.format(microsToLocalTime(micros)) === str) + data.foreach { nanos => + val str = formatter.format(nanos) + assert(parser.parse(str) === nanos, s"nanos = $nanos") + assert(formatter.format(nanosToLocalTime(nanos)) === str) } } @@ -120,8 +124,9 @@ class TimeFormatterSuite extends SparkFunSuite with SQLHelper { 1000 -> "00:00:00.001", 900000 -> "00:00:00.9", 1000000 -> "00:00:01").foreach { case (micros, tsStr) => - assert(formatter.format(micros) === tsStr) - assert(formatter.format(microsToLocalTime(micros)) === tsStr) + val nanos = micros * 1000L + assert(formatter.format(nanos) === tsStr) + assert(formatter.format(nanosToLocalTime(nanos)) === tsStr) } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index 889f11e11973..eb6c84b8113b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -24,6 +24,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; @@ -159,7 +160,7 @@ public class ParquetVectorUpdaterFactory { } else if (canReadAsDecimal(descriptor, sparkType)) { return new LongToDecimalUpdater(descriptor, (DecimalType) sparkType); } else if (sparkType instanceof TimeType) { - return new LongUpdater(); + return new LongAsNanosUpdater(); } } case FLOAT -> { @@ -233,6 +234,11 @@ public class ParquetVectorUpdaterFactory { annotation.getUnit() == unit; } + boolean isTimeTypeMatched(LogicalTypeAnnotation.TimeUnit unit) { + return logicalTypeAnnotation instanceof TimeLogicalTypeAnnotation annotation && + annotation.getUnit() == unit; + } + boolean isUnsignedIntTypeMatched(int bitWidth) { return logicalTypeAnnotation instanceof IntLogicalTypeAnnotation annotation && !annotation.isSigned() && annotation.getBitWidth() == bitWidth; @@ -825,6 +831,42 @@ public class ParquetVectorUpdaterFactory { } } + private static class LongAsNanosUpdater implements ParquetVectorUpdater { + @Override + public void readValues( + int total, + int offset, + WritableColumnVector 
values, + VectorizedValuesReader valuesReader) { + for (int i = 0; i < total; ++i) { + readValue(offset + i, values, valuesReader); + } + } + + @Override + public void skipValues(int total, VectorizedValuesReader valuesReader) { + valuesReader.skipLongs(total); + } + + @Override + public void readValue( + int offset, + WritableColumnVector values, + VectorizedValuesReader valuesReader) { + values.putLong(offset, DateTimeUtils.microsToNanos(valuesReader.readLong())); + } + + @Override + public void decodeSingleDictionaryId( + int offset, + WritableColumnVector values, + WritableColumnVector dictionaryIds, + Dictionary dictionary) { + long micros = dictionary.decodeToLong(dictionaryIds.getDictId(offset)); + values.putLong(offset, DateTimeUtils.microsToNanos(micros)); + } + } + private static class FloatUpdater implements ParquetVectorUpdater { @Override public void readValues( diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 731c78cf9450..6e1660dc8c87 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -165,7 +165,8 @@ public class VectorizedColumnReader { case INT64: { boolean isDecimal = sparkType instanceof DecimalType; boolean needsUpcast = (isDecimal && !DecimalType.is64BitDecimalType(sparkType)) || - updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS); + updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS) || + updaterFactory.isTimeTypeMatched(TimeUnit.MICROS); boolean needsRebase = updaterFactory.isTimestampTypeMatched(TimeUnit.MICROS) && !"CORRECTED".equals(datetimeRebaseMode); isSupported = !needsUpcast && !needsRebase && !needsDecimalScaleRebase(sparkType); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 565742671b9c..4a9b17bf98e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -22,6 +22,7 @@ import java.math.{BigDecimal => JBigDecimal} import java.nio.charset.StandardCharsets.UTF_8 import java.sql.{Date, Timestamp} import java.time.{Duration, Instant, LocalDate, LocalTime, Period} +import java.time.temporal.ChronoField.MICRO_OF_DAY import java.util.HashSet import java.util.Locale @@ -149,7 +150,7 @@ class ParquetFilters( ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS), INT64, 0) private val ParquetTimestampMillisType = ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS), INT64, 0) - private val ParquetTimeType = + private val ParquetTimeMicrosType = ParquetSchemaType(LogicalTypeAnnotation.timeType(false, TimeUnit.MICROS), INT64, 0) private def dateToDays(date: Any): Int = { @@ -176,7 +177,7 @@ class ParquetFilters( } private def localTimeToMicros(v: Any): JLong = { - DateTimeUtils.localTimeToMicros(v.asInstanceOf[LocalTime]) + v.asInstanceOf[LocalTime].getLong(MICRO_OF_DAY) } private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue() @@ -213,7 +214,7 @@ class ParquetFilters( private def toLongValue(v: Any): JLong = v 
match { case d: Duration => IntervalUtils.durationToMicros(d) - case lt: LocalTime => DateTimeUtils.localTimeToMicros(lt) + case lt: LocalTime => localTimeToMicros(lt) case l => l.asInstanceOf[JLong] } @@ -251,7 +252,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.eq( longColumn(n), Option(v).map(timestampToMillis).orNull) - case ParquetTimeType => + case ParquetTimeMicrosType => (n: Array[String], v: Any) => FilterApi.eq( longColumn(n), Option(v).map(localTimeToMicros).orNull) @@ -304,7 +305,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), Option(v).map(timestampToMillis).orNull) - case ParquetTimeType => + case ParquetTimeMicrosType => (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), Option(v).map(localTimeToMicros).orNull) @@ -348,7 +349,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v)) - case ParquetTimeType => + case ParquetTimeMicrosType => (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), localTimeToMicros(v)) case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => @@ -387,7 +388,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v)) - case ParquetTimeType => + case ParquetTimeMicrosType => (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), localTimeToMicros(v)) case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => @@ -426,7 +427,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v)) - case ParquetTimeType => + case ParquetTimeMicrosType => (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), localTimeToMicros(v)) case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => @@ -465,7 +466,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v)) - case ParquetTimeType => + case ParquetTimeMicrosType => (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), localTimeToMicros(v)) case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => @@ -556,7 +557,7 @@ class ParquetFilters( } FilterApi.in(longColumn(n), set) - case ParquetTimeType => + case ParquetTimeMicrosType => (n: Array[String], values: Array[Any]) => val set = new HashSet[JLong]() for (value <- values) { @@ -661,7 +662,7 @@ class ParquetFilters( value.isInstanceOf[Date] || value.isInstanceOf[LocalDate] case ParquetTimestampMicrosType | ParquetTimestampMillisType => value.isInstanceOf[Timestamp] || value.isInstanceOf[Instant] - case ParquetTimeType => value.isInstanceOf[LocalTime] + case ParquetTimeMicrosType => value.isInstanceOf[LocalTime] case ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, INT32, _) => isDecimalMatched(value, decimalType) case ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, INT64, _) => diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 0927f5c3c963..cb5e7bf53215 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -487,7 +487,8 @@ private[parquet] class ParquetRowConverter( .asInstanceOf[TimeLogicalTypeAnnotation].getUnit == TimeUnit.MICROS => new ParquetPrimitiveConverter(updater) { override def addLong(value: Long): Unit = { - this.updater.setLong(value) + val nanos = DateTimeUtils.microsToNanos(value) + this.updater.setLong(nanos) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 4022f7ea3003..9f1a815d7fa1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -209,7 +209,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { (row: SpecializedGetters, ordinal: Int) => recordConsumer.addInteger(row.getInt(ordinal)) - case LongType | _: DayTimeIntervalType | _: TimeType => + case LongType | _: DayTimeIntervalType => (row: SpecializedGetters, ordinal: Int) => recordConsumer.addLong(row.getLong(ordinal)) @@ -253,6 +253,10 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { // MICROS time unit. (row: SpecializedGetters, ordinal: Int) => recordConsumer.addLong(row.getLong(ordinal)) + case _: TimeType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addLong(DateTimeUtils.nanosToMicros(row.getLong(ordinal))) + case BinaryType => (row: SpecializedGetters, ordinal: Int) => recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index d78b4a426e70..f52b0bdd8790 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -41,8 +41,8 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} +import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils} import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.localTime -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -1623,7 +1623,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val writer = createParquetWriter(schema, tablePath, dictionaryEnabled = dictEnabled) (0 until numRecords).foreach { i => val record = new SimpleGroup(schema) - record.add(0, localTime(23, 59, 59, 123456)) + record.add(0, localTime(23, 59, 59, 
123456) / DateTimeConstants.NANOS_PER_MICROS)
       writer.write(record)
     }
     writer.close
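Below is a minimal, self-contained Scala sketch of the nanosecond-of-day convention this patch adopts for TIME values. The object, constant, and method names are illustrative only; they mirror, but are not, the helpers touched in `SparkDateTimeUtils`. It assumes plain `java.time` and shows why the full TIME range fits in a `Long`, and where precision is lost when narrowing to microseconds.

```scala
import java.time.LocalTime
import java.time.temporal.ChronoField.NANO_OF_DAY

object TimeNanosSketch {
  // Illustrative constant for this sketch only.
  val NanosPerMicros = 1000L

  // A TIME value is nanoseconds since midnight: 0 .. 24 * 60 * 60 * 1000 * 1000 * 1000 - 1.
  def localTimeToNanos(lt: LocalTime): Long = lt.getLong(NANO_OF_DAY)

  def nanosToLocalTime(nanos: Long): LocalTime = LocalTime.ofNanoOfDay(nanos)

  // Lossy narrowing (floor division), for formats that only carry microseconds.
  def nanosToMicros(nanos: Long): Long = Math.floorDiv(nanos, NanosPerMicros)

  // Exact widening back to the internal nanosecond representation.
  def microsToNanos(micros: Long): Long = Math.multiplyExact(micros, NanosPerMicros)

  def main(args: Array[String]): Unit = {
    val nanos = localTimeToNanos(LocalTime.parse("23:59:59.999999999"))
    // The largest TIME value, 86_399_999_999_999, is far below Long.MaxValue.
    assert(nanos == 24L * 60 * 60 * 1000 * 1000 * 1000 - 1)
    // Narrowing drops the last three digits; widening cannot bring them back.
    assert(nanosToLocalTime(microsToNanos(nanosToMicros(nanos))) == LocalTime.parse("23:59:59.999999"))
  }
}
```

The round trip through microseconds lands on the truncated value, which is the lossy behaviour called out in the doc comment of the new `nanosToMicros` helper.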
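The Parquet-facing hunks keep the physical column as INT64 TIME(MICROS) and convert only at the boundary: narrow on write (`ParquetWriteSupport`), widen on read (`ParquetRowConverter`, `LongAsNanosUpdater`), and build pushed-down filter literals directly in microseconds (`ParquetFilters`) so they compare against the stored values. The sketch below assumes that convention; the names are illustrative, not the real Spark classes.

```scala
import java.time.LocalTime
import java.time.temporal.ChronoField.{MICRO_OF_DAY, NANO_OF_DAY}

object ParquetTimeBoundarySketch {
  private val NanosPerMicros = 1000L

  // Write path: the Parquet column stays INT64 TIME(MICROS), so narrow nanos -> micros.
  def toParquetMicros(catalystNanos: Long): Long = Math.floorDiv(catalystNanos, NanosPerMicros)

  // Read path: widen the stored micros back to the in-memory nanos-of-day.
  def fromParquetMicros(parquetMicros: Long): Long = Math.multiplyExact(parquetMicros, NanosPerMicros)

  // Filter pushdown: literals must match the stored unit, so build them in micros-of-day.
  def pushdownLiteral(lt: LocalTime): Long = lt.getLong(MICRO_OF_DAY)

  def main(args: Array[String]): Unit = {
    val lt = LocalTime.parse("12:34:56.789012")
    val nanos = lt.getLong(NANO_OF_DAY)
    // Exact round trip, because this value has no sub-microsecond digits.
    assert(fromParquetMicros(toParquetMicros(nanos)) == nanos)
    // The pushed-down literal agrees with what the write path stores.
    assert(toParquetMicros(nanos) == pushdownLiteral(lt))
  }
}
```

Because the column stores only microseconds, the read-side widening is exact, so any value that originated at microsecond precision survives a write/read round trip unchanged.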