This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 19dacc1ff ORC-1700: Write parquet decimal type data in Benchmark using 
`FIXED_LEN_BYTE_ARRAY` type
19dacc1ff is described below

commit 19dacc1ffa0802030c28f4fa7c4040e16a2994ee
Author: sychen <[email protected]>
AuthorDate: Tue Jul 9 12:13:19 2024 -0700

    ORC-1700: Write parquet decimal type data in Benchmark using 
`FIXED_LEN_BYTE_ARRAY` type
    
    ### What changes were proposed in this pull request?
    This PR aims to write Parquet decimal data in the benchmark using the `FIXED_LEN_BYTE_ARRAY` type.
    
    ### Why are the changes needed?
    The benchmark currently writes Parquet decimal columns with the `BINARY` physical type, which Spark 3.5.1 does not support reading.
    Spark 3.5.1 can read decimal columns when they are written with the `FIXED_LEN_BYTE_ARRAY` type.
    
    main
    ```
      optional binary fare_amount (DECIMAL(8,2));
    ```
    
    PR
    ```
      optional fixed_len_byte_array(5) fare_amount (DECIMAL(10,2));
    ```
    
    ```bash
    java -jar spark/target/orc-benchmarks-spark-2.1.0-SNAPSHOT.jar spark data 
-format=parquet  -compress zstd -data taxi
    ```
    
    ```java
    
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException:
 column: [fare_amount], physicalType: BINARY, logicalType: decimal(8,2)
            at 
org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1136)
            at 
org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:199)
            at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175)
            at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:342)
            at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:233)
            at 
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
            at 
org.apache.orc.bench.spark.SparkBenchmark.processReader(SparkBenchmark.java:170)
            at 
org.apache.orc.bench.spark.SparkBenchmark.fullRead(SparkBenchmark.java:216)
            at 
org.apache.orc.bench.spark.jmh_generated.SparkBenchmark_fullRead_jmhTest.fullRead_avgt_jmhStub(SparkBenchmark_fullRead_jmhTest.java:219)
    ```
    
    ### How was this patch tested?
    Tested locally.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #1910 from cxzl25/ORC-1700.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit b8481ea9e188794656b883de6dd9e9ab43c675a9)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../orc/bench/core/convert/avro/AvroReader.java    |  3 ++
 .../bench/core/convert/avro/AvroSchemaUtils.java   | 17 +++++++++-
 .../orc/bench/core/convert/avro/AvroWriter.java    | 37 ++++++++++++++++------
 java/bench/core/src/resources/taxi.schema          | 14 ++++----
 4 files changed, 54 insertions(+), 17 deletions(-)

diff --git 
a/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroReader.java
 
b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroReader.java
index 2756ca1dd..8474351f2 100644
--- 
a/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroReader.java
+++ 
b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroReader.java
@@ -204,6 +204,9 @@ public class AvroReader implements BatchReader {
         DecimalColumnVector tc = (DecimalColumnVector) cv;
         if (value instanceof ByteBuffer) {
           tc.vector[row].set(getHiveDecimalFromByteBuffer((ByteBuffer) value, 
scale));
+        } else if (value instanceof GenericData.Fixed) {
+          tc.vector[row].set(getHiveDecimalFromByteBuffer(
+              ByteBuffer.wrap(((GenericData.Fixed) value).bytes()), scale));
         } else {
           tc.vector[row].set(HiveDecimal.create(Math.round((double) value * 
multiplier)));
         }
diff --git 
a/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroSchemaUtils.java
 
b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroSchemaUtils.java
index 96df6b5ba..65753553a 100644
--- 
a/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroSchemaUtils.java
+++ 
b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroSchemaUtils.java
@@ -78,8 +78,11 @@ public class AvroSchemaUtils {
       case DECIMAL:
         String precision = String.valueOf(typeInfo.getPrecision());
         String scale = String.valueOf(typeInfo.getScale());
+        int bytes = PRECISION_TO_BYTE_COUNT[typeInfo.getPrecision() - 1];
         schema = getSchemaFor("{" +
-            "\"type\":\"bytes\"," +
+            "\"type\":\"fixed\"," +
+            "\"name\":\"" + typeInfo.getFullFieldName() + "\"," +
+            "\"size\":" + bytes + "," +
             "\"logicalType\":\"decimal\"," +
             "\"precision\":" + precision + "," +
             "\"scale\":" + scale + "}");
@@ -189,4 +192,16 @@ public class AvroSchemaUtils {
     Schema.Parser parser = new Schema.Parser();
     return parser.parse(str);
   }
+
+  // org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
+  // Map precision to the number of bytes needed for binary conversion.
+  public static final int[] PRECISION_TO_BYTE_COUNT = new int[38];
+
+  static {
+    for (int prec = 1; prec <= 38; prec++) {
+      // Estimated number of bytes needed.
+      PRECISION_TO_BYTE_COUNT[prec - 1] = (int)
+          Math.ceil((Math.log(Math.pow(10, prec) - 1) / Math.log(2) + 1) / 8);
+    }
+  }
 }
diff --git 
a/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroWriter.java
 
b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroWriter.java
index d60ef6745..34fa16667 100644
--- 
a/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroWriter.java
+++ 
b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroWriter.java
@@ -40,7 +40,6 @@ import org.apache.orc.bench.core.CompressionKind;
 import org.apache.orc.bench.core.convert.BatchWriter;
 
 import java.io.IOException;
-import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -166,8 +165,12 @@ public class AvroWriter implements BatchWriter {
   }
 
   private static class DecimalConverter implements AvroConverter {
+    final Schema avroSchema;
+    final int precision;
     final int scale;
-    DecimalConverter(int scale) {
+    DecimalConverter(Schema avroSchema, int precision, int scale) {
+      this.avroSchema = avroSchema;
+      this.precision = precision;
       this.scale = scale;
     }
     public Object convert(ColumnVector cv, int row) {
@@ -176,8 +179,7 @@ public class AvroWriter implements BatchWriter {
       }
       if (cv.noNulls || !cv.isNull[row]) {
         DecimalColumnVector vector = (DecimalColumnVector) cv;
-        return getBufferFromDecimal(
-            vector.vector[row].getHiveDecimal(), scale);
+        return decimalToBinary(vector.vector[row].getHiveDecimal(), 
avroSchema, precision, scale);
       } else {
         return null;
       }
@@ -270,7 +272,7 @@ public class AvroWriter implements BatchWriter {
       case TIMESTAMP:
         return new TimestampConverter();
       case DECIMAL:
-        return new DecimalConverter(types.getScale());
+        return new DecimalConverter(avroSchema, types.getPrecision(), 
types.getScale());
       case LIST:
         return new ListConverter(types, avroSchema);
       case STRUCT:
@@ -356,11 +358,28 @@ public class AvroWriter implements BatchWriter {
     writer.close();
   }
 
-  static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) {
-    if (dec == null) {
-      return null;
+  // 
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.DecimalDataWriter.decimalToBinary()
+  private static GenericData.Fixed decimalToBinary(HiveDecimal hiveDecimal,
+                                                   Schema avroSchema, int 
prec, int scale) {
+    byte[] decimalBytes = 
hiveDecimal.setScale(scale).unscaledValue().toByteArray();
+
+    // Estimated number of bytes needed.
+    int precToBytes = AvroSchemaUtils.PRECISION_TO_BYTE_COUNT[prec - 1];
+    if (precToBytes == decimalBytes.length) {
+      // No padding needed.
+      return new GenericData.Fixed(avroSchema, decimalBytes);
+    }
+
+    byte[] tgt = new byte[precToBytes];
+    if (hiveDecimal.signum() == -1) {
+      // For negative number, initializing bits to 1
+      for (int i = 0; i < precToBytes; i++) {
+        tgt[i] |= 0xFF;
+      }
     }
 
-    return ByteBuffer.wrap(dec.bigIntegerBytesScaled(scale));
+    System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length,
+        decimalBytes.length); // Padding leading zeroes/ones.
+    return new GenericData.Fixed(avroSchema, tgt);
   }
 }
diff --git a/java/bench/core/src/resources/taxi.schema 
b/java/bench/core/src/resources/taxi.schema
index 720848faa..adb1f54f8 100644
--- a/java/bench/core/src/resources/taxi.schema
+++ b/java/bench/core/src/resources/taxi.schema
@@ -9,13 +9,13 @@ struct<
   PULocationID: bigint,
   DOLocationID: bigint,
   payment_type: bigint,
-  fare_amount: decimal(8,2),
-  extra: decimal(8,2),
-  mta_tax: decimal(8,2),
-  tip_amount: decimal(8,2),
-  tolls_amount: decimal(8,2),
-  improvement_surcharge: decimal(8,2),
-  total_amount: decimal(8,2),
+  fare_amount: decimal(10,2),
+  extra: decimal(10,2),
+  mta_tax: decimal(10,2),
+  tip_amount: decimal(10,2),
+  tolls_amount: decimal(10,2),
+  improvement_surcharge: decimal(10,2),
+  total_amount: decimal(10,2),
   congestion_surcharge: int,
   airport_fee: int
 >

Reply via email to