This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c8b97c91a Arrow: Fix for dictionary encoded fixed length binary 
decimals (#5198)
c8b97c91a is described below

commit c8b97c91ac04a2ee5ee8f746dcc4619a9c8d5ffe
Author: Bryan Keller <[email protected]>
AuthorDate: Tue Jul 5 16:36:23 2022 -0700

    Arrow: Fix for dictionary encoded fixed length binary decimals (#5198)
---
 .../GenericArrowVectorAccessorFactory.java         | 35 +++++++-------------
 .../vectorized/parquet/DecimalVectorUtil.java      | 10 +++++-
 ...orizedDictionaryEncodedParquetValuesReader.java |  7 ++--
 .../VectorizedParquetDefinitionLevelReader.java    | 10 ++----
 ...dDictionaryEncodedFlatParquetDataBenchmark.java | 20 +++++-------
 .../VectorizedReadFlatParquetDataBenchmark.java    | 38 ++++++++++++++++++----
 .../apache/iceberg/spark/data/AvroDataTest.java    |  7 ++--
 .../vectorized/TestParquetVectorizedReads.java     |  8 +++--
 8 files changed, 75 insertions(+), 60 deletions(-)

diff --git 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index f0a1ecdf2..85d2fb7b5 100644
--- 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++ 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.arrow.vectorized;
 
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.function.IntFunction;
 import java.util.function.Supplier;
@@ -556,9 +555,9 @@ public class GenericArrowVectorAccessorFactory<DecimalT, 
Utf8StringT, ArrayT, Ch
       DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT 
extends AutoCloseable>
           extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, 
ChildVectorT> {
     private final DecimalT[] cache;
-    private final DecimalFactory<DecimalT> decimalFactory;
-    private final Dictionary parquetDictionary;
     private final IntVector offsetVector;
+    protected final DecimalFactory<DecimalT> decimalFactory;
+    protected final Dictionary parquetDictionary;
 
     private DictionaryDecimalAccessor(
             IntVector vector,
@@ -571,28 +570,16 @@ public class GenericArrowVectorAccessorFactory<DecimalT, 
Utf8StringT, ArrayT, Ch
       this.cache = genericArray(decimalFactory.getGenericClass(), 
dictionary.getMaxId() + 1);
     }
 
-    protected long decodeToBinary(int dictId) {
-      return new 
BigInteger(parquetDictionary.decodeToBinary(dictId).getBytesUnsafe()).longValue();
-    }
-
-    protected long decodeToLong(int dictId) {
-      return parquetDictionary.decodeToLong(dictId);
-    }
-
-    protected int decodeToInt(int dictId) {
-      return parquetDictionary.decodeToInt(dictId);
-    }
-
     @Override
     public final DecimalT getDecimal(int rowId, int precision, int scale) {
       int offset = offsetVector.get(rowId);
       if (cache[offset] == null) {
-        cache[offset] = decimalFactory.ofLong(decode(offset), precision, 
scale);
+        cache[offset] = decode(offset, precision, scale);
       }
       return cache[offset];
     }
 
-    protected abstract long decode(int dictId);
+    protected abstract DecimalT decode(int dictId, int precision, int scale);
   }
 
   private static class
@@ -604,8 +591,10 @@ public class GenericArrowVectorAccessorFactory<DecimalT, 
Utf8StringT, ArrayT, Ch
     }
 
     @Override
-    protected long decode(int dictId) {
-      return decodeToBinary(dictId);
+    protected DecimalT decode(int dictId, int precision, int scale) {
+      ByteBuffer byteBuffer = 
parquetDictionary.decodeToBinary(dictId).toByteBuffer();
+      BigDecimal value = 
DecimalUtility.getBigDecimalFromByteBuffer(byteBuffer, scale, 
byteBuffer.remaining());
+      return decimalFactory.ofBigDecimal(value, precision, scale);
     }
   }
 
@@ -617,8 +606,8 @@ public class GenericArrowVectorAccessorFactory<DecimalT, 
Utf8StringT, ArrayT, Ch
     }
 
     @Override
-    protected long decode(int dictId) {
-      return decodeToLong(dictId);
+    protected DecimalT decode(int dictId, int precision, int scale) {
+      return decimalFactory.ofLong(parquetDictionary.decodeToLong(dictId), 
precision, scale);
     }
   }
 
@@ -630,8 +619,8 @@ public class GenericArrowVectorAccessorFactory<DecimalT, 
Utf8StringT, ArrayT, Ch
     }
 
     @Override
-    protected long decode(int dictId) {
-      return decodeToInt(dictId);
+    protected DecimalT decode(int dictId, int precision, int scale) {
+      return decimalFactory.ofLong(parquetDictionary.decodeToInt(dictId), 
precision, scale);
     }
   }
 
diff --git 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
index cb4e45369..5e6855a91 100644
--- 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
+++ 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
@@ -20,12 +20,19 @@
 package org.apache.iceberg.arrow.vectorized.parquet;
 
 import java.util.Arrays;
+import org.apache.arrow.vector.DecimalVector;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 
 public class DecimalVectorUtil {
 
   private DecimalVectorUtil() {
   }
 
+  public static void setBigEndian(DecimalVector vector, int idx, byte[] value) 
{
+    byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(value, 
DecimalVector.TYPE_WIDTH);
+    vector.setBigEndian(idx, paddedBytes);
+  }
+
   /**
    * Parquet stores decimal values in big-endian byte order, and Arrow stores 
them in native byte order.
    * When setting the value in Arrow, we call setBigEndian(), and the byte 
order is reversed if needed.
@@ -37,7 +44,8 @@ public class DecimalVectorUtil {
    * @param newLength      The length of the byte array to return
    * @return The new byte array
    */
-  public static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength) 
{
+  @VisibleForTesting
+  static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength) {
     if (bigEndianBytes.length == newLength) {
       return bigEndianBytes;
     } else if (bigEndianBytes.length < newLength) {
diff --git 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index e8e3cabd9..b930f35eb 100644
--- 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++ 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -128,11 +128,8 @@ public class 
VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
   class FixedLengthDecimalDictEncodedReader extends BaseDictEncodedReader {
     @Override
     protected void nextVal(FieldVector vector, Dictionary dict, int idx, int 
currentVal, int typeWidth) {
-      byte[] vectorBytes =
-          DecimalVectorUtil.padBigEndianBytes(
-              dict.decodeToBinary(currentVal).getBytesUnsafe(),
-              DecimalVector.TYPE_WIDTH);
-      ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+      byte[] bytes = dict.decodeToBinary(currentVal).getBytesUnsafe();
+      DecimalVectorUtil.setBigEndian((DecimalVector) vector, idx, bytes);
     }
   }
 
diff --git 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index a8990d0cd..46c6736ce 100644
--- 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -358,8 +358,7 @@ public final class VectorizedParquetDefinitionLevelReader 
extends BaseVectorized
     protected void nextVal(
         FieldVector vector, int idx, ValuesAsBytesReader valuesReader, int 
typeWidth, byte[] byteArray) {
       valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-      byte[] vectorBytes = DecimalVectorUtil.padBigEndianBytes(byteArray, 
DecimalVector.TYPE_WIDTH);
-      ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+      DecimalVectorUtil.setBigEndian((DecimalVector) vector, idx, byteArray);
     }
 
     @Override
@@ -370,11 +369,8 @@ public final class VectorizedParquetDefinitionLevelReader 
extends BaseVectorized
         reader.fixedLengthDecimalDictEncodedReader()
             .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, 
typeWidth);
       } else if (Mode.PACKED.equals(mode)) {
-        byte[] vectorBytes =
-            DecimalVectorUtil.padBigEndianBytes(
-                dict.decodeToBinary(reader.readInteger()).getBytesUnsafe(),
-                DecimalVector.TYPE_WIDTH);
-        ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+        byte[] bytes = 
dict.decodeToBinary(reader.readInteger()).getBytesUnsafe();
+        DecimalVectorUtil.setBigEndian((DecimalVector) vector, idx, bytes);
       }
     }
   }
diff --git 
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
 
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
index 0bb0a1192..5a0393225 100644
--- 
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
+++ 
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
@@ -19,12 +19,9 @@
 
 package org.apache.iceberg.spark.source.parquet.vectorized;
 
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.Map;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -45,8 +42,8 @@ import static org.apache.spark.sql.functions.to_timestamp;
  * <p>
  * To run this benchmark for spark-3.3:
  * <code>
- *   ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh
- *       
-PjmhIncludeRegex=VectorizedReadDictionaryEncodedFlatParquetDataBenchmark
+ *   ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh \
+ *       
-PjmhIncludeRegex=VectorizedReadDictionaryEncodedFlatParquetDataBenchmark \
  *       -PjmhOutputPath=benchmark/results.txt
  * </code>
  */
@@ -78,6 +75,7 @@ public class 
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark extends Vec
     df = withIntColumnDictEncoded(df);
     df = withFloatColumnDictEncoded(df);
     df = withDoubleColumnDictEncoded(df);
+    df = withBigDecimalColumnNotDictEncoded(df); // no dictionary for fixed 
len binary in Parquet v1
     df = withDecimalColumnDictEncoded(df);
     df = withDateColumnDictEncoded(df);
     df = withTimestampColumnDictEncoded(df);
@@ -113,9 +111,12 @@ public class 
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark extends Vec
     return df.withColumn("doubleCol", modColumn().cast(DataTypes.DoubleType));
   }
 
+  private static Dataset<Row> withBigDecimalColumnNotDictEncoded(Dataset<Row> 
df) {
+    return df.withColumn("bigDecimalCol", modColumn().cast("decimal(20,5)"));
+  }
+
   private static Dataset<Row> withDecimalColumnDictEncoded(Dataset<Row> df) {
-    Types.DecimalType type = Types.DecimalType.of(20, 5);
-    return df.withColumn("decimalCol", lit(bigDecimal(type, 
0)).plus(modColumn()));
+    return df.withColumn("decimalCol", modColumn().cast("decimal(18,5)"));
   }
 
   private static Dataset<Row> withDateColumnDictEncoded(Dataset<Row> df) {
@@ -131,9 +132,4 @@ public class 
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark extends Vec
   private static Dataset<Row> withStringColumnDictEncoded(Dataset<Row> df) {
     return df.withColumn("stringCol", modColumn().cast(DataTypes.StringType));
   }
-
-  private static BigDecimal bigDecimal(Types.DecimalType type, int value) {
-    BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
-    return new BigDecimal(unscaled, type.scale());
-  }
 }
diff --git 
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
 
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
index 3e2a566ea..a1a36fd56 100644
--- 
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
+++ 
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
@@ -53,8 +53,8 @@ import static org.apache.spark.sql.functions.when;
  * <p>
  * To run this benchmark for spark-3.3:
  * <code>
- *   ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh
- *       -PjmhIncludeRegex=VectorizedReadFlatParquetDataBenchmark
+ *   ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh \
+ *       -PjmhIncludeRegex=VectorizedReadFlatParquetDataBenchmark \
  *       -PjmhOutputPath=benchmark/results.txt
  * </code>
  */
@@ -87,15 +87,18 @@ public class VectorizedReadFlatParquetDataBenchmark extends 
IcebergSourceBenchma
 
   @Override
   protected Table initTable() {
+    // bigDecimalCol is big enough to be encoded as fix len binary (9 bytes),
+    // decimalCol is small enough to be encoded as a 64-bit int
     Schema schema = new Schema(
         optional(1, "longCol", Types.LongType.get()),
         optional(2, "intCol", Types.IntegerType.get()),
         optional(3, "floatCol", Types.FloatType.get()),
         optional(4, "doubleCol", Types.DoubleType.get()),
-        optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
-        optional(6, "dateCol", Types.DateType.get()),
-        optional(7, "timestampCol", Types.TimestampType.withZone()),
-        optional(8, "stringCol", Types.StringType.get()));
+        optional(5, "bigDecimalCol", Types.DecimalType.of(20, 5)),
+        optional(6, "decimalCol", Types.DecimalType.of(18, 5)),
+        optional(7, "dateCol", Types.DateType.get()),
+        optional(8, "timestampCol", Types.TimestampType.withZone()),
+        optional(9, "stringCol", Types.StringType.get()));
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     HadoopTables tables = new HadoopTables(hadoopConf());
     Map<String, String> properties = parquetWriteProps();
@@ -120,7 +123,8 @@ public class VectorizedReadFlatParquetDataBenchmark extends 
IcebergSourceBenchma
           .withColumn("intCol", expr("CAST(longCol AS INT)"))
           .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
           .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
-          .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+          .withColumn("bigDecimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+          .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(18, 5))"))
           .withColumn("dateCol", date_add(current_date(), fileNum))
           .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
           .withColumn("stringCol", expr("CAST(longCol AS STRING)"));
@@ -228,6 +232,26 @@ public class VectorizedReadFlatParquetDataBenchmark 
extends IcebergSourceBenchma
     });
   }
 
+  @Benchmark
+  @Threads(1)
+  public void readBigDecimalsIcebergVectorized5k() {
+    withTableProperties(tablePropsWithVectorizationEnabled(5000), () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+              .load(tableLocation).select("bigDecimalCol");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readBigDecimalsSparkVectorized5k() {
+    withSQLConf(sparkConfWithVectorizationEnabled(5000), () -> {
+      Dataset<Row> df = 
spark().read().parquet(dataLocation()).select("bigDecimalCol");
+      materialize(df);
+    });
+  }
+
   @Benchmark
   @Threads(1)
   public void readDatesIcebergVectorized5k() {
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
index 5f902f6e6..a8556de68 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
@@ -59,9 +59,10 @@ public abstract class AvroDataTest {
       // required(111, "uuid", Types.UUIDType.get()),
       required(112, "fixed", Types.FixedType.ofLength(7)),
       optional(113, "bytes", Types.BinaryType.get()),
-      required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
-      required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
-      required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // spark's 
maximum precision
+      required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded
+      required(115, "dec_11_2", Types.DecimalType.of(11, 2)), // long encoded
+      required(116, "dec_20_5", Types.DecimalType.of(20, 5)), // requires 
padding
+      required(117, "dec_38_10", Types.DecimalType.of(38, 10)) // Spark's 
maximum precision
   );
 
   @Rule
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index 19ffd023a..cff2e5cc6 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -279,10 +279,14 @@ public class TestParquetVectorizedReads extends 
AvroDataTest {
 
   @Test
   public void testSupportedReadsForParquetV2() throws Exception {
-    // Only float and double column types are written using plain encoding 
with Parquet V2
+    // Float and double column types are written using plain encoding with 
Parquet V2,
+    // also Parquet V2 will dictionary encode decimals that use fixed length 
binary
+    // (i.e. decimals > 8 bytes)
     Schema schema = new Schema(
             optional(102, "float_data", Types.FloatType.get()),
-            optional(103, "double_data", Types.DoubleType.get()));
+            optional(103, "double_data", Types.DoubleType.get()),
+            optional(104, "decimal_data", Types.DecimalType.of(25, 5))
+    );
 
     File dataFile = temp.newFile();
     Assert.assertTrue("Delete should succeed", dataFile.delete());

Reply via email to