This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 45a44f4 ORC: Refactor readers to remove duplicate null handling code
(#899)
45a44f4 is described below
commit 45a44f4cf49300db9a89e205d172146591b0bad1
Author: Shardul Mahadik <[email protected]>
AuthorDate: Thu Apr 9 14:31:37 2020 -0700
ORC: Refactor readers to remove duplicate null handling code (#899)
---
.../apache/iceberg/data/orc/GenericOrcReader.java | 247 ++++----------
.../apache/iceberg/spark/data/SparkOrcReader.java | 376 ++++-----------------
2 files changed, 138 insertions(+), 485 deletions(-)
diff --git
a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
index 88c407d..03f03be 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
@@ -96,219 +96,140 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
}
interface Converter<T> {
- T convert(ColumnVector vector, int row);
- }
-
- private static class BooleanConverter implements Converter<Boolean> {
- @Override
- public Boolean convert(ColumnVector vector, int row) {
+ default T convert(ColumnVector vector, int row) {
int rowIndex = vector.isRepeating ? 0 : row;
if (!vector.noNulls && vector.isNull[rowIndex]) {
return null;
} else {
- return ((LongColumnVector) vector).vector[rowIndex] != 0;
+ return convertNonNullValue(vector, rowIndex); // same isRepeating-adjusted index as the null check
}
}
+
+ T convertNonNullValue(ColumnVector vector, int row);
+ }
+
+ private static class BooleanConverter implements Converter<Boolean> {
+ @Override
+ public Boolean convertNonNullValue(ColumnVector vector, int row) {
+ return ((LongColumnVector) vector).vector[row] != 0;
+ }
}
private static class ByteConverter implements Converter<Byte> {
@Override
- public Byte convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return (byte) ((LongColumnVector) vector).vector[rowIndex];
- }
+ public Byte convertNonNullValue(ColumnVector vector, int row) {
+ return (byte) ((LongColumnVector) vector).vector[row];
}
}
private static class ShortConverter implements Converter<Short> {
@Override
- public Short convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return (short) ((LongColumnVector) vector).vector[rowIndex];
- }
+ public Short convertNonNullValue(ColumnVector vector, int row) {
+ return (short) ((LongColumnVector) vector).vector[row];
}
}
private static class IntConverter implements Converter<Integer> {
@Override
- public Integer convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return (int) ((LongColumnVector) vector).vector[rowIndex];
- }
+ public Integer convertNonNullValue(ColumnVector vector, int row) {
+ return (int) ((LongColumnVector) vector).vector[row];
}
}
private static class TimeConverter implements Converter<LocalTime> {
@Override
- public LocalTime convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return LocalTime.ofNanoOfDay(((LongColumnVector)
vector).vector[rowIndex] * 1_000);
- }
+ public LocalTime convertNonNullValue(ColumnVector vector, int row) {
+ return LocalTime.ofNanoOfDay(((LongColumnVector) vector).vector[row] *
1_000);
}
}
private static class DateConverter implements Converter<LocalDate> {
@Override
- public LocalDate convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return EPOCH_DAY.plusDays((int) ((LongColumnVector)
vector).vector[rowIndex]);
- }
+ public LocalDate convertNonNullValue(ColumnVector vector, int row) {
+ return EPOCH_DAY.plusDays((int) ((LongColumnVector) vector).vector[row]);
}
}
private static class LongConverter implements Converter<Long> {
@Override
- public Long convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return ((LongColumnVector) vector).vector[rowIndex];
- }
+ public Long convertNonNullValue(ColumnVector vector, int row) {
+ return ((LongColumnVector) vector).vector[row];
}
}
private static class FloatConverter implements Converter<Float> {
@Override
- public Float convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return (float) ((DoubleColumnVector) vector).vector[rowIndex];
- }
+ public Float convertNonNullValue(ColumnVector vector, int row) {
+ return (float) ((DoubleColumnVector) vector).vector[row];
}
}
private static class DoubleConverter implements Converter<Double> {
@Override
- public Double convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return ((DoubleColumnVector) vector).vector[rowIndex];
- }
+ public Double convertNonNullValue(ColumnVector vector, int row) {
+ return ((DoubleColumnVector) vector).vector[row];
}
}
private static class TimestampTzConverter implements
Converter<OffsetDateTime> {
- private OffsetDateTime convert(TimestampColumnVector vector, int row) {
- return Instant.ofEpochSecond(Math.floorDiv(vector.time[row], 1_000),
vector.nanos[row]).atOffset(ZoneOffset.UTC);
- }
-
@Override
- public OffsetDateTime convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return convert((TimestampColumnVector) vector, rowIndex);
- }
+ public OffsetDateTime convertNonNullValue(ColumnVector vector, int row) {
+ TimestampColumnVector tcv = (TimestampColumnVector) vector;
+ return Instant.ofEpochSecond(Math.floorDiv(tcv.time[row], 1_000),
tcv.nanos[row]).atOffset(ZoneOffset.UTC);
}
}
private static class TimestampConverter implements Converter<LocalDateTime> {
-
- private LocalDateTime convert(TimestampColumnVector vector, int row) {
- return Instant.ofEpochSecond(Math.floorDiv(vector.time[row], 1_000),
vector.nanos[row]).atOffset(ZoneOffset.UTC)
- .toLocalDateTime();
- }
-
@Override
- public LocalDateTime convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return convert((TimestampColumnVector) vector, rowIndex);
- }
+ public LocalDateTime convertNonNullValue(ColumnVector vector, int row) {
+ TimestampColumnVector tcv = (TimestampColumnVector) vector;
+ return Instant.ofEpochSecond(Math.floorDiv(tcv.time[row], 1_000),
tcv.nanos[row]).atOffset(ZoneOffset.UTC)
+ .toLocalDateTime();
}
}
private static class FixedConverter implements Converter<byte[]> {
@Override
- public byte[] convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- BytesColumnVector bytesVector = (BytesColumnVector) vector;
- return Arrays.copyOfRange(bytesVector.vector[rowIndex],
bytesVector.start[rowIndex],
- bytesVector.start[rowIndex] + bytesVector.length[rowIndex]);
- }
+ public byte[] convertNonNullValue(ColumnVector vector, int row) {
+ BytesColumnVector bytesVector = (BytesColumnVector) vector;
+ return Arrays.copyOfRange(bytesVector.vector[row],
bytesVector.start[row],
+ bytesVector.start[row] + bytesVector.length[row]);
}
}
private static class BinaryConverter implements Converter<ByteBuffer> {
@Override
- public ByteBuffer convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- BytesColumnVector bytesVector = (BytesColumnVector) vector;
- return ByteBuffer.wrap(bytesVector.vector[rowIndex],
bytesVector.start[rowIndex], bytesVector.length[rowIndex]);
- }
+ public ByteBuffer convertNonNullValue(ColumnVector vector, int row) {
+ BytesColumnVector bytesVector = (BytesColumnVector) vector;
+ return ByteBuffer.wrap(bytesVector.vector[row], bytesVector.start[row],
bytesVector.length[row]);
}
}
private static class UUIDConverter implements Converter<UUID> {
@Override
- public UUID convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- BytesColumnVector bytesVector = (BytesColumnVector) vector;
- ByteBuffer buf = ByteBuffer.wrap(bytesVector.vector[rowIndex],
bytesVector.start[rowIndex],
- bytesVector.length[rowIndex]);
- long mostSigBits = buf.getLong();
- long leastSigBits = buf.getLong();
- return new UUID(mostSigBits, leastSigBits);
- }
+ public UUID convertNonNullValue(ColumnVector vector, int row) {
+ BytesColumnVector bytesVector = (BytesColumnVector) vector;
+ ByteBuffer buf = ByteBuffer.wrap(bytesVector.vector[row],
bytesVector.start[row], bytesVector.length[row]);
+ long mostSigBits = buf.getLong();
+ long leastSigBits = buf.getLong();
+ return new UUID(mostSigBits, leastSigBits);
}
}
private static class StringConverter implements Converter<String> {
@Override
- public String convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- BytesColumnVector bytesVector = (BytesColumnVector) vector;
- return new String(bytesVector.vector[rowIndex],
bytesVector.start[rowIndex], bytesVector.length[rowIndex],
- StandardCharsets.UTF_8);
- }
+ public String convertNonNullValue(ColumnVector vector, int row) {
+ BytesColumnVector bytesVector = (BytesColumnVector) vector;
+ return new String(bytesVector.vector[row], bytesVector.start[row],
bytesVector.length[row],
+ StandardCharsets.UTF_8);
}
}
private static class DecimalConverter implements Converter<BigDecimal> {
@Override
- public BigDecimal convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- DecimalColumnVector cv = (DecimalColumnVector) vector;
- return
cv.vector[rowIndex].getHiveDecimal().bigDecimalValue().setScale(cv.scale);
- }
+ public BigDecimal convertNonNullValue(ColumnVector vector, int row) {
+ DecimalColumnVector cv = (DecimalColumnVector) vector;
+ return
cv.vector[row].getHiveDecimal().bigDecimalValue().setScale(cv.scale);
}
}
@@ -326,29 +247,21 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
.get(0), child);
}
- List<?> readList(ListColumnVector vector, int row) {
- int offset = (int) vector.offsets[row];
- int length = (int) vector.lengths[row];
+ @Override
+ public List<?> convertNonNullValue(ColumnVector vector, int row) {
+ ListColumnVector listVector = (ListColumnVector) vector;
+ int offset = (int) listVector.offsets[row];
+ int length = (int) listVector.lengths[row];
List<Object> list = Lists.newArrayListWithExpectedSize(length);
for (int c = 0; c < length; ++c) {
- list.add(childConverter.convert(vector.child, offset + c));
+ list.add(childConverter.convert(listVector.child, offset + c));
}
return list;
}
-
- @Override
- public List<?> convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return readList((ListColumnVector) vector, rowIndex);
- }
- }
}
- private static class MapConverter implements Converter {
+ private static class MapConverter implements Converter<Map<?, ?>> {
private final Converter keyConvert;
private final Converter valueConvert;
@@ -362,29 +275,21 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
valueConvert = buildConverter(mapFields.get(1), valueType);
}
- Map<?, ?> readMap(MapColumnVector vector, int row) {
- final int offset = (int) vector.offsets[row];
- final int length = (int) vector.lengths[row];
+ @Override
+ public Map<?, ?> convertNonNullValue(ColumnVector vector, int row) {
+ MapColumnVector mapVector = (MapColumnVector) vector;
+ final int offset = (int) mapVector.offsets[row];
+ final int length = (int) mapVector.lengths[row];
Map<Object, Object> map = Maps.newHashMapWithExpectedSize(length);
for (int c = 0; c < length; ++c) {
- Object key = keyConvert.convert(vector.keys, offset + c);
- Object value = valueConvert.convert(vector.values, offset + c);
+ Object key = keyConvert.convert(mapVector.keys, offset + c);
+ Object value = valueConvert.convert(mapVector.values, offset + c);
map.put(key, value);
}
return map;
}
-
- @Override
- public Map convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return readMap((MapColumnVector) vector, rowIndex);
- }
- }
}
private static class StructConverter implements Converter<Record> {
@@ -404,23 +309,15 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
}
}
- Record writeStruct(StructColumnVector vector, int row) {
+ @Override
+ public Record convertNonNullValue(ColumnVector vector, int row) {
+ StructColumnVector structVector = (StructColumnVector) vector;
Record data = GenericRecord.create(icebergStructSchema);
for (int c = 0; c < children.length; ++c) {
- data.set(c, children[c].convert(vector.fields[c], row));
+ data.set(c, children[c].convert(structVector.fields[c], row));
}
return data;
}
-
- @Override
- public Record convert(ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- return null;
- } else {
- return writeStruct((StructColumnVector) vector, rowIndex);
- }
- }
}
private static Converter buildConverter(final Types.NestedField
icebergField, final TypeDescription schema) {
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index b965c6d..829ca45 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -212,230 +212,90 @@ public class SparkOrcReader implements
OrcValueReader<InternalRow> {
* methods.
*/
interface Converter {
- void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int
row);
- void convert(UnsafeArrayWriter writer, int element, ColumnVector vector,
int row);
- }
-
- private static class BooleanConverter implements Converter {
- @Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector, int row) {
+ default void convert(UnsafeRowWriter writer, int column, ColumnVector
vector, int row) {
int rowIndex = vector.isRepeating ? 0 : row;
if (!vector.noNulls && vector.isNull[rowIndex]) {
writer.setNullAt(column);
} else {
- writer.write(column, ((LongColumnVector) vector).vector[rowIndex] !=
0);
+ convertNonNullValue(writer, column, vector, rowIndex);
}
}
- @Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
+ default void convert(UnsafeArrayWriter writer, int element, ColumnVector
vector, int row) {
int rowIndex = vector.isRepeating ? 0 : row;
if (!vector.noNulls && vector.isNull[rowIndex]) {
writer.setNull(element);
} else {
- writer.write(element, ((LongColumnVector) vector).vector[rowIndex] !=
0);
+ convertNonNullValue(writer, element, vector, rowIndex);
}
}
+
+ void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector
vector, int row);
}
- private static class ByteConverter implements Converter {
+ private static class BooleanConverter implements Converter {
@Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
- int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- writer.write(column, (byte) ((LongColumnVector)
vector).vector[rowIndex]);
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ writer.write(ordinal, ((LongColumnVector) vector).vector[row] != 0);
}
+ }
+ private static class ByteConverter implements Converter {
@Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- writer.write(element, (byte) ((LongColumnVector)
vector).vector[rowIndex]);
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ writer.write(ordinal, (byte) ((LongColumnVector) vector).vector[row]);
}
}
private static class ShortConverter implements Converter {
@Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
- int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- writer.write(column, (short) ((LongColumnVector)
vector).vector[rowIndex]);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- writer.write(element, (short) ((LongColumnVector)
vector).vector[rowIndex]);
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ writer.write(ordinal, (short) ((LongColumnVector) vector).vector[row]);
}
}
private static class IntConverter implements Converter {
@Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
- int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- writer.write(column, (int) ((LongColumnVector)
vector).vector[rowIndex]);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- writer.write(element, (int) ((LongColumnVector)
vector).vector[rowIndex]);
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ writer.write(ordinal, (int) ((LongColumnVector) vector).vector[row]);
}
}
private static class LongConverter implements Converter {
@Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
- int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- writer.write(column, ((LongColumnVector) vector).vector[rowIndex]);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- writer.write(element, ((LongColumnVector) vector).vector[rowIndex]);
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ writer.write(ordinal, ((LongColumnVector) vector).vector[row]);
}
}
private static class FloatConverter implements Converter {
@Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
- int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- writer.write(column, (float) ((DoubleColumnVector)
vector).vector[rowIndex]);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- writer.write(element, (float) ((DoubleColumnVector)
vector).vector[rowIndex]);
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ writer.write(ordinal, (float) ((DoubleColumnVector) vector).vector[row]);
}
}
private static class DoubleConverter implements Converter {
@Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
- int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- writer.write(column, ((DoubleColumnVector) vector).vector[rowIndex]);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- writer.write(element, ((DoubleColumnVector) vector).vector[rowIndex]);
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ writer.write(ordinal, ((DoubleColumnVector) vector).vector[row]);
}
}
private static class TimestampTzConverter implements Converter {
-
- private long convert(TimestampColumnVector vector, int row) {
- // compute microseconds past 1970.
- return (vector.time[row] / 1000) * 1_000_000 + vector.nanos[row] / 1000;
- }
-
- @Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
- int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- writer.write(column, convert((TimestampColumnVector) vector,
rowIndex));
- }
- }
-
@Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- writer.write(element, convert((TimestampColumnVector) vector,
rowIndex));
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ TimestampColumnVector timestampVector = (TimestampColumnVector) vector;
+ // compute microseconds past 1970; floorDiv rounds pre-epoch millis down so adding nanos stays correct.
+ writer.write(ordinal, Math.floorDiv(timestampVector.time[row], 1000) * 1_000_000 +
timestampVector.nanos[row] / 1000);
}
}
private static class BinaryConverter implements Converter {
-
@Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- BytesColumnVector bytesVector = (BytesColumnVector) vector;
- writer.write(column, bytesVector.vector[rowIndex],
- bytesVector.start[rowIndex], bytesVector.length[rowIndex]);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element, ColumnVector
vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- final BytesColumnVector v = (BytesColumnVector) vector;
- writer.write(element, v.vector[rowIndex], v.start[rowIndex],
v.length[rowIndex]);
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ BytesColumnVector bytesVector = (BytesColumnVector) vector;
+ writer.write(ordinal, bytesVector.vector[row], bytesVector.start[row],
bytesVector.length[row]);
}
}
@@ -449,31 +309,11 @@ public class SparkOrcReader implements
OrcValueReader<InternalRow> {
}
@Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
- int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- HiveDecimalWritable value = ((DecimalColumnVector)
vector).vector[rowIndex];
- writer.write(column,
- new Decimal().set(value.serialize64(value.scale()),
value.precision(), value.scale()),
- precision, scale);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- HiveDecimalWritable value = ((DecimalColumnVector)
vector).vector[rowIndex];
- writer.write(element,
- new Decimal().set(value.serialize64(value.scale()),
value.precision(), value.scale()),
- precision, scale);
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row];
+ writer.write(ordinal,
+ new Decimal().set(value.serialize64(value.scale()),
value.precision(), value.scale()),
+ precision, scale);
}
}
@@ -487,33 +327,12 @@ public class SparkOrcReader implements
OrcValueReader<InternalRow> {
}
@Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
- int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- BigDecimal value = ((DecimalColumnVector) vector).vector[rowIndex]
- .getHiveDecimal().bigDecimalValue();
- writer.write(column,
- new Decimal().set(new scala.math.BigDecimal(value), precision,
scale),
- precision, scale);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- BigDecimal value = ((DecimalColumnVector) vector).vector[rowIndex]
- .getHiveDecimal().bigDecimalValue();
- writer.write(element,
- new Decimal().set(new scala.math.BigDecimal(value), precision,
scale),
- precision, scale);
- }
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ BigDecimal value = ((DecimalColumnVector) vector).vector[row]
+ .getHiveDecimal().bigDecimalValue();
+ writer.write(ordinal,
+ new Decimal().set(new scala.math.BigDecimal(value), precision,
scale),
+ precision, scale);
}
}
@@ -527,37 +346,16 @@ public class SparkOrcReader implements
OrcValueReader<InternalRow> {
}
}
- int writeStruct(UnsafeWriter parentWriter, StructColumnVector vector, int
row) {
- UnsafeRowWriter childWriter = new UnsafeRowWriter(parentWriter,
children.length);
+ @Override
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ StructColumnVector structVector = (StructColumnVector) vector;
+ UnsafeRowWriter childWriter = new UnsafeRowWriter(writer,
children.length);
int start = childWriter.cursor();
childWriter.resetRowWriter();
for (int c = 0; c < children.length; ++c) {
- children[c].convert(childWriter, c, vector.fields[c], row);
- }
- return start;
- }
-
- @Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- int start = writeStruct(writer, (StructColumnVector) vector, rowIndex);
- writer.setOffsetAndSizeFromPreviousCursor(column, start);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- int start = writeStruct(writer, (StructColumnVector) vector, rowIndex);
- writer.setOffsetAndSizeFromPreviousCursor(element, start);
+ children[c].convert(childWriter, c, structVector.fields[c], row);
}
+ writer.setOffsetAndSizeFromPreviousCursor(ordinal, start);
}
}
@@ -570,41 +368,19 @@ public class SparkOrcReader implements
OrcValueReader<InternalRow> {
childConverter = buildConverter(child);
}
- int writeList(UnsafeWriter parentWriter, ListColumnVector vector, int row)
{
- int offset = (int) vector.offsets[row];
- int length = (int) vector.lengths[row];
+ @Override
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ ListColumnVector listVector = (ListColumnVector) vector;
+ int offset = (int) listVector.offsets[row];
+ int length = (int) listVector.lengths[row];
- UnsafeArrayWriter childWriter = new UnsafeArrayWriter(parentWriter,
getArrayElementSize(child));
+ UnsafeArrayWriter childWriter = new UnsafeArrayWriter(writer,
getArrayElementSize(child));
int start = childWriter.cursor();
childWriter.initialize(length);
for (int c = 0; c < length; ++c) {
- childConverter.convert(childWriter, c, vector.child, offset + c);
- }
- return start;
- }
-
- @Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
- int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- int start = writeList(writer, (ListColumnVector) vector, rowIndex);
- writer.setOffsetAndSizeFromPreviousCursor(column, start);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element,
- ColumnVector vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- int start = writeList(writer, (ListColumnVector) vector, rowIndex);
- writer.setOffsetAndSizeFromPreviousCursor(element, start);
+ childConverter.convert(childWriter, c, listVector.child, offset + c);
}
+ writer.setOffsetAndSizeFromPreviousCursor(ordinal, start);
}
}
@@ -627,11 +403,13 @@ public class SparkOrcReader implements
OrcValueReader<InternalRow> {
valueSize = getArrayElementSize(valueType);
}
- int writeMap(UnsafeWriter parentWriter, MapColumnVector vector, int row) {
- final int offset = (int) vector.offsets[row];
- final int length = (int) vector.lengths[row];
+ @Override
+ public void convertNonNullValue(UnsafeWriter writer, int ordinal,
ColumnVector vector, int row) {
+ MapColumnVector mapVector = (MapColumnVector) vector;
+ final int offset = (int) mapVector.offsets[row];
+ final int length = (int) mapVector.lengths[row];
- UnsafeArrayWriter keyWriter = new UnsafeArrayWriter(parentWriter,
keySize);
+ UnsafeArrayWriter keyWriter = new UnsafeArrayWriter(writer, keySize);
final int start = keyWriter.cursor();
// save room for the key size
keyWriter.grow(KEY_SIZE_BYTES);
@@ -640,41 +418,19 @@ public class SparkOrcReader implements
OrcValueReader<InternalRow> {
// serialize the keys
keyWriter.initialize(length);
for (int c = 0; c < length; ++c) {
- keyConvert.convert(keyWriter, c, vector.keys, offset + c);
+ keyConvert.convert(keyWriter, c, mapVector.keys, offset + c);
}
// store the serialized size of the keys
Platform.putLong(keyWriter.getBuffer(), start,
- keyWriter.cursor() - start - KEY_SIZE_BYTES);
+ keyWriter.cursor() - start - KEY_SIZE_BYTES);
// serialize the values
- UnsafeArrayWriter valueWriter = new UnsafeArrayWriter(parentWriter,
valueSize);
+ UnsafeArrayWriter valueWriter = new UnsafeArrayWriter(writer, valueSize);
valueWriter.initialize(length);
for (int c = 0; c < length; ++c) {
- valueConvert.convert(valueWriter, c, vector.values, offset + c);
- }
- return start;
- }
-
- @Override
- public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNullAt(column);
- } else {
- int start = writeMap(writer, (MapColumnVector) vector, rowIndex);
- writer.setOffsetAndSizeFromPreviousCursor(column, start);
- }
- }
-
- @Override
- public void convert(UnsafeArrayWriter writer, int element, ColumnVector
vector, int row) {
- int rowIndex = vector.isRepeating ? 0 : row;
- if (!vector.noNulls && vector.isNull[rowIndex]) {
- writer.setNull(element);
- } else {
- int start = writeMap(writer, (MapColumnVector) vector, rowIndex);
- writer.setOffsetAndSizeFromPreviousCursor(element, start);
+ valueConvert.convert(valueWriter, c, mapVector.values, offset + c);
}
+ writer.setOffsetAndSizeFromPreviousCursor(ordinal, start);
}
}