Repository: orc Updated Branches: refs/heads/orc-72 [created] 7315a0145
more updates Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/73cdb4c2 Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/73cdb4c2 Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/73cdb4c2 Branch: refs/heads/orc-72 Commit: 73cdb4c2de03e512d5be28daaf5d1f0f292535b7 Parents: 1752e17 Author: Owen O'Malley <omal...@apache.org> Authored: Mon Oct 10 09:30:20 2016 -0700 Committer: Owen O'Malley <omal...@apache.org> Committed: Mon Oct 10 13:59:16 2016 -0700 ---------------------------------------------------------------------- .../orc/bench/parquet/DataWritableWriter.java | 550 +++++++++++++++++++ .../apache/orc/bench/parquet/RowInBatch.java | 33 ++ 2 files changed, 583 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/73cdb4c2/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriter.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriter.java b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriter.java new file mode 100644 index 0000000..220e452 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriter.java @@ -0,0 +1,550 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.orc.bench.parquet;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Type;

import java.math.BigInteger;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Sends rows held in ORC column vectors to the Parquet API, following the Parquet schema
 * supplied at construction, so that they can be written to a file.
 *
 * <p>Type dispatch is driven by the ORC {@link TypeDescription}. The Parquet schema is
 * expected to mirror it field for field. LIST and MAP columns use the three-level layout:
 * an outer group holding one repeated group of elements, or of key/value pairs.
 */
public class DataWritableWriter {
  private static final Logger LOG = LoggerFactory.getLogger(DataWritableWriter.class);

  /** Largest decimal precision accepted here (the Hive/ORC maximum). */
  private static final int MAX_DECIMAL_PRECISION = 38;
  private static final long MILLIS_PER_SECOND = 1000L;
  private static final long SECONDS_PER_DAY = 24L * 60 * 60;
  private static final long NANOS_PER_SECOND = 1000000000L;
  /** Julian day number of 1970-01-01. */
  private static final long JULIAN_DAY_OF_EPOCH = 2440588L;
  /** Size in bytes of a Parquet INT96 timestamp. */
  private static final int INT96_LENGTH = 12;

  protected final RecordConsumer recordConsumer;
  private final GroupType schema;
  private final TypeDescription hiveType;

  /** Writes the top-level fields of each row; built once, in the constructor. */
  private final GroupDataWriter messageWriter;

  /**
   * Creates a writer whose per-column writers are built eagerly from the two schemas.
   * @param recordConsumer the Parquet consumer that receives the record events
   * @param schema the Parquet message schema; must have one field per ORC struct field
   * @param hiveType the ORC schema of the rows; must be a struct
   * @throws IllegalArgumentException if the schemas do not line up or a type is unsupported
   */
  public DataWritableWriter(final RecordConsumer recordConsumer,
                            final GroupType schema,
                            TypeDescription hiveType) {
    this.recordConsumer = recordConsumer;
    this.schema = schema;
    this.hiveType = hiveType;
    messageWriter = createMessageWriter(hiveType, schema);
  }

  /**
   * Writes one row to Parquet as a complete message.
   * @param record the batch and the row index to write. NOTE(review): {@code record.row} is
   *               used as a physical index into the column vectors; if the batch uses
   *               {@code selectedInUse}, the caller must map through {@code selected} first.
   */
  public void write(final RowInBatch record) {
    recordConsumer.startMessage();
    messageWriter.writeFields(record.batch.cols, record.row);
    recordConsumer.endMessage();
  }

  private GroupDataWriter createMessageWriter(TypeDescription hiveType,
                                              GroupType schema) {
    if (hiveType.getCategory() != TypeDescription.Category.STRUCT) {
      throw new IllegalArgumentException("Top-level type must be a struct, but found "
          + hiveType);
    }
    return new GroupDataWriter(hiveType, schema);
  }

  /**
   * Creates the writer for one column. The returned writer is used to call the Parquet API
   * for that column's data type.
   * @param hiveType the ORC type of the column, which selects the writer
   * @param type the matching Parquet type, which supplies field names for nested types
   * @return a DataWriter used to call the Parquet API for the specific data type
   * @throws IllegalArgumentException if the type is unsupported (e.g. UNION) or the Parquet
   *     type is primitive where the ORC type is nested, or vice versa
   */
  private DataWriter createWriter(TypeDescription hiveType, Type type) {
    if (hiveType.getCategory().isPrimitive() != type.isPrimitive()) {
      throw new IllegalArgumentException("Parquet type " + type
          + " does not match ORC type " + hiveType);
    }
    switch (hiveType.getCategory()) {
      case BOOLEAN:
        return new BooleanDataWriter(hiveType);
      case BYTE:
      case SHORT:
      case INT:
      case DATE:
        return new IntDataWriter(hiveType);
      case LONG:
        return new LongDataWriter(hiveType);
      case FLOAT:
        return new FloatDataWriter(hiveType);
      case DOUBLE:
        return new DoubleDataWriter(hiveType);
      case STRING:
      case VARCHAR:
      case BINARY:
        return new BinaryDataWriter(hiveType);
      case CHAR:
        return new CharDataWriter(hiveType);
      case TIMESTAMP:
        return new TimestampDataWriter(hiveType);
      case DECIMAL:
        return new DecimalDataWriter(hiveType);
      case LIST:
        return new ListDataWriter(hiveType, type.asGroupType());
      case MAP:
        return new MapDataWriter(hiveType, type.asGroupType());
      case STRUCT:
        return new GroupDataWriter(hiveType, type.asGroupType());
      default:
        throw new IllegalArgumentException("Unhandled type " + hiveType);
    }
  }

  /** Maps a row to the index that holds its value, honoring {@code isRepeating}. */
  private static int resolveRow(ColumnVector vector, int row) {
    return vector.isRepeating ? 0 : row;
  }

  /** Returns true if the (already resolved) row of the vector is null. */
  private static boolean isNull(ColumnVector vector, int row) {
    return !vector.noNulls && vector.isNull[row];
  }

  /**
   * Copies {@code length} bytes of the given row into a new Binary. A copy is made because
   * the batch's byte buffers may be reused for later rows.
   */
  private static Binary copyToBinary(BytesColumnVector bytes, int row, int length) {
    int start = bytes.start[row];
    return Binary.fromByteArray(Arrays.copyOfRange(bytes.vector[row], start, start + length));
  }

  /**
   * Returns the minimum number of bytes that can hold any unscaled value of the given
   * decimal precision as a big-endian two's-complement integer.
   * @throws IllegalArgumentException if precision is outside [1, 38]
   */
  private static int precisionToByteCount(int precision) {
    if (precision < 1 || precision > MAX_DECIMAL_PRECISION) {
      throw new IllegalArgumentException("Unsupported decimal precision " + precision);
    }
    // 10^precision - 1 is the largest unscaled magnitude; one extra bit holds the sign.
    int bits = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE).bitLength() + 1;
    return (bits + 7) / 8;
  }

  /**
   * Encodes an instant as a Parquet INT96 timestamp. The first 8 little-endian bytes hold
   * the nanoseconds within the UTC day, and the last 4 hold the Julian day number.
   * NOTE(review): uses proleptic-Gregorian epoch arithmetic; dates before 1582 may differ
   * from what Hive's calendar-based conversion produces.
   * @param epochMillis milliseconds since 1970-01-01T00:00:00Z
   * @param nanosOfSecond nanoseconds within the second (assumed to be the full value, as in
   *                      {@code TimestampColumnVector#nanos})
   */
  private static Binary toInt96(long epochMillis, int nanosOfSecond) {
    // Floor division, so that instants before the epoch land on the previous second/day.
    long epochSeconds = epochMillis / MILLIS_PER_SECOND;
    if (epochMillis % MILLIS_PER_SECOND < 0) {
      epochSeconds -= 1;
    }
    long epochDay = epochSeconds / SECONDS_PER_DAY;
    long secondOfDay = epochSeconds % SECONDS_PER_DAY;
    if (secondOfDay < 0) {
      secondOfDay += SECONDS_PER_DAY;
      epochDay -= 1;
    }
    ByteBuffer buffer = ByteBuffer.allocate(INT96_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
    buffer.putLong(secondOfDay * NANOS_PER_SECOND + nanosOfSecond);
    buffer.putInt((int) (epochDay + JULIAN_DAY_OF_EPOCH));
    return Binary.fromByteArray(buffer.array());
  }

  /** Base class for the per-type writers; one instance is built per column of the schema. */
  abstract class DataWriter {
    /** Id of the ORC column this writer handles. */
    protected final int id;

    DataWriter(TypeDescription type) {
      id = type.getId();
    }

    /**
     * Sends the value at {@code row} of {@code vector} to the record consumer.
     * The caller has already resolved {@code isRepeating}, checked that the value is not
     * null, and opened the enclosing Parquet field.
     */
    abstract void write(ColumnVector vector, int row);
  }

  /** Writes the fields of an ORC struct into the matching Parquet group. */
  private class GroupDataWriter extends DataWriter {
    private final String[] fieldNames;
    private final DataWriter[] fieldWriters;

    GroupDataWriter(TypeDescription type, GroupType groupType) {
      super(type);
      List<TypeDescription> children = type.getChildren();
      int fieldCount = children.size();
      if (fieldCount != groupType.getFieldCount()) {
        throw new IllegalArgumentException("Struct " + type + " has " + fieldCount
            + " fields, but Parquet group " + groupType.getName() + " has "
            + groupType.getFieldCount());
      }
      fieldNames = new String[fieldCount];
      fieldWriters = new DataWriter[fieldCount];
      for (int i = 0; i < fieldCount; i++) {
        Type fieldType = groupType.getType(i);
        fieldNames[i] = fieldType.getName();
        fieldWriters[i] = createWriter(children.get(i), fieldType);
      }
    }

    /**
     * Writes each non-null field of one row. Null fields are omitted, which is how a missing
     * optional value is expressed through the RecordConsumer API.
     * @param fields one column vector per struct field, in schema order
     * @param row the row to write
     */
    void writeFields(ColumnVector[] fields, int row) {
      for (int i = 0; i < fieldWriters.length; i++) {
        ColumnVector field = fields[i];
        int fieldRow = resolveRow(field, row);
        if (!isNull(field, fieldRow)) {
          recordConsumer.startField(fieldNames[i], i);
          fieldWriters[i].write(field, fieldRow);
          recordConsumer.endField(fieldNames[i], i);
        }
      }
    }

    @Override
    void write(ColumnVector vector, int row) {
      recordConsumer.startGroup();
      writeFields(((StructColumnVector) vector).fields, row);
      recordConsumer.endGroup();
    }
  }

  /** Writes an ORC list as group { repeated group { element } }. */
  private class ListDataWriter extends DataWriter {
    private final String repeatedGroupName;
    private final String elementName;
    private final DataWriter elementWriter;

    ListDataWriter(TypeDescription type, GroupType groupType) {
      super(type);
      // Get the internal array structure
      GroupType repeatedType = groupType.getType(0).asGroupType();
      repeatedGroupName = repeatedType.getName();
      Type elementType = repeatedType.getType(0);
      elementName = elementType.getName();
      elementWriter = createWriter(type.getChildren().get(0), elementType);
    }

    @Override
    void write(ColumnVector vector, int row) {
      ListColumnVector list = (ListColumnVector) vector;
      int offset = (int) list.offsets[row];
      int length = (int) list.lengths[row];
      recordConsumer.startGroup();
      if (length > 0) {
        ColumnVector child = list.child;
        recordConsumer.startField(repeatedGroupName, 0);
        for (int i = 0; i < length; i++) {
          int childRow = resolveRow(child, offset + i);
          // Every element gets a repeated group; null elements leave it empty.
          recordConsumer.startGroup();
          if (!isNull(child, childRow)) {
            recordConsumer.startField(elementName, 0);
            elementWriter.write(child, childRow);
            recordConsumer.endField(elementName, 0);
          }
          recordConsumer.endGroup();
        }
        recordConsumer.endField(repeatedGroupName, 0);
      }
      recordConsumer.endGroup();
    }
  }

  /** Writes an ORC map as group { repeated group { key, value } }. */
  private class MapDataWriter extends DataWriter {
    private final String repeatedGroupName;
    private final String keyName;
    private final String valueName;
    private final DataWriter keyWriter;
    private final DataWriter valueWriter;

    MapDataWriter(TypeDescription type, GroupType groupType) {
      super(type);
      // Get the internal map structure (MAP_KEY_VALUE)
      GroupType repeatedType = groupType.getType(0).asGroupType();
      repeatedGroupName = repeatedType.getName();
      List<TypeDescription> children = type.getChildren();

      Type keyType = repeatedType.getType(0);
      keyName = keyType.getName();
      keyWriter = createWriter(children.get(0), keyType);

      Type valueType = repeatedType.getType(1);
      valueName = valueType.getName();
      valueWriter = createWriter(children.get(1), valueType);
    }

    @Override
    void write(ColumnVector vector, int row) {
      MapColumnVector map = (MapColumnVector) vector;
      int offset = (int) map.offsets[row];
      int length = (int) map.lengths[row];
      recordConsumer.startGroup();
      if (length > 0) {
        recordConsumer.startField(repeatedGroupName, 0);
        for (int i = 0; i < length; i++) {
          int keyRow = resolveRow(map.keys, offset + i);
          // The key field is always written, so a null key cannot be represented.
          if (isNull(map.keys, keyRow)) {
            throw new IllegalArgumentException("Null map key in column " + id
                + " at row " + row);
          }
          recordConsumer.startGroup();
          recordConsumer.startField(keyName, 0);
          keyWriter.write(map.keys, keyRow);
          recordConsumer.endField(keyName, 0);

          int valueRow = resolveRow(map.values, offset + i);
          if (!isNull(map.values, valueRow)) {
            recordConsumer.startField(valueName, 1);
            valueWriter.write(map.values, valueRow);
            recordConsumer.endField(valueName, 1);
          }
          recordConsumer.endGroup();
        }
        recordConsumer.endField(repeatedGroupName, 0);
      }
      recordConsumer.endGroup();
    }
  }

  /** BOOLEAN, read from a LongColumnVector; any non-zero value is true. */
  private class BooleanDataWriter extends DataWriter {
    BooleanDataWriter(TypeDescription type) {
      super(type);
    }

    @Override
    void write(ColumnVector vector, int row) {
      recordConsumer.addBoolean(((LongColumnVector) vector).vector[row] != 0);
    }
  }

  /** BYTE, SHORT, INT and DATE (days since the epoch), written as 32-bit integers. */
  private class IntDataWriter extends DataWriter {
    IntDataWriter(TypeDescription type) {
      super(type);
    }

    @Override
    void write(ColumnVector vector, int row) {
      recordConsumer.addInteger((int) ((LongColumnVector) vector).vector[row]);
    }
  }

  /** LONG, written as a 64-bit integer. */
  private class LongDataWriter extends DataWriter {
    LongDataWriter(TypeDescription type) {
      super(type);
    }

    @Override
    void write(ColumnVector vector, int row) {
      recordConsumer.addLong(((LongColumnVector) vector).vector[row]);
    }
  }

  /** FLOAT, stored as a double in the column vector and narrowed on write. */
  private class FloatDataWriter extends DataWriter {
    FloatDataWriter(TypeDescription type) {
      super(type);
    }

    @Override
    void write(ColumnVector vector, int row) {
      recordConsumer.addFloat((float) ((DoubleColumnVector) vector).vector[row]);
    }
  }

  /** DOUBLE. */
  private class DoubleDataWriter extends DataWriter {
    DoubleDataWriter(TypeDescription type) {
      super(type);
    }

    @Override
    void write(ColumnVector vector, int row) {
      recordConsumer.addDouble(((DoubleColumnVector) vector).vector[row]);
    }
  }

  /** STRING, VARCHAR and BINARY: the row's bytes are written unchanged. */
  private class BinaryDataWriter extends DataWriter {
    BinaryDataWriter(TypeDescription type) {
      super(type);
    }

    @Override
    void write(ColumnVector vector, int row) {
      BytesColumnVector bytes = (BytesColumnVector) vector;
      recordConsumer.addBinary(copyToBinary(bytes, row, bytes.length[row]));
    }
  }

  /** CHAR: trailing pad spaces are dropped before writing. */
  private class CharDataWriter extends DataWriter {
    CharDataWriter(TypeDescription type) {
      super(type);
    }

    @Override
    void write(ColumnVector vector, int row) {
      BytesColumnVector bytes = (BytesColumnVector) vector;
      byte[] buffer = bytes.vector[row];
      int start = bytes.start[row];
      int length = bytes.length[row];
      // 0x20 never occurs inside a multi-byte UTF-8 sequence, so this trim is safe.
      while (length > 0 && buffer[start + length - 1] == ' ') {
        length -= 1;
      }
      recordConsumer.addBinary(copyToBinary(bytes, row, length));
    }
  }

  /** TIMESTAMP, written as a Parquet INT96 value in UTC. */
  private class TimestampDataWriter extends DataWriter {
    TimestampDataWriter(TypeDescription type) {
      super(type);
    }

    @Override
    void write(ColumnVector vector, int row) {
      TimestampColumnVector timestamps = (TimestampColumnVector) vector;
      recordConsumer.addBinary(toInt96(timestamps.time[row], timestamps.nanos[row]));
    }
  }

  /** DECIMAL, written as a fixed-width big-endian two's-complement unscaled value. */
  private class DecimalDataWriter extends DataWriter {
    private final int scale;
    private final int byteWidth;

    DecimalDataWriter(TypeDescription type) {
      super(type);
      scale = type.getScale();
      byteWidth = precisionToByteCount(type.getPrecision());
    }

    @Override
    void write(ColumnVector vector, int row) {
      HiveDecimal value = ((DecimalColumnVector) vector).vector[row].getHiveDecimal();
      recordConsumer.addBinary(decimalToBinary(value));
    }

    /**
     * Rescales the value to the column scale and pads it to the column byte width.
     * @throws IllegalArgumentException if the value needs more bytes than the precision allows
     */
    private Binary decimalToBinary(HiveDecimal value) {
      BigInteger unscaled =
          value.bigDecimalValue().setScale(scale, RoundingMode.HALF_UP).unscaledValue();
      byte[] bytes = unscaled.toByteArray();
      if (bytes.length == byteWidth) {
        // No padding needed.
        return Binary.fromByteArray(bytes);
      }
      if (bytes.length > byteWidth) {
        throw new IllegalArgumentException("Decimal value " + value + " needs "
            + bytes.length + " bytes, but column " + id + " only allows " + byteWidth);
      }
      byte[] padded = new byte[byteWidth];
      // Sign-extend using the sign of the rounded value: 0xFF for negatives, 0x00 otherwise.
      if (unscaled.signum() < 0) {
        Arrays.fill(padded, (byte) 0xFF);
      }
      System.arraycopy(bytes, 0, padded, byteWidth - bytes.length, bytes.length);
      return Binary.fromByteArray(padded);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/73cdb4c2/java/bench/src/java/org/apache/orc/bench/parquet/RowInBatch.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/RowInBatch.java b/java/bench/src/java/org/apache/orc/bench/parquet/RowInBatch.java new file mode 100644 index 0000000..60b4dfd --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/parquet/RowInBatch.java @@ -0,0 +1,33 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.orc.bench.parquet; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; + +/** + * A value class that wraps a VectorizedRowBatch and a row index. + */ +public class RowInBatch { + public final VectorizedRowBatch batch; + public final TypeDescription schema; + public int row; + + RowInBatch(TypeDescription schema) { + this.schema = schema; + batch = schema.createRowBatch(); + row = 0; + } +}