This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 403c6b705c28825d570a487b64ff9c6eb4374a64
Author: YeJunHao <[email protected]>
AuthorDate: Fri May 9 16:17:55 2025 +0800

    [parquet] Fix timestamp type and decimal type when the file schema does not match the schema in metadata (#5582)
---
 .../newreader/ParquetVectorUpdaterFactory.java     | 165 ++++++++++++++++-----
 .../parquet/newreader/VectorizedColumnReader.java  |  34 ++++-
 .../parquet/reader/ParquetSplitReaderUtil.java     |  26 ----
 3 files changed, 162 insertions(+), 63 deletions(-)
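For context, the core rule this commit enforces, restated as a self-contained sketch (class and method names below are illustrative, not Paimon API): the vector type is derived from the precision declared in the table metadata, while the updater now follows the physical type actually stored in the file, so any pairing of the two decodes instead of failing.

    // Hypothetical sketch; mirrors the dispatch in ParquetVectorUpdaterFactory below.
    public class TimestampPairingSketch {

        enum PhysicalType { INT64, INT96 }

        // Metadata-side allocation: precision <= 6 fits a long vector (millis
        // or micros), higher precision gets a dedicated timestamp vector.
        static String vectorFor(int precision) {
            return precision <= 6 ? "HeapLongVector" : "HeapTimestampVector";
        }

        // File-side choice: the updater is picked from the Parquet physical type.
        static String updaterFor(PhysicalType type) {
            switch (type) {
                case INT64:
                    return "LongTimestampUpdater";
                case INT96:
                    return "TimestampUpdater";
                default:
                    throw new UnsupportedOperationException("Unsupported: " + type);
            }
        }

        public static void main(String[] args) {
            // An old file storing precision-3 timestamps as INT96: the long vector
            // chosen from metadata is now filled via putTimestamp instead of failing.
            System.out.println(updaterFor(PhysicalType.INT96) + " -> " + vectorFor(3));
            System.out.println(updaterFor(PhysicalType.INT64) + " -> " + vectorFor(9));
        }
    }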
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
index 465d8824d4..e420c450bc 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.parquet.newreader;
 
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
 import org.apache.paimon.data.columnar.heap.HeapIntVector;
 import org.apache.paimon.data.columnar.heap.HeapLongVector;
 import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
@@ -191,9 +192,14 @@ public class ParquetVectorUpdaterFactory {
 
         return c -> {
             if (c.getPrimitiveType().getPrimitiveTypeName()
                     == PrimitiveType.PrimitiveTypeName.INT64) {
-                return new LongUpdater();
+                return new LongTimestampUpdater(timestampType.getPrecision());
+            } else if (c.getPrimitiveType().getPrimitiveTypeName()
+                    == PrimitiveType.PrimitiveTypeName.INT96) {
+                return new TimestampUpdater(timestampType.getPrecision());
+            } else {
+                throw new UnsupportedOperationException(
+                        "Only support timestamp with int64 and int96 in parquet file yet");
             }
-            return new TimestampUpdater();
         };
     }
@@ -204,7 +210,7 @@ public class ParquetVectorUpdaterFactory {
             if (c.getPrimitiveType().getPrimitiveTypeName()
                     == PrimitiveType.PrimitiveTypeName.INT64) {
                 return new LongUpdater();
             }
-            return new TimestampUpdater();
+            return new TimestampUpdater(localZonedTimestampType.getPrecision());
         };
     }
@@ -391,25 +397,86 @@ public class ParquetVectorUpdaterFactory {
         }
     }
 
-    private static class TimestampUpdater implements ParquetVectorUpdater<WritableTimestampVector> {
+    private abstract static class AbstractTimestampUpdater
+            implements ParquetVectorUpdater<WritableColumnVector> {
+
+        protected final int precision;
+
+        AbstractTimestampUpdater(int precision) {
+            this.precision = precision;
+        }
+
+        @Override
+        public void readValues(
+                int total,
+                int offset,
+                WritableColumnVector values,
+                VectorizedValuesReader valuesReader) {
+            for (int i = 0; i < total; i++) {
+                readValue(offset + i, values, valuesReader);
+            }
+        }
+    }
+
+    private static class LongTimestampUpdater extends AbstractTimestampUpdater {
+
+        public LongTimestampUpdater(int precision) {
+            super(precision);
+        }
+
+        @Override
+        public void skipValues(int total, VectorizedValuesReader valuesReader) {
+            valuesReader.skipLongs(total);
+        }
+
+        @Override
+        public void readValue(
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+            long value = valuesReader.readLong();
+            putTimestamp(values, offset, value);
+        }
+
+        @Override
+        public void decodeSingleDictionaryId(
+                int offset,
+                WritableColumnVector values,
+                WritableIntVector dictionaryIds,
+                Dictionary dictionary) {
+            long value = dictionary.decodeToLong(dictionaryIds.getInt(offset));
+            putTimestamp(values, offset, value);
+        }
+
+        private void putTimestamp(WritableColumnVector vector, int offset, long timestamp) {
+            if (vector instanceof WritableTimestampVector) {
+                ((WritableTimestampVector) vector)
+                        .setTimestamp(offset, Timestamp.fromEpochMillis(timestamp));
+            } else {
+                ((WritableLongVector) vector).setLong(offset, timestamp);
+            }
+        }
+    }
+
+    private static class TimestampUpdater extends AbstractTimestampUpdater {
 
         public static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
         public static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
         public static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
         public static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
 
+        public TimestampUpdater(int precision) {
+            super(precision);
+        }
+
         @Override
         public void readValues(
                 int total,
                 int offset,
-                WritableTimestampVector values,
+                WritableColumnVector values,
                 VectorizedValuesReader valuesReader) {
             for (int i = 0; i < total; i++) {
-                values.setTimestamp(
-                        offset + i,
-                        int96ToTimestamp(
-                                true, valuesReader.readLong(), valuesReader.readInteger()));
+                Timestamp timestamp =
+                        int96ToTimestamp(true, valuesReader.readLong(), valuesReader.readInteger());
+                putTimestamp(values, offset + i, timestamp);
             }
         }
@@ -420,8 +487,9 @@ public class ParquetVectorUpdaterFactory {
         @Override
         public void readValue(
-                int offset, WritableTimestampVector values, VectorizedValuesReader valuesReader) {
-            values.setTimestamp(
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+            putTimestamp(
+                    values,
                     offset,
                     int96ToTimestamp(true, valuesReader.readLong(), valuesReader.readInteger()));
         }
@@ -429,11 +497,28 @@ public class ParquetVectorUpdaterFactory {
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableTimestampVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
-            values.setTimestamp(
-                    offset, decodeInt96ToTimestamp(true, dictionary, dictionaryIds.getInt(offset)));
+            putTimestamp(
+                    values,
+                    offset,
+                    decodeInt96ToTimestamp(true, dictionary, dictionaryIds.getInt(offset)));
+        }
+
+        private void putTimestamp(WritableColumnVector vector, int offset, Timestamp timestamp) {
+            if (vector instanceof WritableTimestampVector) {
+                ((WritableTimestampVector) vector).setTimestamp(offset, timestamp);
+            } else {
+                if (precision <= 3) {
+                    ((WritableLongVector) vector).setLong(offset, timestamp.getMillisecond());
+                } else if (precision <= 6) {
+                    ((WritableLongVector) vector).setLong(offset, timestamp.toMicros());
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported timestamp precision: " + precision);
+                }
+            }
         }
 
         public static Timestamp decodeInt96ToTimestamp(
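For reference, the Julian-day arithmetic behind int96ToTimestamp and decodeInt96ToTimestamp above, in runnable form; the constants are copied from the class, the method name is mine:

    import java.util.concurrent.TimeUnit;

    // A Parquet INT96 timestamp packs nanos-of-day (8 bytes) plus a Julian
    // day number (4 bytes); 2_440_588 is the Julian day of 1970-01-01.
    public class Int96Sketch {

        static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
        static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
        static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

        static long toEpochMillis(long nanosOfDay, int julianDay) {
            long millisOfDay = nanosOfDay / NANOS_PER_MILLISECOND;
            return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY + millisOfDay;
        }

        public static void main(String[] args) {
            // Noon on 1970-01-01 -> 43200000 ms after the epoch.
            System.out.println(toEpochMillis(TimeUnit.HOURS.toNanos(12), 2_440_588));
        }
    }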
@@ -615,9 +700,21 @@ public class ParquetVectorUpdaterFactory {
                 readValue(offset + i, values, valuesReader);
             }
         }
+
+        protected void putDecimal(WritableColumnVector values, int offset, BigDecimal decimal) {
+            int precision = paimonType.getPrecision();
+            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+                ((HeapIntVector) values).setInt(offset, decimal.unscaledValue().intValue());
+            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+                ((HeapLongVector) values).setLong(offset, decimal.unscaledValue().longValue());
+            } else {
+                byte[] bytes = decimal.unscaledValue().toByteArray();
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
+            }
+        }
     }
 
-    private static class IntegerToDecimalUpdater extends DecimalUpdater<WritableIntVector> {
+    private static class IntegerToDecimalUpdater extends DecimalUpdater<WritableColumnVector> {
         private final int parquetScale;
 
         IntegerToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
@@ -638,25 +735,25 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableIntVector values, VectorizedValuesReader valuesReader) {
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
             BigDecimal decimal = BigDecimal.valueOf(valuesReader.readInteger(), parquetScale);
-            values.setInt(offset, decimal.unscaledValue().intValue());
+            putDecimal(values, offset, decimal);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableIntVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             BigDecimal decimal =
                     BigDecimal.valueOf(
                             dictionary.decodeToInt(dictionaryIds.getInt(offset)), parquetScale);
-            values.setInt(offset, decimal.unscaledValue().intValue());
+            putDecimal(values, offset, decimal);
         }
     }
 
-    private static class LongToDecimalUpdater extends DecimalUpdater<WritableLongVector> {
+    private static class LongToDecimalUpdater extends DecimalUpdater<WritableColumnVector> {
         private final int parquetScale;
 
         LongToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
@@ -677,32 +774,34 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
        public void readValue(
-                int offset, WritableLongVector values, VectorizedValuesReader valuesReader) {
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
             BigDecimal decimal = BigDecimal.valueOf(valuesReader.readLong(), parquetScale);
-            values.setLong(offset, decimal.unscaledValue().longValue());
+            putDecimal(values, offset, decimal);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableLongVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             BigDecimal decimal =
                     BigDecimal.valueOf(
                             dictionary.decodeToLong(dictionaryIds.getInt(offset)), parquetScale);
-            values.setLong(offset, decimal.unscaledValue().longValue());
+            putDecimal(values, offset, decimal);
        }
     }
 
-    private static class BinaryToDecimalUpdater extends DecimalUpdater<WritableBytesVector> {
+    private static class BinaryToDecimalUpdater extends DecimalUpdater<WritableColumnVector> {
         private final int parquetScale;
+        private final WritableBytesVector bytesVector;
 
         BinaryToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
             super(paimonType);
             LogicalTypeAnnotation typeAnnotation =
                     descriptor.getPrimitiveType().getLogicalTypeAnnotation();
             this.parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
+            this.bytesVector = new HeapBytesVector(1);
         }
@@ -712,18 +811,17 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableBytesVector values, VectorizedValuesReader valuesReader) {
-            valuesReader.readBinary(1, values, offset);
-            BigInteger value = new BigInteger(values.getBytes(offset).getBytes());
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+            valuesReader.readBinary(1, bytesVector, offset);
+            BigInteger value = new BigInteger(bytesVector.getBytes(offset).getBytes());
             BigDecimal decimal = new BigDecimal(value, parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+            putDecimal(values, offset, decimal);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableBytesVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             BigInteger value =
                     new BigInteger(
                             dictionary
                                     .decodeToBinary(dictionaryIds.getInt(offset))
                                     .getBytesUnsafe());
             BigDecimal decimal = new BigDecimal(value, parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+            putDecimal(values, offset, decimal);
         }
     }
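All three decimal updaters above now funnel into the shared putDecimal, which re-encodes the value for whichever vector was allocated from the declared precision. A standalone sketch of both steps, assuming the usual Parquet thresholds behind is32BitDecimal and is64BitDecimal (unscaled precision <= 9 fits an int, <= 18 a long); names are illustrative:

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.util.Arrays;

    public class DecimalReencodeSketch {

        // Mirrors putDecimal: pick the unscaled representation by precision.
        static Object unscaledFor(BigDecimal decimal, int precision) {
            if (precision <= 9) {
                return decimal.unscaledValue().intValue();    // int vector
            } else if (precision <= 18) {
                return decimal.unscaledValue().longValue();   // long vector
            } else {
                return decimal.unscaledValue().toByteArray(); // bytes vector
            }
        }

        public static void main(String[] args) {
            // Binary path: Parquet stores big-endian unscaled bytes; the scale
            // (2 here) comes from the file's decimal logical type annotation.
            byte[] fromFile = new BigInteger("1234").toByteArray();
            BigDecimal decimal = new BigDecimal(new BigInteger(fromFile), 2);
            System.out.println(decimal);                 // 12.34
            System.out.println(unscaledFor(decimal, 4)); // 1234
            System.out.println(Arrays.toString((byte[]) unscaledFor(decimal, 20))); // [4, -46]
        }
    }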
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
index 166a5ce935..c12b58b4e7 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
@@ -18,6 +18,13 @@
 
 package org.apache.paimon.format.parquet.newreader;
 
+import org.apache.paimon.data.columnar.BooleanColumnVector;
+import org.apache.paimon.data.columnar.BytesColumnVector;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.DoubleColumnVector;
+import org.apache.paimon.data.columnar.FloatColumnVector;
+import org.apache.paimon.data.columnar.IntColumnVector;
+import org.apache.paimon.data.columnar.LongColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 import org.apache.paimon.format.parquet.reader.ParquetDictionary;
@@ -43,6 +50,7 @@
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
 
+import static org.apache.paimon.types.DataTypeRoot.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 
 /* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache
@@ -113,8 +121,28 @@ public class VectorizedColumnReader {
     }
 
     private boolean isLazyDecodingSupported(
-            PrimitiveType.PrimitiveTypeName typeName, DataType paimonType) {
-        return true;
+            PrimitiveType.PrimitiveTypeName typeName, ColumnVector columnVector) {
+        boolean isSupported = false;
+        switch (typeName) {
+            case INT32:
+                isSupported = columnVector instanceof IntColumnVector;
+                break;
+            case INT64:
+                isSupported = columnVector instanceof LongColumnVector;
+                break;
+            case FLOAT:
+                isSupported = columnVector instanceof FloatColumnVector;
+                break;
+            case DOUBLE:
+                isSupported = columnVector instanceof DoubleColumnVector;
+                break;
+            case BOOLEAN:
+                isSupported = columnVector instanceof BooleanColumnVector;
+                break;
+            case BINARY:
+                isSupported = columnVector instanceof BytesColumnVector;
+        }
+        return isSupported;
     }
 
     /** Reads `total` rows from this columnReader into column. */
@@ -181,7 +209,7 @@ public class VectorizedColumnReader {
             // the values to add microseconds precision.
             if (column.hasDictionary()
                     || (startRowId == pageFirstRowIndex
-                            && isLazyDecodingSupported(typeName, type))) {
+                            && isLazyDecodingSupported(typeName, column))) {
                 column.setDictionary(new ParquetDictionary(dictionary));
             } else {
                 updater.decodeDictionaryIds(
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index ece0ff9a7f..c8d149a8b6 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -61,7 +61,6 @@
 import org.apache.parquet.io.PrimitiveColumnIO;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
-import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
@@ -225,10 +224,6 @@ public class ParquetSplitReaderUtil {
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 int precision = DataTypeChecks.getPrecision(fieldType);
                 if (precision > 6) {
-                    checkArgument(
-                            typeName == PrimitiveType.PrimitiveTypeName.INT96,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapTimestampVector(batchSize);
                 } else {
                     return new HeapLongVector(batchSize);
@@ -236,31 +231,10 @@ public class ParquetSplitReaderUtil {
             case DECIMAL:
                 DecimalType decimalType = (DecimalType) fieldType;
                 if (ParquetSchemaConverter.is32BitDecimal(decimalType.getPrecision())) {
-                    checkArgument(
-                            (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                                            || typeName == PrimitiveType.PrimitiveTypeName.INT32)
-                                    && primitiveType.getLogicalTypeAnnotation()
-                                            instanceof DecimalLogicalTypeAnnotation,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapIntVector(batchSize);
                 } else if (ParquetSchemaConverter.is64BitDecimal(decimalType.getPrecision())) {
-                    checkArgument(
-                            (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                                            || typeName == PrimitiveType.PrimitiveTypeName.INT64)
-                                    && primitiveType.getLogicalTypeAnnotation()
-                                            instanceof DecimalLogicalTypeAnnotation,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapLongVector(batchSize);
                 } else {
-                    checkArgument(
-                            (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                                            || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
-                                    && primitiveType.getLogicalTypeAnnotation()
-                                            instanceof DecimalLogicalTypeAnnotation,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapBytesVector(batchSize);
                 }
             case ARRAY:
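The VectorizedColumnReader change above replaces an unconditional "return true" with a check that the allocated vector can actually hold the physical type; only then is the dictionary kept for lazy decoding, otherwise ids are decoded eagerly through the updater. A trimmed sketch of that guard, with stand-in interfaces for Paimon's ColumnVector hierarchy and only two of the six cases shown:

    public class LazyDecodingSketch {

        interface ColumnVector {}
        interface LongColumnVector extends ColumnVector {}
        interface BytesColumnVector extends ColumnVector {}

        enum PhysicalType { INT64, BINARY, INT96 }

        static boolean isLazyDecodingSupported(PhysicalType type, ColumnVector vector) {
            switch (type) {
                case INT64:
                    return vector instanceof LongColumnVector;
                case BINARY:
                    return vector instanceof BytesColumnVector;
                default:
                    return false; // e.g. INT96, or a mismatched vector: decode eagerly
            }
        }

        public static void main(String[] args) {
            ColumnVector longs = new LongColumnVector() {};
            System.out.println(isLazyDecodingSupported(PhysicalType.INT64, longs)); // true
            System.out.println(isLazyDecodingSupported(PhysicalType.INT96, longs)); // false
        }
    }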
