This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c8b97c91a Arrow: Fix for dictionary encoded fixed length binary decimals (#5198)
c8b97c91a is described below
commit c8b97c91ac04a2ee5ee8f746dcc4619a9c8d5ffe
Author: Bryan Keller <[email protected]>
AuthorDate: Tue Jul 5 16:36:23 2022 -0700
Arrow: Fix for dictionary encoded fixed length binary decimals (#5198)
---
.../GenericArrowVectorAccessorFactory.java | 35 +++++++-------------
.../vectorized/parquet/DecimalVectorUtil.java | 10 +++++-
...orizedDictionaryEncodedParquetValuesReader.java | 7 ++--
.../VectorizedParquetDefinitionLevelReader.java | 10 ++----
...dDictionaryEncodedFlatParquetDataBenchmark.java | 20 +++++-------
.../VectorizedReadFlatParquetDataBenchmark.java | 38 ++++++++++++++++++----
.../apache/iceberg/spark/data/AvroDataTest.java | 7 ++--
.../vectorized/TestParquetVectorizedReads.java | 8 +++--
8 files changed, 75 insertions(+), 60 deletions(-)
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index f0a1ecdf2..85d2fb7b5 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.arrow.vectorized;
import java.lang.reflect.Array;
import java.math.BigDecimal;
-import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.function.IntFunction;
import java.util.function.Supplier;
@@ -556,9 +555,9 @@ public class GenericArrowVectorAccessorFactory<DecimalT,
Utf8StringT, ArrayT, Ch
DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT
extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT,
ChildVectorT> {
private final DecimalT[] cache;
- private final DecimalFactory<DecimalT> decimalFactory;
- private final Dictionary parquetDictionary;
private final IntVector offsetVector;
+ protected final DecimalFactory<DecimalT> decimalFactory;
+ protected final Dictionary parquetDictionary;
private DictionaryDecimalAccessor(
IntVector vector,
@@ -571,28 +570,16 @@ public class GenericArrowVectorAccessorFactory<DecimalT,
Utf8StringT, ArrayT, Ch
this.cache = genericArray(decimalFactory.getGenericClass(),
dictionary.getMaxId() + 1);
}
- protected long decodeToBinary(int dictId) {
- return new
BigInteger(parquetDictionary.decodeToBinary(dictId).getBytesUnsafe()).longValue();
- }
-
- protected long decodeToLong(int dictId) {
- return parquetDictionary.decodeToLong(dictId);
- }
-
- protected int decodeToInt(int dictId) {
- return parquetDictionary.decodeToInt(dictId);
- }
-
@Override
public final DecimalT getDecimal(int rowId, int precision, int scale) {
int offset = offsetVector.get(rowId);
if (cache[offset] == null) {
- cache[offset] = decimalFactory.ofLong(decode(offset), precision,
scale);
+ cache[offset] = decode(offset, precision, scale);
}
return cache[offset];
}
- protected abstract long decode(int dictId);
+ protected abstract DecimalT decode(int dictId, int precision, int scale);
}
private static class
@@ -604,8 +591,10 @@ public class GenericArrowVectorAccessorFactory<DecimalT,
Utf8StringT, ArrayT, Ch
}
@Override
- protected long decode(int dictId) {
- return decodeToBinary(dictId);
+ protected DecimalT decode(int dictId, int precision, int scale) {
+ ByteBuffer byteBuffer =
parquetDictionary.decodeToBinary(dictId).toByteBuffer();
+ BigDecimal value =
DecimalUtility.getBigDecimalFromByteBuffer(byteBuffer, scale,
byteBuffer.remaining());
+ return decimalFactory.ofBigDecimal(value, precision, scale);
}
}
@@ -617,8 +606,8 @@ public class GenericArrowVectorAccessorFactory<DecimalT,
Utf8StringT, ArrayT, Ch
}
@Override
- protected long decode(int dictId) {
- return decodeToLong(dictId);
+ protected DecimalT decode(int dictId, int precision, int scale) {
+ return decimalFactory.ofLong(parquetDictionary.decodeToLong(dictId),
precision, scale);
}
}
@@ -630,8 +619,8 @@ public class GenericArrowVectorAccessorFactory<DecimalT,
Utf8StringT, ArrayT, Ch
}
@Override
- protected long decode(int dictId) {
- return decodeToInt(dictId);
+ protected DecimalT decode(int dictId, int precision, int scale) {
+ return decimalFactory.ofLong(parquetDictionary.decodeToInt(dictId),
precision, scale);
}
}
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
index cb4e45369..5e6855a91 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
@@ -20,12 +20,19 @@
package org.apache.iceberg.arrow.vectorized.parquet;
import java.util.Arrays;
+import org.apache.arrow.vector.DecimalVector;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
public class DecimalVectorUtil {
private DecimalVectorUtil() {
}
+ public static void setBigEndian(DecimalVector vector, int idx, byte[] value)
{
+ byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(value,
DecimalVector.TYPE_WIDTH);
+ vector.setBigEndian(idx, paddedBytes);
+ }
+
/**
* Parquet stores decimal values in big-endian byte order, and Arrow stores
them in native byte order.
* When setting the value in Arrow, we call setBigEndian(), and the byte
order is reversed if needed.
@@ -37,7 +44,8 @@ public class DecimalVectorUtil {
* @param newLength The length of the byte array to return
* @return The new byte array
*/
- public static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength)
{
+ @VisibleForTesting
+ static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength) {
if (bigEndianBytes.length == newLength) {
return bigEndianBytes;
} else if (bigEndianBytes.length < newLength) {
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index e8e3cabd9..b930f35eb 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -128,11 +128,8 @@ public class
VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
class FixedLengthDecimalDictEncodedReader extends BaseDictEncodedReader {
@Override
protected void nextVal(FieldVector vector, Dictionary dict, int idx, int
currentVal, int typeWidth) {
- byte[] vectorBytes =
- DecimalVectorUtil.padBigEndianBytes(
- dict.decodeToBinary(currentVal).getBytesUnsafe(),
- DecimalVector.TYPE_WIDTH);
- ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+ byte[] bytes = dict.decodeToBinary(currentVal).getBytesUnsafe();
+ DecimalVectorUtil.setBigEndian((DecimalVector) vector, idx, bytes);
}
}
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index a8990d0cd..46c6736ce 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -358,8 +358,7 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
protected void nextVal(
FieldVector vector, int idx, ValuesAsBytesReader valuesReader, int
typeWidth, byte[] byteArray) {
valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
- byte[] vectorBytes = DecimalVectorUtil.padBigEndianBytes(byteArray,
DecimalVector.TYPE_WIDTH);
- ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+ DecimalVectorUtil.setBigEndian((DecimalVector) vector, idx, byteArray);
}
@Override
@@ -370,11 +369,8 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
reader.fixedLengthDecimalDictEncodedReader()
.nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder,
typeWidth);
} else if (Mode.PACKED.equals(mode)) {
- byte[] vectorBytes =
- DecimalVectorUtil.padBigEndianBytes(
- dict.decodeToBinary(reader.readInteger()).getBytesUnsafe(),
- DecimalVector.TYPE_WIDTH);
- ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+ byte[] bytes =
dict.decodeToBinary(reader.readInteger()).getBytesUnsafe();
+ DecimalVectorUtil.setBigEndian((DecimalVector) vector, idx, bytes);
}
}
}
diff --git
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
index 0bb0a1192..5a0393225 100644
---
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
+++
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
@@ -19,12 +19,9 @@
package org.apache.iceberg.spark.source.parquet.vectorized;
-import java.math.BigDecimal;
-import java.math.BigInteger;
import java.util.Map;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -45,8 +42,8 @@ import static org.apache.spark.sql.functions.to_timestamp;
* <p>
* To run this benchmark for spark-3.3:
* <code>
- * ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh
- *
-PjmhIncludeRegex=VectorizedReadDictionaryEncodedFlatParquetDataBenchmark
+ * ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh \
+ *
-PjmhIncludeRegex=VectorizedReadDictionaryEncodedFlatParquetDataBenchmark \
* -PjmhOutputPath=benchmark/results.txt
* </code>
*/
@@ -78,6 +75,7 @@ public class
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark extends Vec
df = withIntColumnDictEncoded(df);
df = withFloatColumnDictEncoded(df);
df = withDoubleColumnDictEncoded(df);
+ df = withBigDecimalColumnNotDictEncoded(df); // no dictionary for fixed
len binary in Parquet v1
df = withDecimalColumnDictEncoded(df);
df = withDateColumnDictEncoded(df);
df = withTimestampColumnDictEncoded(df);
@@ -113,9 +111,12 @@ public class
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark extends Vec
return df.withColumn("doubleCol", modColumn().cast(DataTypes.DoubleType));
}
+ private static Dataset<Row> withBigDecimalColumnNotDictEncoded(Dataset<Row>
df) {
+ return df.withColumn("bigDecimalCol", modColumn().cast("decimal(20,5)"));
+ }
+
private static Dataset<Row> withDecimalColumnDictEncoded(Dataset<Row> df) {
- Types.DecimalType type = Types.DecimalType.of(20, 5);
- return df.withColumn("decimalCol", lit(bigDecimal(type,
0)).plus(modColumn()));
+ return df.withColumn("decimalCol", modColumn().cast("decimal(18,5)"));
}
private static Dataset<Row> withDateColumnDictEncoded(Dataset<Row> df) {
@@ -131,9 +132,4 @@ public class
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark extends Vec
private static Dataset<Row> withStringColumnDictEncoded(Dataset<Row> df) {
return df.withColumn("stringCol", modColumn().cast(DataTypes.StringType));
}
-
- private static BigDecimal bigDecimal(Types.DecimalType type, int value) {
- BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
- return new BigDecimal(unscaled, type.scale());
- }
}
diff --git
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
index 3e2a566ea..a1a36fd56 100644
---
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
+++
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
@@ -53,8 +53,8 @@ import static org.apache.spark.sql.functions.when;
* <p>
* To run this benchmark for spark-3.3:
* <code>
- * ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh
- * -PjmhIncludeRegex=VectorizedReadFlatParquetDataBenchmark
+ * ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh \
+ * -PjmhIncludeRegex=VectorizedReadFlatParquetDataBenchmark \
* -PjmhOutputPath=benchmark/results.txt
* </code>
*/
@@ -87,15 +87,18 @@ public class VectorizedReadFlatParquetDataBenchmark extends
IcebergSourceBenchma
@Override
protected Table initTable() {
+ // bigDecimalCol is large enough to be encoded as fixed-length binary (9 bytes),
+ // while decimalCol is small enough to be encoded as a 64-bit int
Schema schema = new Schema(
optional(1, "longCol", Types.LongType.get()),
optional(2, "intCol", Types.IntegerType.get()),
optional(3, "floatCol", Types.FloatType.get()),
optional(4, "doubleCol", Types.DoubleType.get()),
- optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
- optional(6, "dateCol", Types.DateType.get()),
- optional(7, "timestampCol", Types.TimestampType.withZone()),
- optional(8, "stringCol", Types.StringType.get()));
+ optional(5, "bigDecimalCol", Types.DecimalType.of(20, 5)),
+ optional(6, "decimalCol", Types.DecimalType.of(18, 5)),
+ optional(7, "dateCol", Types.DateType.get()),
+ optional(8, "timestampCol", Types.TimestampType.withZone()),
+ optional(9, "stringCol", Types.StringType.get()));
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
HadoopTables tables = new HadoopTables(hadoopConf());
Map<String, String> properties = parquetWriteProps();
@@ -120,7 +123,8 @@ public class VectorizedReadFlatParquetDataBenchmark extends
IcebergSourceBenchma
.withColumn("intCol", expr("CAST(longCol AS INT)"))
.withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
.withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
- .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+ .withColumn("bigDecimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+ .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(18, 5))"))
.withColumn("dateCol", date_add(current_date(), fileNum))
.withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
.withColumn("stringCol", expr("CAST(longCol AS STRING)"));
@@ -228,6 +232,26 @@ public class VectorizedReadFlatParquetDataBenchmark
extends IcebergSourceBenchma
});
}
+ @Benchmark
+ @Threads(1)
+ public void readBigDecimalsIcebergVectorized5k() {
+ withTableProperties(tablePropsWithVectorizationEnabled(5000), () -> {
+ String tableLocation = table().location();
+ Dataset<Row> df = spark().read().format("iceberg")
+ .load(tableLocation).select("bigDecimalCol");
+ materialize(df);
+ });
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readBigDecimalsSparkVectorized5k() {
+ withSQLConf(sparkConfWithVectorizationEnabled(5000), () -> {
+ Dataset<Row> df =
spark().read().parquet(dataLocation()).select("bigDecimalCol");
+ materialize(df);
+ });
+ }
+
@Benchmark
@Threads(1)
public void readDatesIcebergVectorized5k() {
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
index 5f902f6e6..a8556de68 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
@@ -59,9 +59,10 @@ public abstract class AvroDataTest {
// required(111, "uuid", Types.UUIDType.get()),
required(112, "fixed", Types.FixedType.ofLength(7)),
optional(113, "bytes", Types.BinaryType.get()),
- required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
- required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
- required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // spark's
maximum precision
+ required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded
+ required(115, "dec_11_2", Types.DecimalType.of(11, 2)), // long encoded
+ required(116, "dec_20_5", Types.DecimalType.of(20, 5)), // requires
padding
+ required(117, "dec_38_10", Types.DecimalType.of(38, 10)) // Spark's
maximum precision
);
@Rule
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index 19ffd023a..cff2e5cc6 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -279,10 +279,14 @@ public class TestParquetVectorizedReads extends
AvroDataTest {
@Test
public void testSupportedReadsForParquetV2() throws Exception {
- // Only float and double column types are written using plain encoding
with Parquet V2
+ // Float and double column types are written using plain encoding with Parquet V2.
+ // Parquet V2 also dictionary-encodes decimals stored as fixed-length binary
+ // (i.e. decimals wider than 8 bytes).
Schema schema = new Schema(
optional(102, "float_data", Types.FloatType.get()),
- optional(103, "double_data", Types.DoubleType.get()));
+ optional(103, "double_data", Types.DoubleType.get()),
+ optional(104, "decimal_data", Types.DecimalType.of(25, 5))
+ );
File dataFile = temp.newFile();
Assert.assertTrue("Delete should succeed", dataFile.delete());