This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b97425a9bb [format] Support reading Parquet TIMESTAMP(NANOS) as 
timestamp(9) (#7845)
b97425a9bb is described below

commit b97425a9bb1cc2fd6dbb8e18462688d973dc2aa7
Author: YeJunHao <[email protected]>
AuthorDate: Thu May 14 00:08:02 2026 +0800

    [format] Support reading Parquet TIMESTAMP(NANOS) as timestamp(9) (#7845)
    
    Native Parquet writers such as Arrow can encode `TIMESTAMP(9)` as
    `INT64` with the `TIMESTAMP(NANOS)` logical annotation. This is valid
    Parquet, but Paimon's vectorized Parquet reader did not use the logical
    timestamp unit when decoding `INT64` timestamp columns, so nanosecond
    timestamps could not be read as Paimon `timestamp(9)` correctly.
---
 .../format/parquet/ParquetSchemaConverter.java     | 16 ++--
 .../reader/ParquetVectorUpdaterFactory.java        | 93 ++++++++++++++++++++--
 .../format/parquet/ParquetReadWriteTest.java       | 76 ++++++++++++++++++
 .../format/parquet/ParquetSchemaConverterTest.java | 35 ++++++++
 4 files changed, 209 insertions(+), 11 deletions(-)

diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index 37a69fe9ae..640081cd50 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -353,12 +353,16 @@ public class ParquetSchemaConverter {
                             instanceof 
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
                         LogicalTypeAnnotation.TimestampLogicalTypeAnnotation 
timestampType =
                                 
(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
-                        int precision =
-                                timestampType
-                                                .getUnit()
-                                                
.equals(LogicalTypeAnnotation.TimeUnit.MILLIS)
-                                        ? 3
-                                        : 6;
+                        int precision;
+                        if 
(timestampType.getUnit().equals(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+                            precision = 3;
+                        } else if (timestampType
+                                .getUnit()
+                                
.equals(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+                            precision = 6;
+                        } else {
+                            precision = 9;
+                        }
                         paimonDataType =
                                 timestampType.isAdjustedToUTC()
                                         ? new 
LocalZonedTimestampType(precision)
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
index 0abf78fd27..fdf3ecdc30 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
@@ -190,7 +190,9 @@ public class ParquetVectorUpdaterFactory {
             return c -> {
                 if (c.getPrimitiveType().getPrimitiveTypeName()
                         == PrimitiveType.PrimitiveTypeName.INT64) {
-                    return new 
LongTimestampUpdater(timestampType.getPrecision());
+                    return new LongTimestampUpdater(
+                            timestampType.getPrecision(),
+                            timestampUnit(c, timestampType.getPrecision()));
                 } else if (c.getPrimitiveType().getPrimitiveTypeName()
                         == PrimitiveType.PrimitiveTypeName.INT96) {
                     return new TimestampUpdater(timestampType.getPrecision());
@@ -206,12 +208,32 @@ public class ParquetVectorUpdaterFactory {
             return c -> {
                 if (c.getPrimitiveType().getPrimitiveTypeName()
                         == PrimitiveType.PrimitiveTypeName.INT64) {
-                    return new LongUpdater();
+                    return new LongTimestampUpdater(
+                            localZonedTimestampType.getPrecision(),
+                            timestampUnit(c, 
localZonedTimestampType.getPrecision()));
                 }
                 return new 
TimestampUpdater(localZonedTimestampType.getPrecision());
             };
         }
 
+        private static LogicalTypeAnnotation.TimeUnit timestampUnit(
+                ColumnDescriptor descriptor, int precision) { // unit of the column's raw INT64 values
+            LogicalTypeAnnotation typeAnnotation =
+                    descriptor.getPrimitiveType().getLogicalTypeAnnotation();
+            if (typeAnnotation instanceof 
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+                return ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) 
typeAnnotation)
+                        .getUnit(); // the unit declared by the file takes priority
+            }
+            // No TIMESTAMP annotation on the column: infer the unit from the requested precision.
+            if (precision <= 3) {
+                return LogicalTypeAnnotation.TimeUnit.MILLIS;
+            } else if (precision <= 6) {
+                return LogicalTypeAnnotation.TimeUnit.MICROS;
+            } else {
+                return LogicalTypeAnnotation.TimeUnit.NANOS; // precision 7-9; was MILLIS by mistake
+            }
+        }
+
         @Override
         public UpdaterFactory visit(VariantType variantType) {
             throw new RuntimeException("Variant type is not supported");
@@ -436,8 +458,11 @@ public class ParquetVectorUpdaterFactory {
 
     private static class LongTimestampUpdater extends AbstractTimestampUpdater 
{
 
-        public LongTimestampUpdater(int precision) {
+        private final LogicalTypeAnnotation.TimeUnit timeUnit;
+
+        public LongTimestampUpdater(int precision, 
LogicalTypeAnnotation.TimeUnit timeUnit) {
             super(precision);
+            this.timeUnit = timeUnit;
         }
 
         @Override
@@ -465,9 +490,67 @@ public class ParquetVectorUpdaterFactory {
         private void putTimestamp(WritableColumnVector vector, int offset, 
long timestamp) {
             if (vector instanceof WritableTimestampVector) {
                 ((WritableTimestampVector) vector)
-                        .setTimestamp(offset, 
Timestamp.fromEpochMillis(timestamp));
+                        .setTimestamp(offset, timestampFromInt64(timestamp, 
timeUnit));
+            } else {
+                ((WritableLongVector) vector).setLong(offset, 
longTimestamp(timestamp));
+            }
+        }
+
+        private long longTimestamp(long timestamp) { // value for a plain long vector
+            if (precision <= 3) { // precision 0-3: rescale the raw value to milliseconds
+                return millisFromInt64(timestamp, timeUnit);
+            } else if (precision <= 6) { // precision 4-6: rescale the raw value to microseconds
+                return microsFromInt64(timestamp, timeUnit);
             } else {
-                ((WritableLongVector) vector).setLong(offset, timestamp);
+                throw new UnsupportedOperationException(
+                        "Unsupported timestamp precision: " + precision);
+            }
+        }
+
+        private static Timestamp timestampFromInt64( // builds a Timestamp from a raw INT64 value
+                long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) {
+            switch (timeUnit) { // timeUnit says how to interpret the raw long
+                case MILLIS:
+                    return Timestamp.fromEpochMillis(timestamp);
+                case MICROS:
+                    return Timestamp.fromMicros(timestamp);
+                case NANOS: // split into whole milliseconds plus the leftover nanoseconds
+                    return Timestamp.fromEpochMillis(
+                            Math.floorDiv(timestamp, 1_000_000L),
+                            (int) Math.floorMod(timestamp, 1_000_000L)); // floorMod is never negative here
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported timestamp unit: " + timeUnit);
+            }
+        }
+
+        private static long millisFromInt64( // rescales a raw INT64 value to milliseconds
+                long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) {
+            switch (timeUnit) {
+                case MILLIS:
+                    return timestamp; // already in milliseconds
+                case MICROS:
+                    return Math.floorDiv(timestamp, 1_000L); // floorDiv rounds negatives downwards
+                case NANOS:
+                    return Math.floorDiv(timestamp, 1_000_000L);
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported timestamp unit: " + timeUnit);
+            }
+        }
+
+        private static long microsFromInt64( // rescales a raw INT64 value to microseconds
+                long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) {
+            switch (timeUnit) {
+                case MILLIS:
+                    return Math.multiplyExact(timestamp, 1_000L); // throws ArithmeticException on overflow
+                case MICROS:
+                    return timestamp; // already in microseconds
+                case NANOS:
+                    return Math.floorDiv(timestamp, 1_000L); // floorDiv rounds negatives downwards
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported timestamp unit: " + timeUnit);
             }
         }
     }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 58fc10cc51..5851ef7db5 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -670,6 +670,77 @@ public class ParquetReadWriteTest {
                 });
     }
 
+    @Test
+    public void testReadTimestampNanosWrittenByParquet() throws Exception { // NANOS INT64 read as TimestampType(9)
+        Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
+        Configuration conf = new Configuration();
+        Type timestampNanosType =
+                Types.primitive(INT64, Type.Repetition.REQUIRED)
+                        .as(
+                                LogicalTypeAnnotation.timestampType(
+                                        false, 
LogicalTypeAnnotation.TimeUnit.NANOS))
+                        .named("f0")
+                        .withId(0);
+        Type arrayTimestampNanosType =
+                ConversionPatterns.listOfElements(
+                                Type.Repetition.OPTIONAL,
+                                "f1",
+                                Types.primitive(INT64, 
Type.Repetition.OPTIONAL)
+                                        .as(
+                                                
LogicalTypeAnnotation.timestampType(
+                                                        false,
+                                                        
LogicalTypeAnnotation.TimeUnit.NANOS))
+                                        .named("element")
+                                        .withId(2))
+                        .withId(1);
+        MessageType schema =
+                new MessageType("origin-parquet", timestampNanosType, 
arrayTimestampNanosType);
+        long[] nanosValues = new long[] {1704067200123456789L, -123456789L}; // one positive, one negative
+        // Write the rows with ExampleParquetWriter and raw Group records, not through a Paimon writer.
+        try (ParquetWriter<Group> writer =
+                ExampleParquetWriter.builder(
+                                HadoopOutputFile.fromPath(
+                                        new 
org.apache.hadoop.fs.Path(path.toString()), conf))
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .withConf(new Configuration())
+                        .withType(schema)
+                        .build()) {
+            SimpleGroupFactory simpleGroupFactory = new 
SimpleGroupFactory(schema);
+            for (long nanos : nanosValues) {
+                Group row = simpleGroupFactory.newGroup();
+                row.append("f0", nanos);
+                Group array = row.addGroup("f1"); // list column gets two elements: nanos and nanos + 1
+                array.addGroup(0).add(0, nanos);
+                array.addGroup(0).add(0, nanos + 1);
+                writer.write(row);
+            }
+        }
+        // Read the file back through ParquetReaderFactory with TimestampType(9) for both fields.
+        RowType paimonRowType =
+                RowType.builder()
+                        .fields(new TimestampType(9), new ArrayType(new 
TimestampType(9)))
+                        .build();
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(new Options(), paimonRowType, 500, 
FilterCompat.NOOP);
+        AtomicInteger count = new AtomicInteger(0); // counts rows seen and indexes nanosValues
+        try (RecordReader<InternalRow> reader =
+                format.createReader(
+                        new FormatReaderContext(
+                                new LocalFileIO(), path, new 
LocalFileIO().getFileSize(path)))) {
+            reader.forEachRemaining(
+                    row -> {
+                        long nanos = nanosValues[count.get()]; // expects rows in the order written
+                        assertThat(row.getTimestamp(0, 
9)).isEqualTo(timestampFromNanos(nanos));
+                        assertThat(row.getArray(1).getTimestamp(0, 9))
+                                .isEqualTo(timestampFromNanos(nanos));
+                        assertThat(row.getArray(1).getTimestamp(1, 9))
+                                .isEqualTo(timestampFromNanos(nanos + 1));
+                        count.incrementAndGet();
+                    });
+        }
+        assertThat(count.get()).isEqualTo(nanosValues.length); // every written row was read back
+    }
+
     private void innerTestTypes(File folder, List<Integer> records, int 
rowGroupSize)
             throws IOException {
         List<InternalRow> rows = 
records.stream().map(this::newRow).collect(Collectors.toList());
@@ -865,6 +936,11 @@ public class ParquetReadWriteTest {
                 new GenericMap(f34));
     }
 
+    private Timestamp timestampFromNanos(long nanos) { // expected value for a raw nanosecond count
+        return Timestamp.fromEpochMillis(
+                Math.floorDiv(nanos, 1_000_000L), (int) Math.floorMod(nanos, 
1_000_000L));
+    }
+
     private Timestamp toMills(Integer v) {
         return Timestamp.fromEpochMillis(
                 Timestamp.fromLocalDateTime(toDateTime(v)).getMillisecond());
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java
index bfbdaed7c4..2808cc535a 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java
@@ -24,7 +24,10 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
 
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -32,6 +35,7 @@ import java.util.Arrays;
 import static 
org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToPaimonRowType;
 import static 
org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType;
 import static org.apache.paimon.types.DataTypesTest.assertThat;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 
 /** Test for {@link ParquetSchemaConverter}. */
 public class ParquetSchemaConverterTest {
@@ -100,6 +104,37 @@ public class ParquetSchemaConverterTest {
                                                                                
                     .STRING())))
                                                                     
.notNull()))))));
 
+    @Test
+    public void testParquetTimestampNanosSchemaConvert() { // NANOS annotations convert to precision 9
+        MessageType messageType =
+                new MessageType(
+                        "origin-parquet",
+                        Types.primitive(INT64, Type.Repetition.OPTIONAL)
+                                .as(
+                                        LogicalTypeAnnotation.timestampType(
+                                                false, 
LogicalTypeAnnotation.TimeUnit.NANOS))
+                                .named("timestamp_nanos")
+                                .withId(0),
+                        Types.primitive(INT64, Type.Repetition.OPTIONAL)
+                                .as(
+                                        LogicalTypeAnnotation.timestampType(
+                                                true, 
LogicalTypeAnnotation.TimeUnit.NANOS))
+                                .named("timestamp_ltz_nanos")
+                                .withId(1));
+        // The first column is declared with isAdjustedToUTC=false, the second with true.
+        RowType rowType = convertToPaimonRowType(messageType);
+        // Expected: TIMESTAMP(9) for the first column, TIMESTAMP_WITH_LOCAL_TIME_ZONE(9) for the second.
+        assertThat(
+                        new RowType(
+                                Arrays.asList(
+                                        new DataField(0, "timestamp_nanos", 
DataTypes.TIMESTAMP(9)),
+                                        new DataField(
+                                                1,
+                                                "timestamp_ltz_nanos",
+                                                
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)))))
+                .isEqualTo(rowType);
+    }
+
     @Test
     public void testPaimonParquetSchemaConvert() {
         MessageType messageType = convertToParquetMessageType(ALL_TYPES);

Reply via email to