This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a7f3b5f2db08 [SPARK-52460][SQL] Store internal TIME values as 
nanoseconds
a7f3b5f2db08 is described below

commit a7f3b5f2db087ed8d7b5f35c5c8111d528d24da3
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Sat Jun 14 23:23:57 2025 +0300

    [SPARK-52460][SQL] Store internal TIME values as nanoseconds
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to store internal TIME values as nanoseconds in `Long` 
since the midnight instead of microseconds in `Long`.
    
    ### Why are the changes needed?
    `Long` with nanoseconds precision can keep the full range of TIME values 
from 0 to 24 * 60 * 60 * 1000 * 1000 * 1000 - 1 which is less than maximum of 
Long. This will simplify support of `TIME(9)`, for example.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
    $ build/sbt "test:testOnly *TimeExpressionsSuite"
    $ build/sbt "test:testOnly *TimeFormatterSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #51156 from MaxGekk/time-nanos.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../sql/catalyst/util/SparkDateTimeUtils.scala     | 27 ++++++----
 .../spark/sql/catalyst/util/TimeFormatter.scala    | 12 ++---
 .../sql/catalyst/CatalystTypeConverters.scala      |  6 +--
 .../sql/catalyst/DeserializerBuildHelper.scala     |  2 +-
 .../spark/sql/catalyst/SerializerBuildHelper.scala |  2 +-
 .../spark/sql/catalyst/expressions/literals.scala  |  4 +-
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 18 +++----
 .../org/apache/spark/sql/RandomDataGenerator.scala |  2 +-
 .../sql/catalyst/CatalystTypeConvertersSuite.scala |  4 +-
 .../sql/catalyst/encoders/RowEncoderSuite.scala    |  2 +-
 .../expressions/HashExpressionsSuite.scala         |  6 +--
 .../catalyst/expressions/LiteralGenerator.scala    |  4 +-
 .../catalyst/expressions/ToPrettyStringSuite.scala |  6 +--
 .../sql/catalyst/util/DateTimeTestUtils.scala      |  6 +--
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     | 24 ++++-----
 .../sql/catalyst/util/TimeFormatterSuite.scala     | 57 ++++++++++++----------
 .../parquet/ParquetVectorUpdaterFactory.java       | 44 ++++++++++++++++-
 .../parquet/VectorizedColumnReader.java            |  3 +-
 .../datasources/parquet/ParquetFilters.scala       | 23 ++++-----
 .../datasources/parquet/ParquetRowConverter.scala  |  3 +-
 .../datasources/parquet/ParquetWriteSupport.scala  |  6 ++-
 .../datasources/parquet/ParquetIOSuite.scala       |  4 +-
 22 files changed, 165 insertions(+), 100 deletions(-)

diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
index 6a51799e1132..b88600b8b8af 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util
 import java.lang.invoke.{MethodHandles, MethodType}
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZonedDateTime, 
ZoneId, ZoneOffset}
-import java.time.temporal.ChronoField.MICRO_OF_DAY
+import java.time.temporal.ChronoField.{MICRO_OF_DAY, NANO_OF_DAY}
 import java.util.TimeZone
 import java.util.concurrent.TimeUnit.{MICROSECONDS, NANOSECONDS}
 import java.util.regex.Pattern
@@ -83,6 +83,12 @@ trait SparkDateTimeUtils {
     case ldt: LocalDateTime => localDateTimeToMicros(ldt)
   }
 
+  /**
+   * Converts the time to microseconds since midnight. In Spark time values 
have nanoseconds
+   * precision, so this conversion is lossy.
+   */
+  def nanosToMicros(nanos: Long): Long = Math.floorDiv(nanos, 
MICROS_PER_MILLIS)
+
   /**
    * Converts the timestamp to milliseconds since epoch. In Spark timestamp 
values have
    * microseconds precision, so this conversion is lossy.
@@ -101,6 +107,11 @@ trait SparkDateTimeUtils {
     Math.multiplyExact(millis, MICROS_PER_MILLIS)
   }
 
+  /**
+   * Converts microseconds since the midnight to nanoseconds.
+   */
+  def microsToNanos(micros: Long): Long = Math.multiplyExact(micros, 
NANOS_PER_MICROS)
+
   // See issue SPARK-35679
   // min second cause overflow in instant to micro
   private val MIN_SECONDS = Math.floorDiv(Long.MinValue, MICROS_PER_SECOND)
@@ -225,17 +236,15 @@ trait SparkDateTimeUtils {
   }
 
   /**
-   * Converts the local time to the number of microseconds within the day, 
from 0 to (24 * 60 * 60
-   * * 1000000) - 1.
+   * Converts the local time to the number of nanoseconds within the day, from 
0 to (24 * 60 * 60
+   * * 1000 * 1000 * 1000) - 1.
    */
-  def localTimeToMicros(localTime: LocalTime): Long = 
localTime.getLong(MICRO_OF_DAY)
+  def localTimeToNanos(localTime: LocalTime): Long = 
localTime.getLong(NANO_OF_DAY)
 
   /**
-   * Converts the number of microseconds within the day to the local time.
+   * Converts the number of nanoseconds within the day to the local time.
    */
-  def microsToLocalTime(micros: Long): LocalTime = {
-    LocalTime.ofNanoOfDay(Math.multiplyExact(micros, NANOS_PER_MICROS))
-  }
+  def nanosToLocalTime(nanos: Long): LocalTime = LocalTime.ofNanoOfDay(nanos)
 
   /**
    * Converts a local date at the default JVM time zone to the number of days 
since 1970-01-01 in
@@ -716,7 +725,7 @@ trait SparkDateTimeUtils {
       }
       val nanoseconds = MICROSECONDS.toNanos(segments(6))
       val localTime = LocalTime.of(segments(3), segments(4), segments(5), 
nanoseconds.toInt)
-      Some(localTimeToMicros(localTime))
+      Some(localTimeToNanos(localTime))
     } catch {
       case NonFatal(_) => None
     }
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala
index 46afbc8aca19..a159f74572f1 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala
@@ -25,11 +25,11 @@ import 
org.apache.spark.sql.catalyst.util.SparkDateTimeUtils._
 import org.apache.spark.unsafe.types.UTF8String
 
 sealed trait TimeFormatter extends Serializable {
-  def parse(s: String): Long // returns microseconds since midnight
+  def parse(s: String): Long // returns nanoseconds since midnight
 
   def format(localTime: LocalTime): String
-  // Converts microseconds since the midnight to time string
-  def format(micros: Long): String
+  // Converts nanoseconds since the midnight to time string
+  def format(nanos: Long): String
 
   def validatePatternString(): Unit
 }
@@ -47,15 +47,15 @@ class Iso8601TimeFormatter(pattern: String, locale: Locale, 
isParsing: Boolean)
 
   override def parse(s: String): Long = {
     val localTime = toLocalTime(formatter.parse(s))
-    localTimeToMicros(localTime)
+    localTimeToNanos(localTime)
   }
 
   override def format(localTime: LocalTime): String = {
     localTime.format(formatter)
   }
 
-  override def format(micros: Long): String = {
-    format(microsToLocalTime(micros))
+  override def format(nanos: Long): String = {
+    format(nanosToLocalTime(nanos))
   }
 
   override def validatePatternString(): Unit = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index bb6afb3b13fa..440cfb713242 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -375,14 +375,14 @@ object CatalystTypeConverters {
 
   private object TimeConverter extends CatalystTypeConverter[LocalTime, 
LocalTime, Any] {
     override def toCatalystImpl(scalaValue: LocalTime): Long = {
-      DateTimeUtils.localTimeToMicros(scalaValue)
+      DateTimeUtils.localTimeToNanos(scalaValue)
     }
     override def toScala(catalystValue: Any): LocalTime = {
       if (catalystValue == null) null
-      else DateTimeUtils.microsToLocalTime(catalystValue.asInstanceOf[Long])
+      else DateTimeUtils.nanosToLocalTime(catalystValue.asInstanceOf[Long])
     }
     override def toScalaImpl(row: InternalRow, column: Int): LocalTime =
-      DateTimeUtils.microsToLocalTime(row.getLong(column))
+      DateTimeUtils.nanosToLocalTime(row.getLong(column))
   }
 
   private object TimestampConverter extends CatalystTypeConverter[Any, 
Timestamp, Any] {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 9b22f28ed12d..15de70e35a45 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -160,7 +160,7 @@ object DeserializerBuildHelper {
     StaticInvoke(
       DateTimeUtils.getClass,
       ObjectType(classOf[java.time.LocalTime]),
-      "microsToLocalTime",
+      "nanosToLocalTime",
       path :: Nil,
       returnNullable = false)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index c8bf1f523799..82b3cdc508bf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -103,7 +103,7 @@ object SerializerBuildHelper {
     StaticInvoke(
       DateTimeUtils.getClass,
       TimeType(),
-      "localTimeToMicros",
+      "localTimeToNanos",
       inputObject :: Nil,
       returnNullable = false)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index e3ed2c4a0b0b..925735654b73 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LITERAL, NULL_LITERAL, 
TRUE_OR_FALSE_LITERAL}
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, 
localTimeToMicros}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, 
localTimeToNanos}
 import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
 import org.apache.spark.sql.catalyst.util.IntervalUtils.{durationToMicros, 
periodToMonths, toDayTimeIntervalString, toYearMonthIntervalString}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
@@ -89,7 +89,7 @@ object Literal {
     case l: LocalDateTime => Literal(DateTimeUtils.localDateTimeToMicros(l), 
TimestampNTZType)
     case ld: LocalDate => Literal(ld.toEpochDay.toInt, DateType)
     case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
-    case lt: LocalTime => Literal(localTimeToMicros(lt), TimeType())
+    case lt: LocalTime => Literal(localTimeToNanos(lt), TimeType())
     case d: Duration => Literal(durationToMicros(d), DayTimeIntervalType())
     case p: Period => Literal(periodToMonths(p), YearMonthIntervalType())
     case a: Array[Byte] => Literal(a, BinaryType)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index d7cbb9886ba1..cd811bc9749f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -109,7 +109,7 @@ object DateTimeUtils extends SparkDateTimeUtils {
    * Returns the hour value of a given TIME (TimeType) value.
    */
   def getHoursOfTime(micros: Long): Int = {
-    microsToLocalTime(micros).getHour
+    nanosToLocalTime(micros).getHour
   }
 
   /**
@@ -124,7 +124,7 @@ object DateTimeUtils extends SparkDateTimeUtils {
    * Returns the minute value of a given TIME (TimeType) value.
    */
   def getMinutesOfTime(micros: Long): Int = {
-    microsToLocalTime(micros).getMinute
+    nanosToLocalTime(micros).getMinute
   }
 
   /**
@@ -139,7 +139,7 @@ object DateTimeUtils extends SparkDateTimeUtils {
    * Returns the second value of a given TIME (TimeType) value.
    */
   def getSecondsOfTime(micros: Long): Int = {
-    microsToLocalTime(micros).getSecond
+    nanosToLocalTime(micros).getSecond
   }
   /**
    * Returns the seconds part and its fractional part with microseconds.
@@ -151,16 +151,16 @@ object DateTimeUtils extends SparkDateTimeUtils {
 
   /**
    * Returns the second value with fraction from a given TIME (TimeType) value.
-   * @param micros
-   *   The number of microseconds since the epoch.
+   * @param nanos
+   *   The number of nanoseconds since the epoch.
    * @param precision
    *   The time fractional seconds precision, which indicates the number of 
decimal digits
    *   maintained.
    */
-  def getSecondsOfTimeWithFraction(micros: Long, precision: Int): Decimal = {
-    val seconds = (micros / MICROS_PER_SECOND) % SECONDS_PER_MINUTE
+  def getSecondsOfTimeWithFraction(nanos: Long, precision: Int): Decimal = {
+    val seconds = (nanos / NANOS_PER_SECOND) % SECONDS_PER_MINUTE
     val scaleFactor = math.pow(10, precision).toLong
-    val scaledFraction = (micros % MICROS_PER_SECOND) * scaleFactor / 
MICROS_PER_SECOND
+    val scaledFraction = (nanos % NANOS_PER_SECOND) * scaleFactor / 
NANOS_PER_SECOND
     val fraction = scaledFraction.toDouble / scaleFactor
     Decimal(seconds + fraction, 8, 6)
   }
@@ -816,7 +816,7 @@ object DateTimeUtils extends SparkDateTimeUtils {
 
       val nanos = Math.floorMod(unscaledSecFrac, MICROS_PER_SECOND) * 
NANOS_PER_MICROS
       val lt = LocalTime.of(hours, minutes, fullSecs.toInt, nanos.toInt)
-      localTimeToMicros(lt)
+      localTimeToNanos(lt)
     } catch {
       case e @ (_: DateTimeException | _: ArithmeticException) =>
         throw 
QueryExecutionErrors.ansiDateTimeArgumentOutOfRangeWithoutSuggestion(e)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
index 2f92fe3d083d..de916d13b1ba 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
@@ -292,7 +292,7 @@ object RandomDataGenerator {
         randomNumeric[LocalTime](
           rand,
           (rand: Random) => {
-            DateTimeUtils.microsToLocalTime(rand.between(0, 24 * 60 * 60 * 
1000 * 1000L))
+            DateTimeUtils.nanosToLocalTime(rand.between(0, 24 * 60 * 60 * 1000 
* 1000L))
           },
           specialTimes.map(LocalTime.parse)
         )
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index e4c48f7467f9..00d8bd6633a9 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -435,7 +435,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite 
with SQLHelper {
       "23:59:59.999999").foreach { time =>
       val input = LocalTime.parse(time)
       val result = CatalystTypeConverters.convertToCatalyst(input)
-      val expected = DateTimeUtils.localTimeToMicros(input)
+      val expected = DateTimeUtils.localTimeToNanos(input)
       assert(result === expected)
     }
   }
@@ -449,7 +449,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite 
with SQLHelper {
       43200999999L,
       86399000000L,
       86399999999L).foreach { us =>
-      val localTime = DateTimeUtils.microsToLocalTime(us)
+      val localTime = DateTimeUtils.nanosToLocalTime(us)
       assert(CatalystTypeConverters.createToScalaConverter(TimeType())(us) === 
localTime)
     }
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index 1609e1a4e113..05ccaf2cbda0 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -377,7 +377,7 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
     val encoder = ExpressionEncoder(schema).resolveAndBind()
     val localTime = java.time.LocalTime.parse("20:38:45.123456")
     val row = toRow(encoder, Row(localTime))
-    assert(row.getLong(0) === DateTimeUtils.localTimeToMicros(localTime))
+    assert(row.getLong(0) === DateTimeUtils.localTimeToNanos(localTime))
     val readback = fromRow(encoder, row)
     assert(readback.get(0).equals(localTime))
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index dddc33aa4358..c64b94703288 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -756,9 +756,9 @@ class HashExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 
   test("Support TimeType") {
     val time = Literal.create(LocalTime.of(23, 50, 59, 123456000), TimeType())
-    checkEvaluation(Murmur3Hash(Seq(time), 10), 258472763)
-    checkEvaluation(XxHash64(Seq(time), 10), -9197489935839400467L)
-    checkEvaluation(HiveHash(Seq(time)), -40222445)
+    checkEvaluation(Murmur3Hash(Seq(time), 10), 545499634)
+    checkEvaluation(XxHash64(Seq(time), 10), -3550518982366774761L)
+    checkEvaluation(HiveHash(Seq(time)), -1567775210)
   }
 
   private def testHash(inputSchema: StructType): Unit = {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala
index ed5843478c00..bada54135653 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala
@@ -125,8 +125,8 @@ object LiteralGenerator {
 
   lazy val timeLiteralGen: Gen[Literal] = {
     // Valid range for TimeType is [00:00:00, 23:59:59.999999]
-    val minTime = DateTimeUtils.localTimeToMicros(LocalTime.MIN)
-    val maxTime = DateTimeUtils.localTimeToMicros(LocalTime.MAX)
+    val minTime = DateTimeUtils.localTimeToNanos(LocalTime.MIN)
+    val maxTime = DateTimeUtils.localTimeToNanos(LocalTime.MAX)
     for { t <- Gen.choose(minTime, maxTime) }
       yield Literal(t, TimeType())
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala
index 5c297c00acc0..6a2651edd9ab 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala
@@ -136,9 +136,9 @@ class ToPrettyStringSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("Time as pretty strings") {
-    checkEvaluation(ToPrettyString(Literal(1000L, TimeType())), "00:00:00.001")
-    checkEvaluation(ToPrettyString(Literal(1L, TimeType())), "00:00:00.000001")
+    checkEvaluation(ToPrettyString(Literal(1000 * 1000L, TimeType())), 
"00:00:00.001")
+    checkEvaluation(ToPrettyString(Literal(1000L, TimeType())), 
"00:00:00.000001")
     checkEvaluation(ToPrettyString(Literal(
-      (23 * 3600 + 59 * 60 + 59) * 1000000L, TimeType())), "23:59:59")
+      (23 * 3600 + 59 * 60 + 59) * 1000000000L, TimeType())), "23:59:59")
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
index f281c42bbe71..b17a22778801 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, 
localTimeToMicros}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, 
localTimeToNanos}
 
 /**
  * Helper functions for testing date and time functionality.
@@ -113,7 +113,7 @@ object DateTimeTestUtils {
     result
   }
 
-  // Returns microseconds since midnight
+  // Returns nanoseconds since midnight
   def localTime(
       hour: Byte = 0,
       minute: Byte = 0,
@@ -121,6 +121,6 @@ object DateTimeTestUtils {
       micros: Int = 0): Long = {
     val nanos = TimeUnit.MICROSECONDS.toNanos(micros).toInt
     val localTime = LocalTime.of(hour, minute, sec, nanos)
-    localTimeToMicros(localTime)
+    localTimeToNanos(localTime)
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 24258a2268ba..0307b6d944fb 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -1107,25 +1107,27 @@ class DateTimeUtilsSuite extends SparkFunSuite with 
Matchers with SQLHelper {
           "invalidValue" -> "'SECS'"))
   }
 
-  test("localTimeToMicros and microsToLocalTime") {
-    assert(microsToLocalTime(0) === LocalTime.of(0, 0))
-    assert(localTimeToMicros(LocalTime.of(0, 0)) === 0)
+  test("localTimeToNanos and nanosToLocalTime") {
+    assert(nanosToLocalTime(0) === LocalTime.of(0, 0))
+    assert(localTimeToNanos(LocalTime.of(0, 0)) === 0)
 
-    assert(localTimeToMicros(microsToLocalTime(123456789)) === 123456789)
+    assert(localTimeToNanos(nanosToLocalTime(123456789123L)) === 123456789123L)
 
-    assert(localTimeToMicros(LocalTime.parse("23:59:59.999999")) === (24L * 60 
* 60 * 1000000 - 1))
-    assert(microsToLocalTime(24L * 60 * 60 * 1000000 - 1) === LocalTime.of(23, 
59, 59, 999999000))
+    assert(localTimeToNanos(LocalTime.parse("23:59:59.999999999")) ===
+      (24L * 60 * 60 * 1000 * 1000 * 1000 - 1))
+    assert(nanosToLocalTime(24L * 60 * 60 * 1000 * 1000 * 1000 - 1) ===
+      LocalTime.of(23, 59, 59, 999999999))
 
-    Seq(-1, 24L * 60 * 60 * 1000000).foreach { invalidMicros =>
+    Seq(-1, 24L * 60 * 60 * 1000 * 1000 * 1000L).foreach { invalidMicros =>
       val msg = intercept[DateTimeException] {
-        microsToLocalTime(invalidMicros)
+        nanosToLocalTime(invalidMicros)
       }.getMessage
       assert(msg.contains("Invalid value"))
     }
-    val msg = intercept[ArithmeticException] {
-      microsToLocalTime(Long.MaxValue)
+    val msg = intercept[DateTimeException] {
+      nanosToLocalTime(Long.MaxValue)
     }.getMessage
-    assert(msg == "long overflow")
+    assert(msg.contains("Invalid value"))
   }
 
   test("stringToTime") {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala
index d99ea2bd1042..9a707d80d248 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala
@@ -24,22 +24,24 @@ import scala.util.Random
 import org.apache.spark.{SPARK_DOC_ROOT, SparkDateTimeException, 
SparkFunSuite, SparkRuntimeException}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToLocalTime
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.nanosToLocalTime
 
 class TimeFormatterSuite extends SparkFunSuite with SQLHelper {
 
   test("time parsing") {
     Seq(
-      ("12", "HH") -> 12 * 3600 * 1000000L,
-      ("01:02", "HH:mm") -> (1 * 3600 + 2 * 60) * 1000000L,
-      ("10:20", "HH:mm") -> (10 * 3600 + 20 * 60) * 1000000L,
+      ("12", "HH") -> 12 * 3600 * 1000000000L,
+      ("01:02", "HH:mm") -> (1 * 3600 + 2 * 60) * 1000000000L,
+      ("10:20", "HH:mm") -> (10 * 3600 + 20 * 60) * 1000000000L,
       ("00:00:00", "HH:mm:ss") -> 0L,
-      ("01:02:03", "HH:mm:ss") -> (1 * 3600 + 2 * 60 + 3) * 1000000L,
-      ("23:59:59", "HH:mm:ss") -> (23 * 3600 + 59 * 60 + 59) * 1000000L,
+      ("01:02:03", "HH:mm:ss") -> (1 * 3600 + 2 * 60 + 3) * 1000000000L,
+      ("23:59:59", "HH:mm:ss") -> (23 * 3600 + 59 * 60 + 59) * 1000000000L,
       ("00:00:00.000000", "HH:mm:ss.SSSSSS") -> 0L,
-      ("12:34:56.789012", "HH:mm:ss.SSSSSS") -> ((12 * 3600 + 34 * 60 + 56) * 
1000000L + 789012),
-      ("23:59:59.000000", "HH:mm:ss.SSSSSS") -> (23 * 3600 + 59 * 60 + 59) * 
1000000L,
-      ("23:59:59.999999", "HH:mm:ss.SSSSSS") -> ((23 * 3600 + 59 * 60 + 59) * 
1000000L + 999999)
+      ("12:34:56.789012", "HH:mm:ss.SSSSSS") ->
+        ((12 * 3600 + 34 * 60 + 56) * 1000000000L + 789012000),
+      ("23:59:59.000000", "HH:mm:ss.SSSSSS") -> (23 * 3600 + 59 * 60 + 59) * 
1000000000L,
+      ("23:59:59.999999", "HH:mm:ss.SSSSSS") ->
+        ((23 * 3600 + 59 * 60 + 59) * 1000000000L + 999999000)
     ).foreach { case ((inputStr, pattern), expectedMicros) =>
       val formatter = TimeFormatter(format = pattern, isParsing = true)
       assert(formatter.parse(inputStr) === expectedMicros)
@@ -60,16 +62,18 @@ class TimeFormatterSuite extends SparkFunSuite with 
SQLHelper {
 
   test("time formatting") {
     Seq(
-      (12 * 3600 * 1000000L, "HH") -> "12",
-      ((1 * 3600 + 2 * 60) * 1000000L, "HH:mm") -> "01:02",
-      ((10 * 3600 + 20 * 60) * 1000000L, "HH:mm") -> "10:20",
+      (12 * 3600 * 1000000000L, "HH") -> "12",
+      ((1 * 3600 + 2 * 60) * 1000000000L, "HH:mm") -> "01:02",
+      ((10 * 3600 + 20 * 60) * 1000000000L, "HH:mm") -> "10:20",
       (0L, "HH:mm:ss") -> "00:00:00",
-      ((1 * 3600 + 2 * 60 + 3) * 1000000L, "HH:mm:ss") -> "01:02:03",
-      ((23 * 3600 + 59 * 60 + 59) * 1000000L, "HH:mm:ss") -> "23:59:59",
+      ((1 * 3600 + 2 * 60 + 3) * 1000000000L, "HH:mm:ss") -> "01:02:03",
+      ((23 * 3600 + 59 * 60 + 59) * 1000000000L, "HH:mm:ss") -> "23:59:59",
       (0L, "HH:mm:ss.SSSSSS") -> "00:00:00.000000",
-      ((12 * 3600 + 34 * 60 + 56) * 1000000L + 789012, "HH:mm:ss.SSSSSS") -> 
"12:34:56.789012",
-      ((23 * 3600 + 59 * 60 + 59) * 1000000L, "HH:mm:ss.SSSSSS") -> 
"23:59:59.000000",
-      ((23 * 3600 + 59 * 60 + 59) * 1000000L + 999999, "HH:mm:ss.SSSSSS") -> 
"23:59:59.999999"
+      ((12 * 3600 + 34 * 60 + 56) * 1000000000L + 789012000, 
"HH:mm:ss.SSSSSS") ->
+        "12:34:56.789012",
+      ((23 * 3600 + 59 * 60 + 59) * 1000000000L, "HH:mm:ss.SSSSSS") -> 
"23:59:59.000000",
+      ((23 * 3600 + 59 * 60 + 59) * 1000000000L + 999999000, 
"HH:mm:ss.SSSSSS") ->
+        "23:59:59.999999"
     ).foreach { case ((micros, pattern), expectedStr) =>
       val formatter = TimeFormatter(format = pattern)
       assert(formatter.format(micros) === expectedStr)
@@ -82,8 +86,8 @@ class TimeFormatterSuite extends SparkFunSuite with SQLHelper 
{
       assert(e.getMessage.contains(expectedMsg))
     }
 
-    assertError(-1, "Invalid value for NanoOfDay (valid values 0 - 
86399999999999): -1000")
-    assertError(25L * 3600 * 1000 * 1000,
+    assertError(-1000, "Invalid value for NanoOfDay (valid values 0 - 
86399999999999): -1000")
+    assertError(25L * 3600 * 1000 * 1000 * 1000,
       "Invalid value for NanoOfDay (valid values 0 - 86399999999999): 
90000000000000")
   }
 
@@ -101,14 +105,14 @@ class TimeFormatterSuite extends SparkFunSuite with 
SQLHelper {
   }
 
   test("round trip with the default pattern: format -> parse") {
-    val data = Seq.tabulate(10) { _ => Random.between(0, 24 * 60 * 60 * 
1000000L) }
+    val data = Seq.tabulate(10) { _ => Random.between(0, 24 * 60 * 60 * 
1000000L) * 1000L }
     val pattern = "HH:mm:ss.SSSSSS"
     val (formatter, parser) =
       (TimeFormatter(pattern, isParsing = false), TimeFormatter(pattern, 
isParsing = true))
-    data.foreach { micros =>
-      val str = formatter.format(micros)
-      assert(parser.parse(str) === micros, s"micros = $micros")
-      assert(formatter.format(microsToLocalTime(micros)) === str)
+    data.foreach { nanos =>
+      val str = formatter.format(nanos)
+      assert(parser.parse(str) === nanos, s"nanos = $nanos")
+      assert(formatter.format(nanosToLocalTime(nanos)) === str)
     }
   }
 
@@ -120,8 +124,9 @@ class TimeFormatterSuite extends SparkFunSuite with 
SQLHelper {
       1000 -> "00:00:00.001",
       900000 -> "00:00:00.9",
       1000000 -> "00:00:01").foreach { case (micros, tsStr) =>
-      assert(formatter.format(micros) === tsStr)
-      assert(formatter.format(microsToLocalTime(micros)) === tsStr)
+      val nanos = micros * 1000L
+      assert(formatter.format(nanos) === tsStr)
+      assert(formatter.format(nanosToLocalTime(nanos)) === tsStr)
     }
   }
 
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 889f11e11973..eb6c84b8113b 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -24,6 +24,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
@@ -159,7 +160,7 @@ public class ParquetVectorUpdaterFactory {
         } else if (canReadAsDecimal(descriptor, sparkType)) {
           return new LongToDecimalUpdater(descriptor, (DecimalType) sparkType);
         } else if (sparkType instanceof TimeType) {
-          return new LongUpdater();
+          return new LongAsNanosUpdater();
         }
       }
       case FLOAT -> {
@@ -233,6 +234,11 @@ public class ParquetVectorUpdaterFactory {
       annotation.getUnit() == unit;
   }
 
+  boolean isTimeTypeMatched(LogicalTypeAnnotation.TimeUnit unit) {
+    return logicalTypeAnnotation instanceof TimeLogicalTypeAnnotation 
annotation &&
+      annotation.getUnit() == unit;
+  }
+
   boolean isUnsignedIntTypeMatched(int bitWidth) {
     return logicalTypeAnnotation instanceof IntLogicalTypeAnnotation 
annotation &&
       !annotation.isSigned() && annotation.getBitWidth() == bitWidth;
@@ -825,6 +831,42 @@ public class ParquetVectorUpdaterFactory {
     }
   }
 
+  private static class LongAsNanosUpdater implements ParquetVectorUpdater {
+    @Override
+    public void readValues(
+        int total,
+        int offset,
+        WritableColumnVector values,
+        VectorizedValuesReader valuesReader) {
+      for (int i = 0; i < total; ++i) {
+        readValue(offset + i, values, valuesReader);
+      }
+    }
+
+    @Override
+    public void skipValues(int total, VectorizedValuesReader valuesReader) {
+      valuesReader.skipLongs(total);
+    }
+
+    @Override
+    public void readValue(
+        int offset,
+        WritableColumnVector values,
+        VectorizedValuesReader valuesReader) {
+      values.putLong(offset, 
DateTimeUtils.microsToNanos(valuesReader.readLong()));
+    }
+
+    @Override
+    public void decodeSingleDictionaryId(
+        int offset,
+        WritableColumnVector values,
+        WritableColumnVector dictionaryIds,
+        Dictionary dictionary) {
+      long micros = dictionary.decodeToLong(dictionaryIds.getDictId(offset));
+      values.putLong(offset, DateTimeUtils.microsToNanos(micros));
+    }
+  }
+
   private static class FloatUpdater implements ParquetVectorUpdater {
     @Override
     public void readValues(
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 731c78cf9450..6e1660dc8c87 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -165,7 +165,8 @@ public class VectorizedColumnReader {
       case INT64: {
         boolean isDecimal = sparkType instanceof DecimalType;
         boolean needsUpcast = (isDecimal && 
!DecimalType.is64BitDecimalType(sparkType)) ||
-          updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS);
+          updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS) ||
+          updaterFactory.isTimeTypeMatched(TimeUnit.MICROS);
         boolean needsRebase = 
updaterFactory.isTimestampTypeMatched(TimeUnit.MICROS) &&
           !"CORRECTED".equals(datetimeRebaseMode);
         isSupported = !needsUpcast && !needsRebase && 
!needsDecimalScaleRebase(sparkType);
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 565742671b9c..4a9b17bf98e5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,6 +22,7 @@ import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.{Date, Timestamp}
 import java.time.{Duration, Instant, LocalDate, LocalTime, Period}
+import java.time.temporal.ChronoField.MICRO_OF_DAY
 import java.util.HashSet
 import java.util.Locale
 
@@ -149,7 +150,7 @@ class ParquetFilters(
     ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, 
TimeUnit.MICROS), INT64, 0)
   private val ParquetTimestampMillisType =
     ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, 
TimeUnit.MILLIS), INT64, 0)
-  private val ParquetTimeType =
+  private val ParquetTimeMicrosType =
     ParquetSchemaType(LogicalTypeAnnotation.timeType(false, TimeUnit.MICROS), 
INT64, 0)
 
   private def dateToDays(date: Any): Int = {
@@ -176,7 +177,7 @@ class ParquetFilters(
   }
 
   private def localTimeToMicros(v: Any): JLong = {
-    DateTimeUtils.localTimeToMicros(v.asInstanceOf[LocalTime])
+    v.asInstanceOf[LocalTime].getLong(MICRO_OF_DAY)
   }
 
   private def decimalToInt32(decimal: JBigDecimal): Integer = 
decimal.unscaledValue().intValue()
@@ -213,7 +214,7 @@ class ParquetFilters(
 
   private def toLongValue(v: Any): JLong = v match {
     case d: Duration => IntervalUtils.durationToMicros(d)
-    case lt: LocalTime => DateTimeUtils.localTimeToMicros(lt)
+    case lt: LocalTime => localTimeToMicros(lt)
     case l => l.asInstanceOf[JLong]
   }
 
@@ -251,7 +252,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
         Option(v).map(timestampToMillis).orNull)
-    case ParquetTimeType =>
+    case ParquetTimeMicrosType =>
       (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
         Option(v).map(localTimeToMicros).orNull)
@@ -304,7 +305,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
         Option(v).map(timestampToMillis).orNull)
-    case ParquetTimeType =>
+    case ParquetTimeMicrosType =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
         Option(v).map(localTimeToMicros).orNull)
@@ -348,7 +349,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), 
timestampToMicros(v))
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), 
timestampToMillis(v))
-    case ParquetTimeType =>
+    case ParquetTimeMicrosType =>
       (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), 
localTimeToMicros(v))
 
     case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if 
pushDownDecimal =>
@@ -387,7 +388,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), 
timestampToMicros(v))
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), 
timestampToMillis(v))
-    case ParquetTimeType =>
+    case ParquetTimeMicrosType =>
       (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), 
localTimeToMicros(v))
 
     case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if 
pushDownDecimal =>
@@ -426,7 +427,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), 
timestampToMicros(v))
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), 
timestampToMillis(v))
-    case ParquetTimeType =>
+    case ParquetTimeMicrosType =>
       (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), 
localTimeToMicros(v))
 
     case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if 
pushDownDecimal =>
@@ -465,7 +466,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), 
timestampToMicros(v))
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), 
timestampToMillis(v))
-    case ParquetTimeType =>
+    case ParquetTimeMicrosType =>
       (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), 
localTimeToMicros(v))
 
     case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if 
pushDownDecimal =>
@@ -556,7 +557,7 @@ class ParquetFilters(
         }
         FilterApi.in(longColumn(n), set)
 
-    case ParquetTimeType =>
+    case ParquetTimeMicrosType =>
       (n: Array[String], values: Array[Any]) =>
         val set = new HashSet[JLong]()
         for (value <- values) {
@@ -661,7 +662,7 @@ class ParquetFilters(
         value.isInstanceOf[Date] || value.isInstanceOf[LocalDate]
       case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
         value.isInstanceOf[Timestamp] || value.isInstanceOf[Instant]
-      case ParquetTimeType => value.isInstanceOf[LocalTime]
+      case ParquetTimeMicrosType => value.isInstanceOf[LocalTime]
       case ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, INT32, 
_) =>
         isDecimalMatched(value, decimalType)
       case ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, INT64, 
_) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 0927f5c3c963..cb5e7bf53215 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -487,7 +487,8 @@ private[parquet] class ParquetRowConverter(
             .asInstanceOf[TimeLogicalTypeAnnotation].getUnit == 
TimeUnit.MICROS =>
         new ParquetPrimitiveConverter(updater) {
           override def addLong(value: Long): Unit = {
-            this.updater.setLong(value)
+            val nanos = DateTimeUtils.microsToNanos(value)
+            this.updater.setLong(nanos)
           }
         }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 4022f7ea3003..9f1a815d7fa1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -209,7 +209,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] 
with Logging {
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addInteger(row.getInt(ordinal))
 
-      case LongType | _: DayTimeIntervalType | _: TimeType =>
+      case LongType | _: DayTimeIntervalType =>
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addLong(row.getLong(ordinal))
 
@@ -253,6 +253,10 @@ class ParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
         // MICROS time unit.
         (row: SpecializedGetters, ordinal: Int) => 
recordConsumer.addLong(row.getLong(ordinal))
 
+      case _: TimeType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          
recordConsumer.addLong(DateTimeUtils.nanosToMicros(row.getLong(ordinal)))
+
       case BinaryType =>
         (row: SpecializedGetters, ordinal: Int) =>
           
recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d78b4a426e70..f52b0bdd8790 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -41,8 +41,8 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, 
TestUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.localTime
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import 
org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -1623,7 +1623,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
         val writer = createParquetWriter(schema, tablePath, dictionaryEnabled 
= dictEnabled)
         (0 until numRecords).foreach { i =>
           val record = new SimpleGroup(schema)
-          record.add(0, localTime(23, 59, 59, 123456))
+          record.add(0, localTime(23, 59, 59, 123456) / 
DateTimeConstants.NANOS_PER_MICROS)
           writer.write(record)
         }
         writer.close


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to