This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 908a4e6be2 [parquet] Fix parquet decimal type match. (#5001)
908a4e6be2 is described below

commit 908a4e6be2b7885de00e76c2cc6dff00220e662c
Author: YeJunHao <[email protected]>
AuthorDate: Mon Feb 10 17:50:00 2025 +0800

    [parquet] Fix parquet decimal type match. (#5001)
---
 .../newreader/ParquetVectorUpdaterFactory.java     | 115 +++++++++----------
 .../parquet/newreader/VectorizedColumnReader.java  |  11 +-
 .../format/parquet/ParquetReadWriteTest.java       | 127 +++++++++++++++++++++
 3 files changed, 186 insertions(+), 67 deletions(-)
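
In short: the vectorized reader used to route FIXED_LEN_BYTE_ARRAY decimal columns with precision at or below 18 to the INT32/INT64 decimal updaters, whose physical layout does not match the fixed-length binary actually stored in the file. The fix below always uses FixedLenByteArrayToDecimalUpdater for that physical type and lets it pick the target vector (int, long, or bytes) from the Paimon decimal precision.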

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
index f59f691f8f..465d8824d4 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format.parquet.newreader;
 
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.columnar.heap.HeapIntVector;
+import org.apache.paimon.data.columnar.heap.HeapLongVector;
 import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
 import org.apache.paimon.data.columnar.writable.WritableByteVector;
 import org.apache.paimon.data.columnar.writable.WritableBytesVector;
@@ -64,7 +65,6 @@ import org.apache.parquet.schema.PrimitiveType;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.concurrent.TimeUnit;
@@ -79,13 +79,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 /** Updater Factory to get {@link ParquetVectorUpdater}. */
 public class ParquetVectorUpdaterFactory {
 
-    private final LogicalTypeAnnotation logicalTypeAnnotation;
-
-    ParquetVectorUpdaterFactory(LogicalTypeAnnotation logicalTypeAnnotation) {
-        this.logicalTypeAnnotation = logicalTypeAnnotation;
-    }
-
-    public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType paimonType) {
+    public static ParquetVectorUpdater getUpdater(
+            ColumnDescriptor descriptor, DataType paimonType) {
         return paimonType.accept(UpdaterFactoryVisitor.INSTANCE).apply(descriptor);
     }
 
@@ -144,14 +139,7 @@ public class ParquetVectorUpdaterFactory {
                     case BINARY:
                         return new BinaryToDecimalUpdater(c, decimalType);
                     case FIXED_LEN_BYTE_ARRAY:
-                        int precision = decimalType.getPrecision();
-                        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-                            return new IntegerToDecimalUpdater(c, decimalType);
-                        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-                            return new LongToDecimalUpdater(c, decimalType);
-                        } else {
-                            return new FixedLenByteArrayToDecimalUpdater(c, decimalType);
-                        }
+                        return new FixedLenByteArrayToDecimalUpdater(c, decimalType);
                 }
                 throw new RuntimeException(
                         "Unsupported decimal type: " + c.getPrimitiveType().getPrimitiveTypeName());
@@ -614,10 +602,10 @@ public class ParquetVectorUpdaterFactory {
     private abstract static class DecimalUpdater<T extends WritableColumnVector>
             implements ParquetVectorUpdater<T> {
 
-        private final DecimalType sparkType;
+        protected final DecimalType paimonType;
 
-        DecimalUpdater(DecimalType sparkType) {
-            this.sparkType = sparkType;
+        DecimalUpdater(DecimalType paimonType) {
+            this.paimonType = paimonType;
         }
 
         @Override
@@ -627,22 +615,6 @@ public class ParquetVectorUpdaterFactory {
                 readValue(offset + i, values, valuesReader);
             }
         }
-
-        protected void writeDecimal(int offset, WritableColumnVector values, BigDecimal decimal) {
-            BigDecimal scaledDecimal =
-                    decimal.setScale(sparkType.getScale(), RoundingMode.UNNECESSARY);
-            int precision = decimal.precision();
-            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-                ((WritableIntVector) values)
-                        .setInt(offset, scaledDecimal.unscaledValue().intValue());
-            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-                ((WritableLongVector) values)
-                        .setLong(offset, scaledDecimal.unscaledValue().longValue());
-            } else {
-                byte[] bytes = scaledDecimal.unscaledValue().toByteArray();
-                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
-            }
-        }
     }
 
     private static class IntegerToDecimalUpdater extends DecimalUpdater<WritableIntVector> {
@@ -687,8 +659,8 @@ public class ParquetVectorUpdaterFactory {
     private static class LongToDecimalUpdater extends DecimalUpdater<WritableLongVector> {
         private final int parquetScale;
 
-        LongToDecimalUpdater(ColumnDescriptor descriptor, DecimalType sparkType) {
-            super(sparkType);
+        LongToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
+            super(paimonType);
             LogicalTypeAnnotation typeAnnotation =
                     descriptor.getPrimitiveType().getLogicalTypeAnnotation();
             if (typeAnnotation instanceof DecimalLogicalTypeAnnotation) {
@@ -726,8 +698,8 @@ public class ParquetVectorUpdaterFactory {
     private static class BinaryToDecimalUpdater extends DecimalUpdater<WritableBytesVector> {
         private final int parquetScale;
 
-        BinaryToDecimalUpdater(ColumnDescriptor descriptor, DecimalType sparkType) {
-            super(sparkType);
+        BinaryToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
+            super(paimonType);
             LogicalTypeAnnotation typeAnnotation =
                     descriptor.getPrimitiveType().getLogicalTypeAnnotation();
             this.parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
@@ -766,15 +738,17 @@ public class ParquetVectorUpdaterFactory {
     }
 
     private static class FixedLenByteArrayToDecimalUpdater
-            extends DecimalUpdater<WritableBytesVector> {
-        private final int parquetScale;
+            extends DecimalUpdater<WritableColumnVector> {
         private final int arrayLen;
 
-        FixedLenByteArrayToDecimalUpdater(ColumnDescriptor descriptor, DecimalType sparkType) {
-            super(sparkType);
+        FixedLenByteArrayToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
+            super(paimonType);
             LogicalTypeAnnotation typeAnnotation =
                     descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-            this.parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
+            int parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
+            checkArgument(
+                    parquetScale == paimonType.getScale(),
+                    "Scale should match between Paimon decimal type and Parquet decimal type in file");
             this.arrayLen = descriptor.getPrimitiveType().getTypeLength();
         }
 
@@ -785,27 +759,52 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableBytesVector values, VectorizedValuesReader valuesReader) {
-            BigInteger value = new BigInteger(valuesReader.readBinary(arrayLen).getBytesUnsafe());
-            BigDecimal decimal = new BigDecimal(value, this.parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+            Binary binary = valuesReader.readBinary(arrayLen);
+
+            int precision = paimonType.getPrecision();
+            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+                ((HeapIntVector) values).setInt(offset, (int) heapBinaryToLong(binary));
+            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+                ((HeapLongVector) values).setLong(offset, heapBinaryToLong(binary));
+            } else {
+                byte[] bytes = binary.getBytesUnsafe();
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
+            }
+            }
+        }
+
+        private long heapBinaryToLong(Binary binary) {
+            ByteBuffer buffer = binary.toByteBuffer();
+            byte[] bytes = buffer.array();
+            int start = buffer.arrayOffset() + buffer.position();
+            int end = buffer.arrayOffset() + buffer.limit();
+
+            long unscaled = 0L;
+
+            for (int i = start; i < end; i++) {
+                unscaled = (unscaled << 8) | (bytes[i] & 0xff);
+            }
+
+            int bits = 8 * (end - start);
+            return (unscaled << (64 - bits)) >> (64 - bits);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableBytesVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
-            BigInteger value =
-                    new BigInteger(
-                            dictionary
-                                    .decodeToBinary(dictionaryIds.getInt(offset))
-                                    .getBytesUnsafe());
-            BigDecimal decimal = new BigDecimal(value, this.parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+            Binary binary = dictionary.decodeToBinary(dictionaryIds.getInt(offset));
+            int precision = paimonType.getPrecision();
+            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+                ((HeapIntVector) values).setInt(offset, (int) heapBinaryToLong(binary));
+            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+                ((HeapLongVector) values).setLong(offset, heapBinaryToLong(binary));
+            } else {
+                byte[] bytes = binary.getBytesUnsafe();
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
+            }
         }
     }
 }
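
A note on the heapBinaryToLong helper added above: Parquet stores a
FIXED_LEN_BYTE_ARRAY decimal as a big-endian two's-complement unscaled
integer, so after packing the bytes into a long the value still has to be
sign-extended from its top occupied bit. A minimal standalone sketch of the
same decode (illustrative only, independent of Paimon's classes):

    // Decode a big-endian two's-complement byte slice into a signed long.
    // Mirrors the shift trick in heapBinaryToLong; assumes 1 <= len <= 8.
    static long decodeBigEndianTwosComplement(byte[] bytes, int start, int len) {
        long unscaled = 0L;
        for (int i = start; i < start + len; i++) {
            unscaled = (unscaled << 8) | (bytes[i] & 0xff);
        }
        // Shift the value's sign bit up to bit 63, then arithmetic-shift
        // back down so it is replicated through the high bits.
        int bits = 8 * len;
        return (unscaled << (64 - bits)) >> (64 - bits);
    }

For example, the two-byte array {0xFF, 0x85} decodes to -123, which with
scale 2 represents the decimal -1.23.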
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
index 277cd533c5..166a5ce935 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
@@ -39,7 +39,6 @@ import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.values.RequiresPreviousReader;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -67,9 +66,6 @@ public class VectorizedColumnReader {
     /** Vectorized RLE decoder for repetition levels. */
     private VectorizedRleValuesReader repColumn;
 
-    /** Factory to get type-specific vector updater. */
-    private final ParquetVectorUpdaterFactory updaterFactory;
-
     /**
      * Helper struct to track intermediate states while reading Parquet pages in the column chunk.
      */
@@ -83,7 +79,6 @@ public class VectorizedColumnReader {
 
     private final PageReader pageReader;
     private final ColumnDescriptor descriptor;
-    private final LogicalTypeAnnotation logicalTypeAnnotation;
     private final ParsedVersion writerVersion;
 
     public VectorizedColumnReader(
@@ -97,8 +92,6 @@ public class VectorizedColumnReader {
         this.readState =
                 new ParquetReadState(
                         descriptor, isRequired, pageReadStore.getRowIndexes().orElse(null));
-        this.logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-        this.updaterFactory = new ParquetVectorUpdaterFactory(logicalTypeAnnotation);
 
         DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
         if (dictionaryPage != null) {
@@ -120,7 +113,7 @@ public class VectorizedColumnReader {
     }
 
     private boolean isLazyDecodingSupported(
-            PrimitiveType.PrimitiveTypeName typeName, DataType sparkType) {
+            PrimitiveType.PrimitiveTypeName typeName, DataType paimonType) {
         return true;
     }
 
@@ -133,7 +126,7 @@ public class VectorizedColumnReader {
             WritableIntVector definitionLevels)
             throws IOException {
         WritableIntVector dictionaryIds = null;
-        ParquetVectorUpdater updater = updaterFactory.getUpdater(descriptor, type);
+        ParquetVectorUpdater updater = ParquetVectorUpdaterFactory.getUpdater(descriptor, type);
 
         if (dictionary != null) {
             // SPARK-16334: We only maintain a single dictionary per row batch, so that it can be
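
The updaters above choose the output vector purely from the Paimon decimal
precision via ParquetSchemaConverter.is32BitDecimal / is64BitDecimal. A
sketch of the conventional buckets, assuming the standard cutoffs of 9 and
18 decimal digits (check ParquetSchemaConverter for the exact definitions):

    // Conventional precision buckets for decimal storage: up to 9 digits
    // fits an int (999,999,999 < 2^31 - 1), up to 18 digits fits a long
    // (10^18 - 1 < 2^63 - 1); anything larger needs a byte array.
    static boolean is32BitDecimal(int precision) {
        return precision <= 9;
    }

    static boolean is64BitDecimal(int precision) {
        return precision <= 18;
    }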
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index c1028082a2..58fc10cc51 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -83,6 +83,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -99,6 +100,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -156,6 +159,16 @@ public class ParquetReadWriteTest {
                                     new TimestampType(6), new VarCharType(VarCharType.MAX_LENGTH)))
                     .build();
 
+    private static final RowType DECIMAL_TYPE =
+            RowType.builder()
+                    .fields(
+                            new DecimalType(3, 2),
+                            new DecimalType(6, 2),
+                            new DecimalType(9, 2),
+                            new DecimalType(12, 2),
+                            new DecimalType(32, 2))
+                    .build();
+
     private static final RowType NESTED_ARRAY_MAP_TYPE =
             RowType.of(
                     new IntType(),
@@ -478,6 +491,36 @@ public class ParquetReadWriteTest {
         compareNestedRow(rows, results);
     }
 
+    @Test
+    public void testDecimalWithFixedLengthRead() throws Exception {
+        int number = new Random().nextInt(1000) + 100;
+        Path path = createDecimalFile(number, folder, 10);
+
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(new Options(), DECIMAL_TYPE, 500, FilterCompat.NOOP);
+        RecordReader<InternalRow> reader =
+                format.createReader(
+                        new FormatReaderContext(
+                                new LocalFileIO(), path, new LocalFileIO().getFileSize(path)));
+        List<InternalRow> results = new ArrayList<>(number);
+        InternalRowSerializer internalRowSerializer = new InternalRowSerializer(DECIMAL_TYPE);
+        reader.forEachRemaining(row -> results.add(internalRowSerializer.copy(row)));
+
+        BigDecimal decimalValue0 = new BigDecimal("123.67");
+        BigDecimal decimalValue1 = new BigDecimal("12345.67");
+        BigDecimal decimalValue2 = new BigDecimal("1234567.67");
+        BigDecimal decimalValue3 = new BigDecimal("123456789123.67");
+        BigDecimal decimalValue4 = new BigDecimal("123456789123456789123456789123.67");
+
+        for (InternalRow internalRow : results) {
+            assertThat(internalRow.getDecimal(0, 3, 2).toBigDecimal()).isEqualTo(decimalValue0);
+            assertThat(internalRow.getDecimal(1, 6, 2).toBigDecimal()).isEqualTo(decimalValue1);
+            assertThat(internalRow.getDecimal(2, 9, 2).toBigDecimal()).isEqualTo(decimalValue2);
+            assertThat(internalRow.getDecimal(3, 12, 2).toBigDecimal()).isEqualTo(decimalValue3);
+            assertThat(internalRow.getDecimal(4, 32, 2).toBigDecimal()).isEqualTo(decimalValue4);
+        }
+    }
+
     @Test
     public void testNestedNullMapKey() {
         List<InternalRow> rows = prepareNestedData(1283, true);
@@ -968,6 +1011,90 @@ public class ParquetReadWriteTest {
         return path;
     }
 
+    private Path createDecimalFile(int rowNum, File tmpDir, int rowGroupSize) {
+        Path path = new Path(tmpDir.getPath(), UUID.randomUUID().toString());
+        Configuration conf = new Configuration();
+        conf.setInt("parquet.block.size", rowGroupSize);
+        List<Type> types = new ArrayList<>();
+
+        for (DataField dataField : DECIMAL_TYPE.getFields()) {
+            String name = dataField.name();
+            int fieldId = dataField.id();
+            int precision = ((DecimalType) dataField.type()).getPrecision();
+            int scale = ((DecimalType) dataField.type()).getScale();
+            Type.Repetition repetition =
+                    dataField.type().isNullable()
+                            ? Type.Repetition.OPTIONAL
+                            : Type.Repetition.REQUIRED;
+
+            types.add(
+                    Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+                            .as(LogicalTypeAnnotation.decimalType(scale, precision))
+                            .length(computeMinBytesForDecimalPrecision(precision))
+                            .named(name)
+                            .withId(fieldId));
+        }
+
+        MessageType schema = new MessageType("paimon_schema", types);
+
+        List<Binary> decimalBytesList = new ArrayList<>();
+
+        BigDecimal decimalValue = new BigDecimal("123.67");
+        int scale = 2;
+        byte[] decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        Binary binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("12345.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("1234567.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("123456789123.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("123456789123456789123456789123.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        try (ParquetWriter<Group> writer =
+                ExampleParquetWriter.builder(
+                                HadoopOutputFile.fromPath(
+                                        new org.apache.hadoop.fs.Path(path.toString()), conf))
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .withConf(new Configuration())
+                        .withType(schema)
+                        .build()) {
+            SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);
+            for (int i = 0; i < rowNum; i++) {
+
+                Group row = simpleGroupFactory.newGroup();
+
+                for (int j = 0; j < DECIMAL_TYPE.getFields().size(); j++) {
+                    row.append("f" + j, decimalBytesList.get(j));
+                }
+
+                writer.write(row);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Creating data with the original Parquet writer failed.", e);
+        }
+        return path;
+    }
+
     private void createParquetDoubleNestedArray(Group group, int i) {
         Group outside = group.addGroup(0);
         Group inside = outside.addGroup(0);
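
The createDecimalFile helper above sizes each FIXED_LEN_BYTE_ARRAY column
with computeMinBytesForDecimalPrecision. A minimal sketch of that
calculation, assuming the standard formula from the Parquet format spec
(the smallest byte count whose signed range covers 10^precision - 1):

    // Smallest number of bytes whose two's-complement range can hold an
    // unscaled decimal of the given precision (sign bit included).
    static int computeMinBytesForDecimalPrecision(int precision) {
        int numBytes = 1;
        while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
            numBytes += 1;
        }
        return numBytes;
    }

For the precisions in DECIMAL_TYPE this gives 2, 3, 4, 6, and 14 bytes for
precision 3, 6, 9, 12, and 32 respectively.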
