This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b97425a9bb [format] Support reading Parquet TIMESTAMP(NANOS) as
timestamp(9) (#7845)
b97425a9bb is described below
commit b97425a9bb1cc2fd6dbb8e18462688d973dc2aa7
Author: YeJunHao <[email protected]>
AuthorDate: Thu May 14 00:08:02 2026 +0800
[format] Support reading Parquet TIMESTAMP(NANOS) as timestamp(9) (#7845)
Native Parquet writers such as Arrow can encode `TIMESTAMP(9)` as
`INT64` with the `TIMESTAMP(NANOS)` logical type annotation. This is valid
Parquet, but Paimon's vectorized Parquet reader ignored the logical
timestamp unit when decoding `INT64` timestamp columns, so nanosecond
timestamps could not be read correctly as Paimon `timestamp(9)`. The
schema converter also mapped `NANOS` to precision 6; it now maps it to 9.
---
.../format/parquet/ParquetSchemaConverter.java | 16 ++--
.../reader/ParquetVectorUpdaterFactory.java | 93 ++++++++++++++++++++--
.../format/parquet/ParquetReadWriteTest.java | 76 ++++++++++++++++++
.../format/parquet/ParquetSchemaConverterTest.java | 35 ++++++++
4 files changed, 209 insertions(+), 11 deletions(-)
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index 37a69fe9ae..640081cd50 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -353,12 +353,16 @@ public class ParquetSchemaConverter {
instanceof
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
timestampType =
(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
- int precision =
- timestampType
- .getUnit()
-
.equals(LogicalTypeAnnotation.TimeUnit.MILLIS)
- ? 3
- : 6;
+ int precision;
+ if
(timestampType.getUnit().equals(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+ precision = 3;
+ } else if (timestampType
+ .getUnit()
+
.equals(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+ precision = 6;
+ } else {
+ precision = 9;
+ }
paimonDataType =
timestampType.isAdjustedToUTC()
? new
LocalZonedTimestampType(precision)
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
index 0abf78fd27..fdf3ecdc30 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
@@ -190,7 +190,9 @@ public class ParquetVectorUpdaterFactory {
return c -> {
if (c.getPrimitiveType().getPrimitiveTypeName()
== PrimitiveType.PrimitiveTypeName.INT64) {
- return new
LongTimestampUpdater(timestampType.getPrecision());
+ return new LongTimestampUpdater(
+ timestampType.getPrecision(),
+ timestampUnit(c, timestampType.getPrecision()));
} else if (c.getPrimitiveType().getPrimitiveTypeName()
== PrimitiveType.PrimitiveTypeName.INT96) {
return new TimestampUpdater(timestampType.getPrecision());
@@ -206,12 +208,32 @@ public class ParquetVectorUpdaterFactory {
return c -> {
if (c.getPrimitiveType().getPrimitiveTypeName()
== PrimitiveType.PrimitiveTypeName.INT64) {
- return new LongUpdater();
+ return new LongTimestampUpdater(
+ localZonedTimestampType.getPrecision(),
+ timestampUnit(c,
localZonedTimestampType.getPrecision()));
}
return new
TimestampUpdater(localZonedTimestampType.getPrecision());
};
}
+    /**
+     * Resolves the unit in which the raw INT64 values of a timestamp column are encoded.
+     *
+     * <p>If the column carries a Parquet {@code TIMESTAMP} logical-type annotation, its unit is
+     * used as-is. Otherwise the unit is derived from the requested Paimon precision: precisions
+     * 4..6 map to {@code MICROS}, and every other precision (including 7..9) maps to
+     * {@code MILLIS}.
+     *
+     * <p>NOTE(review): the MILLIS fallback for precision > 6 presumably keeps the previous
+     * behaviour of decoding un-annotated INT64 values with {@code Timestamp.fromEpochMillis};
+     * confirm this is intended rather than NANOS.
+     */
+    private static LogicalTypeAnnotation.TimeUnit timestampUnit(
+            ColumnDescriptor descriptor, int precision) {
+        LogicalTypeAnnotation annotation =
+                descriptor.getPrimitiveType().getLogicalTypeAnnotation();
+        if (annotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+            LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampAnnotation =
+                    (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) annotation;
+            return timestampAnnotation.getUnit();
+        }
+        boolean isMicrosPrecision = precision > 3 && precision <= 6;
+        return isMicrosPrecision
+                ? LogicalTypeAnnotation.TimeUnit.MICROS
+                : LogicalTypeAnnotation.TimeUnit.MILLIS;
+    }
+
@Override
public UpdaterFactory visit(VariantType variantType) {
throw new RuntimeException("Variant type is not supported");
@@ -436,8 +458,11 @@ public class ParquetVectorUpdaterFactory {
private static class LongTimestampUpdater extends AbstractTimestampUpdater
{
- public LongTimestampUpdater(int precision) {
+ private final LogicalTypeAnnotation.TimeUnit timeUnit;
+
+ public LongTimestampUpdater(int precision,
LogicalTypeAnnotation.TimeUnit timeUnit) {
super(precision);
+ this.timeUnit = timeUnit;
}
@Override
@@ -465,9 +490,67 @@ public class ParquetVectorUpdaterFactory {
private void putTimestamp(WritableColumnVector vector, int offset,
long timestamp) {
if (vector instanceof WritableTimestampVector) {
((WritableTimestampVector) vector)
- .setTimestamp(offset,
Timestamp.fromEpochMillis(timestamp));
+ .setTimestamp(offset, timestampFromInt64(timestamp,
timeUnit));
+ } else {
+ ((WritableLongVector) vector).setLong(offset,
longTimestamp(timestamp));
+ }
+ }
+
+ private long longTimestamp(long timestamp) {
+ if (precision <= 3) {
+ return millisFromInt64(timestamp, timeUnit);
+ } else if (precision <= 6) {
+ return microsFromInt64(timestamp, timeUnit);
} else {
- ((WritableLongVector) vector).setLong(offset, timestamp);
+ throw new UnsupportedOperationException(
+ "Unsupported timestamp precision: " + precision);
+ }
+ }
+
+ /**
+ * Converts a raw INT64 timestamp expressed in {@code timeUnit} into a Paimon {@link Timestamp}.
+ *
+ * @param timestamp raw value read from the Parquet column
+ * @param timeUnit unit the raw value is encoded in
+ * @return the corresponding timestamp
+ * @throws UnsupportedOperationException if {@code timeUnit} is not MILLIS, MICROS or NANOS
+ */
+ private static Timestamp timestampFromInt64(
+ long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) {
+ switch (timeUnit) {
+ case MILLIS:
+ return Timestamp.fromEpochMillis(timestamp);
+ case MICROS:
+ return Timestamp.fromMicros(timestamp);
+ case NANOS:
+ // Split into epoch millis plus nanos-of-millisecond; floorDiv/floorMod keep the
+ // nanos part non-negative for pre-epoch (negative) values.
+ return Timestamp.fromEpochMillis(
+ Math.floorDiv(timestamp, 1_000_000L),
+ (int) Math.floorMod(timestamp, 1_000_000L));
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported timestamp unit: " + timeUnit);
+ }
+ }
+
+ /**
+ * Converts a raw INT64 timestamp expressed in {@code timeUnit} into epoch milliseconds.
+ * Finer units are truncated with {@code Math.floorDiv}, i.e. rounded toward negative infinity.
+ *
+ * @throws UnsupportedOperationException if {@code timeUnit} is not MILLIS, MICROS or NANOS
+ */
+ private static long millisFromInt64(
+ long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) {
+ switch (timeUnit) {
+ case MILLIS:
+ return timestamp;
+ case MICROS:
+ return Math.floorDiv(timestamp, 1_000L);
+ case NANOS:
+ return Math.floorDiv(timestamp, 1_000_000L);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported timestamp unit: " + timeUnit);
+ }
+ }
+
+ /**
+ * Converts a raw INT64 timestamp expressed in {@code timeUnit} into epoch microseconds.
+ * Nanoseconds are truncated with {@code Math.floorDiv}, i.e. rounded toward negative infinity.
+ *
+ * @throws ArithmeticException if scaling a MILLIS value to microseconds overflows a long
+ * @throws UnsupportedOperationException if {@code timeUnit} is not MILLIS, MICROS or NANOS
+ */
+ private static long microsFromInt64(
+ long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) {
+ switch (timeUnit) {
+ case MILLIS:
+ // multiplyExact fails loudly instead of silently wrapping on overflow.
+ return Math.multiplyExact(timestamp, 1_000L);
+ case MICROS:
+ return timestamp;
+ case NANOS:
+ return Math.floorDiv(timestamp, 1_000L);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported timestamp unit: " + timeUnit);
}
}
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 58fc10cc51..5851ef7db5 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -670,6 +670,77 @@ public class ParquetReadWriteTest {
});
}
+ /**
+ * Writes INT64 columns annotated as {@code TIMESTAMP(NANOS)} (a required top-level field and
+ * the elements of an optional list) with the plain Parquet example writer. It then checks that
+ * {@link ParquetReaderFactory} reads them back as {@code TIMESTAMP(9)} values with
+ * nanosecond precision.
+ */
+ @Test
+ public void testReadTimestampNanosWrittenByParquet() throws Exception {
+ Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
+ Configuration conf = new Configuration();
+ Type timestampNanosType =
+ Types.primitive(INT64, Type.Repetition.REQUIRED)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ false,
LogicalTypeAnnotation.TimeUnit.NANOS))
+ .named("f0")
+ .withId(0);
+ Type arrayTimestampNanosType =
+ ConversionPatterns.listOfElements(
+ Type.Repetition.OPTIONAL,
+ "f1",
+ Types.primitive(INT64,
Type.Repetition.OPTIONAL)
+ .as(
+
LogicalTypeAnnotation.timestampType(
+ false,
+
LogicalTypeAnnotation.TimeUnit.NANOS))
+ .named("element")
+ .withId(2))
+ .withId(1);
+ MessageType schema =
+ new MessageType("origin-parquet", timestampNanosType,
arrayTimestampNanosType);
+ // One post-epoch value and one pre-epoch (negative) value.
+ long[] nanosValues = new long[] {1704067200123456789L, -123456789L};
+
+ try (ParquetWriter<Group> writer =
+ ExampleParquetWriter.builder(
+ HadoopOutputFile.fromPath(
+ new
org.apache.hadoop.fs.Path(path.toString()), conf))
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .withConf(new Configuration())
+ .withType(schema)
+ .build()) {
+ SimpleGroupFactory simpleGroupFactory = new
SimpleGroupFactory(schema);
+ for (long nanos : nanosValues) {
+ Group row = simpleGroupFactory.newGroup();
+ row.append("f0", nanos);
+ // Each row's list holds two elements: nanos and nanos + 1.
+ Group array = row.addGroup("f1");
+ array.addGroup(0).add(0, nanos);
+ array.addGroup(0).add(0, nanos + 1);
+ writer.write(row);
+ }
+ }
+
+ RowType paimonRowType =
+ RowType.builder()
+ .fields(new TimestampType(9), new ArrayType(new
TimestampType(9)))
+ .build();
+ ParquetReaderFactory format =
+ new ParquetReaderFactory(new Options(), paimonRowType, 500,
FilterCompat.NOOP);
+ AtomicInteger count = new AtomicInteger(0);
+ try (RecordReader<InternalRow> reader =
+ format.createReader(
+ new FormatReaderContext(
+ new LocalFileIO(), path, new
LocalFileIO().getFileSize(path)))) {
+ reader.forEachRemaining(
+ row -> {
+ long nanos = nanosValues[count.get()];
+ assertThat(row.getTimestamp(0,
9)).isEqualTo(timestampFromNanos(nanos));
+ assertThat(row.getArray(1).getTimestamp(0, 9))
+ .isEqualTo(timestampFromNanos(nanos));
+ assertThat(row.getArray(1).getTimestamp(1, 9))
+ .isEqualTo(timestampFromNanos(nanos + 1));
+ count.incrementAndGet();
+ });
+ }
+ // Every written row must have been read back.
+ assertThat(count.get()).isEqualTo(nanosValues.length);
+ }
+
private void innerTestTypes(File folder, List<Integer> records, int
rowGroupSize)
throws IOException {
List<InternalRow> rows =
records.stream().map(this::newRow).collect(Collectors.toList());
@@ -865,6 +936,11 @@ public class ParquetReadWriteTest {
new GenericMap(f34));
}
+    /**
+     * Builds the expected {@link Timestamp} for a count of nanoseconds since the epoch.
+     * Floor division keeps the nanos-of-millisecond part non-negative for pre-epoch values.
+     */
+    private Timestamp timestampFromNanos(long nanos) {
+        long epochMillis = Math.floorDiv(nanos, 1_000_000L);
+        int nanosOfMillisecond = (int) Math.floorMod(nanos, 1_000_000L);
+        return Timestamp.fromEpochMillis(epochMillis, nanosOfMillisecond);
+    }
+
private Timestamp toMills(Integer v) {
return Timestamp.fromEpochMillis(
Timestamp.fromLocalDateTime(toDateTime(v)).getMillisecond());
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java
index bfbdaed7c4..2808cc535a 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java
@@ -24,7 +24,10 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -32,6 +35,7 @@ import java.util.Arrays;
import static
org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToPaimonRowType;
import static
org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType;
import static org.apache.paimon.types.DataTypesTest.assertThat;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
/** Test for {@link ParquetSchemaConverter}. */
public class ParquetSchemaConverterTest {
@@ -100,6 +104,37 @@ public class ParquetSchemaConverterTest {
.STRING())))
.notNull()))))));
+ /**
+ * Checks that INT64 columns annotated {@code TIMESTAMP(NANOS)} convert to precision-9 Paimon
+ * types. The column that is not adjusted to UTC becomes {@code TIMESTAMP(9)}, and the
+ * UTC-adjusted column becomes {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)}.
+ */
+ @Test
+ public void testParquetTimestampNanosSchemaConvert() {
+ MessageType messageType =
+ new MessageType(
+ "origin-parquet",
+ Types.primitive(INT64, Type.Repetition.OPTIONAL)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ false,
LogicalTypeAnnotation.TimeUnit.NANOS))
+ .named("timestamp_nanos")
+ .withId(0),
+ Types.primitive(INT64, Type.Repetition.OPTIONAL)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true,
LogicalTypeAnnotation.TimeUnit.NANOS))
+ .named("timestamp_ltz_nanos")
+ .withId(1));
+
+ RowType rowType = convertToPaimonRowType(messageType);
+
+ assertThat(
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "timestamp_nanos",
DataTypes.TIMESTAMP(9)),
+ new DataField(
+ 1,
+ "timestamp_ltz_nanos",
+
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)))))
+ .isEqualTo(rowType);
+ }
+
@Test
public void testPaimonParquetSchemaConvert() {
MessageType messageType = convertToParquetMessageType(ALL_TYPES);