This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1bebf1a Parquet: Fix vectorized reads for negative decimals (#1736)
1bebf1a is described below
commit 1bebf1a42539be523412474651c1b31c4ef3ae69
Author: Samarth Jain <[email protected]>
AuthorDate: Fri Nov 6 13:21:30 2020 -0800
Parquet: Fix vectorized reads for negative decimals (#1736)
---
.../java/org/apache/iceberg/util/RandomUtil.java | 24 ++--
.../arrow/vectorized/VectorizedArrowReader.java | 14 ++-
.../parquet/VectorizedColumnIterator.java | 22 +++-
...orizedDictionaryEncodedParquetValuesReader.java | 51 +++++++--
.../vectorized/parquet/VectorizedPageIterator.java | 39 ++++++-
.../VectorizedParquetDefinitionLevelReader.java | 125 +++++++++++++++++----
6 files changed, 218 insertions(+), 57 deletions(-)
diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
index a9254cf..f01db61 100644
--- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
+++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -31,6 +31,10 @@ public class RandomUtil {
private RandomUtil() {
}
+ private static boolean negate(int num) {
+ return num % 2 == 1;
+ }
+
@SuppressWarnings("RandomModInteger")
public static Object generatePrimitive(Type.PrimitiveType primitive,
Random random) {
@@ -49,7 +53,7 @@ public class RandomUtil {
case 3:
return 0;
default:
- return random.nextInt();
+ return negate(choice) ? -1 * random.nextInt() : random.nextInt();
}
case LONG:
@@ -61,7 +65,7 @@ public class RandomUtil {
case 3:
return 0L;
default:
- return random.nextLong();
+ return negate(choice) ? -1L * random.nextLong() : random.nextLong();
}
case FLOAT:
@@ -83,7 +87,7 @@ public class RandomUtil {
case 8:
return Float.NaN;
default:
- return random.nextFloat();
+ return negate(choice) ? -1.0F * random.nextFloat() : random.nextFloat();
}
case DOUBLE:
@@ -105,7 +109,7 @@ public class RandomUtil {
case 8:
return Double.NaN;
default:
- return random.nextDouble();
+ return negate(choice) ? -1.0D * random.nextDouble() : random.nextDouble();
}
case DATE:
@@ -140,7 +144,8 @@ public class RandomUtil {
case DECIMAL:
Types.DecimalType type = (Types.DecimalType) primitive;
BigInteger unscaled = randomUnscaled(type.precision(), random);
- return new BigDecimal(unscaled, type.scale());
+ BigDecimal bigDecimal = new BigDecimal(unscaled, type.scale());
+ return negate(choice) ? bigDecimal.negate() : bigDecimal;
default:
throw new IllegalArgumentException(
@@ -155,11 +160,11 @@ public class RandomUtil {
return true; // doesn't really matter for booleans since they are not dictionary encoded
case INTEGER:
case DATE:
- return value;
+ return negate(value) ? -1 * value : value;
case FLOAT:
- return (float) value;
+ return negate(value) ? -1.0F * (float) value : (float) value;
case DOUBLE:
- return (double) value;
+ return negate(value) ? -1.0D * (double) value : (double) value;
case LONG:
case TIME:
case TIMESTAMP:
@@ -177,7 +182,8 @@ public class RandomUtil {
case DECIMAL:
Types.DecimalType type = (Types.DecimalType) primitive;
BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
- return new BigDecimal(unscaled, type.scale());
+ BigDecimal bd = new BigDecimal(unscaled, type.scale());
+ return negate(value) ? bd.negate() : bd;
default:
throw new IllegalArgumentException(
"Cannot generate random value for unknown type: " + primitive);
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index bb571e0..4eb8091 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -92,7 +92,8 @@ public class VectorizedArrowReader implements
VectorizedReader<VectorHolder> {
private enum ReadType {
FIXED_LENGTH_DECIMAL,
- INT_LONG_BACKED_DECIMAL,
+ INT_BACKED_DECIMAL,
+ LONG_BACKED_DECIMAL,
VARCHAR,
VARBINARY,
FIXED_WIDTH_BINARY,
@@ -130,8 +131,11 @@ public class VectorizedArrowReader implements
VectorizedReader<VectorHolder> {
case FIXED_LENGTH_DECIMAL:
vectorizedColumnIterator.nextBatchFixedLengthDecimal(vec,
typeWidth, nullabilityHolder);
break;
- case INT_LONG_BACKED_DECIMAL:
- vectorizedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth, nullabilityHolder);
+ case INT_BACKED_DECIMAL:
+ vectorizedColumnIterator.nextBatchIntBackedDecimal(vec, nullabilityHolder);
+ break;
+ case LONG_BACKED_DECIMAL:
+ vectorizedColumnIterator.nextBatchLongBackedDecimal(vec, nullabilityHolder);
break;
case VARBINARY:
vectorizedColumnIterator.nextBatchVarWidthType(vec,
nullabilityHolder);
@@ -237,11 +241,11 @@ public class VectorizedArrowReader implements
VectorizedReader<VectorHolder> {
this.typeWidth = primitive.getTypeLength();
break;
case INT64:
- this.readType = ReadType.INT_LONG_BACKED_DECIMAL;
+ this.readType = ReadType.LONG_BACKED_DECIMAL;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
case INT32:
- this.readType = ReadType.INT_LONG_BACKED_DECIMAL;
+ this.readType = ReadType.INT_BACKED_DECIMAL;
this.typeWidth = (int) IntVector.TYPE_WIDTH;
break;
default:
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
index cb9d278..d963c45 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
@@ -129,16 +129,30 @@ public class VectorizedColumnIterator extends
BaseColumnIterator {
}
}
- public void nextBatchIntLongBackedDecimal(
+ public void nextBatchIntBackedDecimal(
FieldVector fieldVector,
- int typeWidth,
NullabilityHolder nullabilityHolder) {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
int rowsInThisBatch =
- vectorizedPageIterator.nextBatchIntLongBackedDecimal(fieldVector,
batchSize - rowsReadSoFar,
- rowsReadSoFar, typeWidth, nullabilityHolder);
+ vectorizedPageIterator.nextBatchIntBackedDecimal(fieldVector,
batchSize - rowsReadSoFar,
+ rowsReadSoFar, nullabilityHolder);
+ rowsReadSoFar += rowsInThisBatch;
+ this.triplesRead += rowsInThisBatch;
+ fieldVector.setValueCount(rowsReadSoFar);
+ }
+ }
+
+ public void nextBatchLongBackedDecimal(
+ FieldVector fieldVector,
+ NullabilityHolder nullabilityHolder) {
+ int rowsReadSoFar = 0;
+ while (rowsReadSoFar < batchSize && hasNext()) {
+ advance();
+ int rowsInThisBatch =
+ vectorizedPageIterator.nextBatchLongBackedDecimal(fieldVector,
batchSize - rowsReadSoFar,
+ rowsReadSoFar, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.triplesRead += rowsInThisBatch;
fieldVector.setValueCount(rowsReadSoFar);
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index 52e389e..74d9e15 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -286,8 +286,8 @@ public class VectorizedDictionaryEncodedParquetValuesReader
extends BaseVectoriz
case RLE:
for (int i = 0; i < num; i++) {
byte[] decimalBytes =
dict.decodeToBinary(currentValue).getBytesUnsafe();
- byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
- System.arraycopy(decimalBytes, 0, vectorBytes,
DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ byte[] vectorBytes = new byte[typeWidth];
+ System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
((DecimalVector) vector).setBigEndian(idx, vectorBytes);
nullabilityHolder.setNotNull(idx);
idx++;
@@ -296,8 +296,8 @@ public class VectorizedDictionaryEncodedParquetValuesReader
extends BaseVectoriz
case PACKED:
for (int i = 0; i < num; i++) {
byte[] decimalBytes =
dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
- byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
- System.arraycopy(decimalBytes, 0, vectorBytes,
DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ byte[] vectorBytes = new byte[typeWidth];
+ System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
((DecimalVector) vector).setBigEndian(idx, vectorBytes);
nullabilityHolder.setNotNull(idx);
idx++;
@@ -343,7 +343,7 @@ public class VectorizedDictionaryEncodedParquetValuesReader
extends BaseVectoriz
}
}
- void readBatchOfDictionaryEncodedIntLongBackedDecimals(FieldVector vector,
int typeWidth, int startOffset,
+ void readBatchOfDictionaryEncodedIntBackedDecimals(FieldVector vector, int
startOffset,
int numValuesToRead,
Dictionary dict,
NullabilityHolder
nullabilityHolder) {
int left = numValuesToRead;
@@ -358,7 +358,7 @@ public class VectorizedDictionaryEncodedParquetValuesReader
extends BaseVectoriz
for (int i = 0; i < num; i++) {
((DecimalVector) vector).set(
idx,
- typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) :
dict.decodeToLong(currentValue));
+ dict.decodeToInt(currentValue));
nullabilityHolder.setNotNull(idx);
idx++;
}
@@ -366,10 +366,41 @@ public class
VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
case PACKED:
for (int i = 0; i < num; i++) {
((DecimalVector) vector).set(
- idx,
- typeWidth == Integer.BYTES ?
-
dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++])
- :
dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
+ idx,
dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
+ nullabilityHolder.setNotNull(idx);
+ idx++;
+ }
+ break;
+ }
+ left -= num;
+ currentCount -= num;
+ }
+ }
+
+ void readBatchOfDictionaryEncodedLongBackedDecimals(FieldVector vector, int
startOffset,
+ int numValuesToRead,
Dictionary dict,
+ NullabilityHolder
nullabilityHolder) {
+ int left = numValuesToRead;
+ int idx = startOffset;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int num = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < num; i++) {
+ ((DecimalVector) vector).set(
+ idx,
+ dict.decodeToLong(currentValue));
+ nullabilityHolder.setNotNull(idx);
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < num; i++) {
+ ((DecimalVector) vector).set(
+ idx,
dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
nullabilityHolder.setNotNull(idx);
idx++;
}
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index 2aa6f2c..9876962 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -274,28 +274,26 @@ public class VectorizedPageIterator extends
BasePageIterator {
* Method for reading a batch of decimals backed by INT32 and INT64 parquet
data types. Since Arrow stores all
* decimals in 16 bytes, byte arrays are appropriately padded before being
written to Arrow data buffers.
*/
- public int nextBatchIntLongBackedDecimal(
+ public int nextBatchIntBackedDecimal(
final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
- final int typeWidth, NullabilityHolder nullabilityHolder) {
+ NullabilityHolder nullabilityHolder) {
final int actualBatchSize = getActualBatchSize(expectedBatchSize);
if (actualBatchSize <= 0) {
return 0;
}
if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
vectorizedDefinitionLevelReader
- .readBatchOfDictionaryEncodedIntLongBackedDecimals(
+ .readBatchOfDictionaryEncodedIntBackedDecimals(
vector,
numValsInVector,
- typeWidth,
actualBatchSize,
nullabilityHolder,
dictionaryEncodedValuesReader,
dictionary);
} else {
- vectorizedDefinitionLevelReader.readBatchOfIntLongBackedDecimals(
+ vectorizedDefinitionLevelReader.readBatchOfIntBackedDecimals(
vector,
numValsInVector,
- typeWidth,
actualBatchSize,
nullabilityHolder,
plainValuesReader);
@@ -305,6 +303,35 @@ public class VectorizedPageIterator extends
BasePageIterator {
return actualBatchSize;
}
+ public int nextBatchLongBackedDecimal(
+ final FieldVector vector, final int expectedBatchSize, final int
numValsInVector,
+ NullabilityHolder nullabilityHolder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
+ }
+ if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
+ vectorizedDefinitionLevelReader
+ .readBatchOfDictionaryEncodedLongBackedDecimals(
+ vector,
+ numValsInVector,
+ actualBatchSize,
+ nullabilityHolder,
+ dictionaryEncodedValuesReader,
+ dictionary);
+ } else {
+ vectorizedDefinitionLevelReader.readBatchOfLongBackedDecimals(
+ vector,
+ numValsInVector,
+ actualBatchSize,
+ nullabilityHolder,
+ plainValuesReader);
+ }
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
/**
* Method for reading a batch of decimals backed by fixed length byte array
parquet data type. Arrow stores all
* decimals in 16 bytes. This method provides the necessary padding to the
decimals read. Moreover, Arrow interprets
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index 7c21119..d330f09 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -623,12 +623,12 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
this.readNextGroup();
}
int num = Math.min(left, this.currentCount);
- byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+ byte[] byteArray = new byte[typeWidth];
switch (mode) {
case RLE:
if (currentValue == maxDefLevel) {
for (int i = 0; i < num; i++) {
- valuesReader.getBuffer(typeWidth).get(byteArray,
DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
nullabilityHolder.setNotNull(bufferIdx);
bufferIdx++;
@@ -641,7 +641,7 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
case PACKED:
for (int i = 0; i < num; ++i) {
if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- valuesReader.getBuffer(typeWidth).get(byteArray,
DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
nullabilityHolder.setNotNull(bufferIdx);
} else {
@@ -685,8 +685,8 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
for (int i = 0; i < num; i++) {
if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
ByteBuffer decimalBytes =
dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).toByteBuffer();
- byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
- System.arraycopy(decimalBytes, 0, vectorBytes,
DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ byte[] vectorBytes = new byte[typeWidth];
+ System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
((DecimalVector) vector).setBigEndian(idx, vectorBytes);
nullabilityHolder.setNotNull(idx);
} else {
@@ -805,10 +805,48 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
}
}
- public void readBatchOfIntLongBackedDecimals(
+ public void readBatchOfIntBackedDecimals(
final FieldVector vector, final int startOffset,
- final int typeWidth, final int numValsToRead, NullabilityHolder
nullabilityHolder,
- ValuesAsBytesReader valuesReader) {
+ final int numValsToRead, NullabilityHolder nullabilityHolder,
ValuesAsBytesReader valuesReader) {
+ int bufferIdx = startOffset;
+ int left = numValsToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int num = Math.min(left, this.currentCount);
+ byte[] byteArray = new byte[Integer.BYTES];
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < num; i++) {
+ setIntBackedDecimal(vector, nullabilityHolder, valuesReader,
bufferIdx, byteArray);
+ bufferIdx++;
+ }
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, num,
vector.getValidityBuffer());
+ bufferIdx += num;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < num; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setIntBackedDecimal(vector, nullabilityHolder, valuesReader,
bufferIdx, byteArray);
+ } else {
+ setNull(nullabilityHolder, bufferIdx,
vector.getValidityBuffer());
+ }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= num;
+ currentCount -= num;
+ }
+ }
+
+ public void readBatchOfLongBackedDecimals(
+ final FieldVector vector, final int startOffset,
+ final int numValsToRead, NullabilityHolder nullabilityHolder,
ValuesAsBytesReader valuesReader) {
int bufferIdx = startOffset;
int left = numValsToRead;
while (left > 0) {
@@ -816,12 +854,12 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
this.readNextGroup();
}
int num = Math.min(left, this.currentCount);
- byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+ byte[] byteArray = new byte[Long.BYTES];
switch (mode) {
case RLE:
if (currentValue == maxDefLevel) {
for (int i = 0; i < num; i++) {
- setIntLongBackedDecimal(vector, typeWidth, nullabilityHolder,
valuesReader, bufferIdx, byteArray);
+ setLongBackedDecimal(vector, nullabilityHolder, valuesReader,
bufferIdx, byteArray);
bufferIdx++;
}
} else {
@@ -832,7 +870,7 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
case PACKED:
for (int i = 0; i < num; ++i) {
if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- setIntLongBackedDecimal(vector, typeWidth, nullabilityHolder,
valuesReader, bufferIdx, byteArray);
+ setLongBackedDecimal(vector, nullabilityHolder, valuesReader,
bufferIdx, byteArray);
} else {
setNull(nullabilityHolder, bufferIdx,
vector.getValidityBuffer());
}
@@ -845,20 +883,21 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
}
}
- private void setIntLongBackedDecimal(FieldVector vector, int typeWidth,
NullabilityHolder nullabilityHolder,
+ private void setIntBackedDecimal(FieldVector vector, NullabilityHolder
nullabilityHolder,
ValuesAsBytesReader valuesReader, int
bufferIdx, byte[] byteArray) {
- valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
- vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH,
byteArray);
+ ((DecimalVector) vector).set(bufferIdx,
valuesReader.getBuffer(Integer.BYTES).getInt());
nullabilityHolder.setNotNull(bufferIdx);
- if (setArrowValidityVector) {
- BitVectorHelper.setBit(vector.getValidityBuffer(), bufferIdx);
- }
}
- public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
+ private void setLongBackedDecimal(FieldVector vector, NullabilityHolder
nullabilityHolder,
+ ValuesAsBytesReader valuesReader, int
bufferIdx, byte[] byteArray) {
+ ((DecimalVector) vector).set(bufferIdx,
valuesReader.getBuffer(Long.BYTES).getLong());
+ nullabilityHolder.setNotNull(bufferIdx);
+ }
+
+ public void readBatchOfDictionaryEncodedIntBackedDecimals(
final FieldVector vector,
final int startOffset,
- final int typeWidth,
final int numValsToRead,
NullabilityHolder nullabilityHolder,
VectorizedDictionaryEncodedParquetValuesReader
dictionaryEncodedValuesReader,
@@ -873,7 +912,7 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
switch (mode) {
case RLE:
if (currentValue == maxDefLevel) {
-
dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimals(vector,
typeWidth, idx,
+
dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedIntBackedDecimals(vector,
idx,
num, dict, nullabilityHolder);
} else {
setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
@@ -885,9 +924,49 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
((DecimalVector) vector).set(
idx,
- typeWidth == Integer.BYTES ?
-
dict.decodeToInt(dictionaryEncodedValuesReader.readInteger())
- :
dict.decodeToLong(dictionaryEncodedValuesReader.readInteger()));
+
dict.decodeToInt(dictionaryEncodedValuesReader.readInteger()));
+ nullabilityHolder.setNotNull(idx);
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+ }
+ idx++;
+ }
+ break;
+ }
+ left -= num;
+ currentCount -= num;
+ }
+ }
+
+ public void readBatchOfDictionaryEncodedLongBackedDecimals(
+ final FieldVector vector,
+ final int startOffset,
+ final int numValsToRead,
+ NullabilityHolder nullabilityHolder,
+ VectorizedDictionaryEncodedParquetValuesReader
dictionaryEncodedValuesReader,
+ Dictionary dict) {
+ int idx = startOffset;
+ int left = numValsToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int num = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+
dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedLongBackedDecimals(vector,
idx,
+ num, dict, nullabilityHolder);
+ } else {
+ setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
+ }
+ idx += num;
+ break;
+ case PACKED:
+ for (int i = 0; i < num; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ ((DecimalVector) vector).set(
+ idx,
dict.decodeToLong(dictionaryEncodedValuesReader.readInteger()));
nullabilityHolder.setNotNull(idx);
} else {
setNull(nullabilityHolder, idx, vector.getValidityBuffer());