Author: brock
Date: Thu Feb 12 04:53:51 2015
New Revision: 1659147
URL: http://svn.apache.org/r1659147
Log:
HIVE-9333 - Move parquet serialize implementation to DataWritableWriter to
improve write speeds (Sergio via Brock)
Added:
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/ParquetHiveRecord.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java Thu Feb 12 04:53:51 2015
@@ -21,7 +21,6 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -29,10 +28,10 @@ import org.apache.hadoop.hive.ql.io.IOCo
import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -41,27 +40,25 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.util.Progressable;
import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.util.ContextUtil;
/**
*
* A Parquet OutputFormat for Hive (with the deprecated package mapred)
*
*/
-public class MapredParquetOutputFormat extends FileOutputFormat<Void, ArrayWritable> implements
-    HiveOutputFormat<Void, ArrayWritable> {
+public class MapredParquetOutputFormat extends FileOutputFormat<Void, ParquetHiveRecord> implements
+    HiveOutputFormat<Void, ParquetHiveRecord> {

  private static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class);

-  protected ParquetOutputFormat<ArrayWritable> realOutputFormat;
+  protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;

  public MapredParquetOutputFormat() {
-    realOutputFormat = new ParquetOutputFormat<ArrayWritable>(new DataWritableWriteSupport());
+    realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
  }

-  public MapredParquetOutputFormat(final OutputFormat<Void, ArrayWritable> mapreduceOutputFormat) {
-    realOutputFormat = (ParquetOutputFormat<ArrayWritable>) mapreduceOutputFormat;
+  public MapredParquetOutputFormat(final OutputFormat<Void, ParquetHiveRecord> mapreduceOutputFormat) {
+    realOutputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mapreduceOutputFormat;
  }
@Override
@@ -70,7 +67,7 @@ public class MapredParquetOutputFormat e
}
@Override
- public RecordWriter<Void, ArrayWritable> getRecordWriter(
+ public RecordWriter<Void, ParquetHiveRecord> getRecordWriter(
final FileSystem ignored,
final JobConf job,
final String name,
@@ -119,7 +116,7 @@ public class MapredParquetOutputFormat e
}
protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
- ParquetOutputFormat<ArrayWritable> realOutputFormat,
+ ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
JobConf jobConf,
String finalOutPath,
Progressable progress,
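
For reference, the two constructors above now compose as follows (a sketch for
orientation, not code from the commit; variable names are invented):

    // The wrapped mapreduce-level output format is typed on ParquetHiveRecord, so
    // DataWritableWriteSupport receives the SerDe output directly instead of an
    // intermediate ArrayWritable copy built per row.
    ParquetOutputFormat<ParquetHiveRecord> realFormat =
        new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
    MapredParquetOutputFormat mapredFormat = new MapredParquetOutputFormat(realFormat);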
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Thu Feb 12 04:53:51 2015
@@ -13,61 +13,31 @@
*/
package org.apache.hadoop.hive.ql.io.parquet.serde;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.ParquetWriter;
-import parquet.io.api.Binary;
/**
*
@@ -110,6 +80,13 @@ public class ParquetHiveSerDe extends Ab
private long deserializedSize;
private String compressionType;
+ private ParquetHiveRecord parquetRow;
+
+ public ParquetHiveSerDe() {
+ parquetRow = new ParquetHiveRecord();
+ stats = new SerDeStats();
+ }
+
@Override
public final void initialize(final Configuration conf, final Properties tbl)
throws SerDeException {
@@ -144,7 +121,6 @@ public class ParquetHiveSerDe extends Ab
    this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
// Stats part
- stats = new SerDeStats();
serializedSize = 0;
deserializedSize = 0;
status = LAST_OPERATION.UNKNOWN;
@@ -169,7 +145,7 @@ public class ParquetHiveSerDe extends Ab
@Override
public Class<? extends Writable> getSerializedClass() {
- return ArrayWritable.class;
+ return ParquetHiveRecord.class;
}
@Override
@@ -178,154 +154,11 @@ public class ParquetHiveSerDe extends Ab
if (!objInspector.getCategory().equals(Category.STRUCT)) {
      throw new SerDeException("Cannot serialize " + objInspector.getCategory() + ". Can only serialize a struct");
    }
-    final ArrayWritable serializeData = createStruct(obj, (StructObjectInspector) objInspector);
-    serializedSize = serializeData.get().length;
+    serializedSize = ((StructObjectInspector)objInspector).getAllStructFieldRefs().size();
status = LAST_OPERATION.SERIALIZE;
- return serializeData;
- }
-
-  private ArrayWritable createStruct(final Object obj, final StructObjectInspector inspector)
-      throws SerDeException {
-    final List<? extends StructField> fields = inspector.getAllStructFieldRefs();
-    final Writable[] arr = new Writable[fields.size()];
-    for (int i = 0; i < fields.size(); i++) {
-      final StructField field = fields.get(i);
-      final Object subObj = inspector.getStructFieldData(obj, field);
-      final ObjectInspector subInspector = field.getFieldObjectInspector();
-      arr[i] = createObject(subObj, subInspector);
-    }
-    return new ArrayWritable(Writable.class, arr);
-  }
-
-  private Writable createMap(final Object obj, final MapObjectInspector inspector)
-      throws SerDeException {
-    final Map<?, ?> sourceMap = inspector.getMap(obj);
-    final ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
-    final ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
-    final List<ArrayWritable> array = new ArrayList<ArrayWritable>();
-
-    if (sourceMap != null) {
-      for (final Entry<?, ?> keyValue : sourceMap.entrySet()) {
-        final Writable key = createObject(keyValue.getKey(), keyInspector);
-        final Writable value = createObject(keyValue.getValue(), valueInspector);
-        if (key != null) {
-          Writable[] arr = new Writable[2];
-          arr[0] = key;
-          arr[1] = value;
-          array.add(new ArrayWritable(Writable.class, arr));
-        }
-      }
-    }
-    if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(ArrayWritable.class,
-          array.toArray(new ArrayWritable[array.size()]));
-      return new ArrayWritable(Writable.class, new Writable[] {subArray});
-    } else {
-      return null;
-    }
-  }
-
-  private ArrayWritable createArray(final Object obj, final ListObjectInspector inspector)
-      throws SerDeException {
-    final List<?> sourceArray = inspector.getList(obj);
-    final ObjectInspector subInspector = inspector.getListElementObjectInspector();
-    final List<Writable> array = new ArrayList<Writable>();
-    if (sourceArray != null) {
-      for (final Object curObj : sourceArray) {
-        array.add(createObject(curObj, subInspector));
-      }
-    }
-    if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(Writable.class,
-          array.toArray(new Writable[array.size()]));
-      return new ArrayWritable(Writable.class, new Writable[] {subArray});
-    } else {
-      return null;
-    }
-  }
-
-  private Writable createPrimitive(final Object obj, final PrimitiveObjectInspector inspector)
-      throws SerDeException {
-    if (obj == null) {
-      return null;
-    }
-    switch (inspector.getPrimitiveCategory()) {
-    case VOID:
-      return null;
-    case BOOLEAN:
-      return new BooleanWritable(((BooleanObjectInspector) inspector).get(obj) ? Boolean.TRUE : Boolean.FALSE);
-    case BYTE:
-      return new ByteWritable(((ByteObjectInspector) inspector).get(obj));
-    case DOUBLE:
-      return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj));
-    case FLOAT:
-      return new FloatWritable(((FloatObjectInspector) inspector).get(obj));
-    case INT:
-      return new IntWritable(((IntObjectInspector) inspector).get(obj));
-    case LONG:
-      return new LongWritable(((LongObjectInspector) inspector).get(obj));
-    case SHORT:
-      return new ShortWritable(((ShortObjectInspector) inspector).get(obj));
-    case STRING:
-      String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(obj);
-      try {
-        return new BytesWritable(v.getBytes("UTF-8"));
-      } catch (UnsupportedEncodingException e) {
-        throw new SerDeException("Failed to encode string in UTF-8", e);
-      }
-    case DECIMAL:
-      HiveDecimal hd = (HiveDecimal)inspector.getPrimitiveJavaObject(obj);
-      DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) inspector.getTypeInfo();
-      int prec = decTypeInfo.precision();
-      int scale = decTypeInfo.scale();
-      byte[] src = hd.setScale(scale).unscaledValue().toByteArray();
-      // Estimated number of bytes needed.
-      int bytes = PRECISION_TO_BYTE_COUNT[prec - 1];
-      if (bytes == src.length) {
-        // No padding needed.
-        return new BytesWritable(src);
-      }
-      byte[] tgt = new byte[bytes];
-      if (hd.signum() == -1) {
-        // For negative number, initializing bits to 1
-        for (int i = 0; i < bytes; i++) {
-          tgt[i] |= 0xFF;
-        }
-      }
-      System.arraycopy(src, 0, tgt, bytes - src.length, src.length); // Padding leading zeroes/ones.
-      return new BytesWritable(tgt);
-    case TIMESTAMP:
-      return new TimestampWritable(((TimestampObjectInspector) inspector).getPrimitiveJavaObject(obj));
-    case CHAR:
-      String strippedValue = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(obj).getStrippedValue();
-      return new BytesWritable(Binary.fromString(strippedValue).getBytes());
-    case VARCHAR:
-      String value = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(obj).getValue();
-      return new BytesWritable(Binary.fromString(value).getBytes());
-    case BINARY:
-      return new BytesWritable(((BinaryObjectInspector) inspector).getPrimitiveJavaObject(obj));
-    default:
-      throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
-    }
-  }
-
-  private Writable createObject(final Object obj, final ObjectInspector inspector) throws SerDeException {
- if (obj == null) {
- return null;
- }
-
- switch (inspector.getCategory()) {
- case STRUCT:
- return createStruct(obj, (StructObjectInspector) inspector);
- case LIST:
- return createArray(obj, (ListObjectInspector) inspector);
- case MAP:
- return createMap(obj, (MapObjectInspector) inspector);
- case PRIMITIVE:
- return createPrimitive(obj, (PrimitiveObjectInspector) inspector);
- default:
- throw new SerDeException("Unknown data type" + inspector.getCategory());
- }
+ parquetRow.value = obj;
+ parquetRow.inspector= (StructObjectInspector)objInspector;
+ return parquetRow;
}
@Override
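
The upshot of the serde change: serialize() no longer deep-copies each row into a
tree of Writables; it tags the row object with its inspector and lets
DataWritableWriter pull the field values out later. A rough sketch of the new
contract (illustrative; assumes an initialized SerDe and the row's
StructObjectInspector):

    // serialize() reuses one ParquetHiveRecord instance per SerDe, so the returned
    // wrapper is only valid until the next serialize() call on the same SerDe.
    ParquetHiveRecord record = (ParquetHiveRecord) serDe.serialize(row, rowInspector);
    assert record.getObject() == row;                    // the row is not copied
    assert record.getObjectInspector() == rowInspector;  // the inspector travels with it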
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java Thu Feb 12 04:53:51 2015
@@ -16,7 +16,7 @@ package org.apache.hadoop.hive.ql.io.par
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
import parquet.hadoop.api.WriteSupport;
import parquet.io.api.RecordConsumer;
@@ -28,7 +28,7 @@ import parquet.schema.MessageTypeParser;
* DataWritableWriteSupport is a WriteSupport for the DataWritableWriter
*
*/
-public class DataWritableWriteSupport extends WriteSupport<ArrayWritable> {
+public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
@@ -55,7 +55,7 @@ public class DataWritableWriteSupport ex
}
@Override
- public void write(final ArrayWritable record) {
+ public void write(final ParquetHiveRecord record) {
writer.write(record);
}
}
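
For context, Parquet drives a WriteSupport in three steps, and after this patch
the record flowing through them is a ParquetHiveRecord. A simplified sketch of the
call sequence (not code from this commit):

    DataWritableWriteSupport support = new DataWritableWriteSupport();
    support.init(conf);                      // reads the schema stored under PARQUET_HIVE_SCHEMA
    support.prepareForWrite(recordConsumer); // creates the DataWritableWriter
    support.write(record);                   // record is a ParquetHiveRecord; its fields are
                                             // extracted through the attached ObjectInspector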
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java Thu Feb 12 04:53:51 2015
@@ -13,37 +13,29 @@
*/
package org.apache.hadoop.hive.ql.io.parquet.write;
-import java.sql.Timestamp;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import parquet.io.api.Binary;
import parquet.io.api.RecordConsumer;
import parquet.schema.GroupType;
import parquet.schema.OriginalType;
import parquet.schema.Type;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
/**
*
- * DataWritableWriter is a writer,
- * that will read an ArrayWritable and give the data to parquet
- * with the expected schema
- * This is a helper class used by DataWritableWriteSupport class.
+ * DataWritableWriter is a writer that reads a ParquetHiveRecord object and sends the data to the
+ * Parquet API with the expected schema. This class is only used through the DataWritableWriteSupport class.
 */
public class DataWritableWriter {
private static final Log LOG = LogFactory.getLog(DataWritableWriter.class);
@@ -57,13 +49,13 @@ public class DataWritableWriter {
/**
* It writes all record values to the Parquet RecordConsumer.
-   * @param record Contains the record of values that are going to be written
+   * @param record Contains the record that is going to be written.
   */
-  public void write(final ArrayWritable record) {
+  public void write(final ParquetHiveRecord record) {
    if (record != null) {
      recordConsumer.startMessage();
      try {
-        writeGroupFields(record, schema);
+        writeGroupFields(record.getObject(), record.getObjectInspector(), schema);
} catch (RuntimeException e) {
String errorMessage = "Parquet record is malformed: " + e.getMessage();
LOG.error(errorMessage, e);
@@ -76,19 +68,23 @@ public class DataWritableWriter {
/**
* It writes all the fields contained inside a group to the RecordConsumer.
* @param value The list of values contained in the group.
+ * @param inspector The object inspector used to get the correct value type.
* @param type Type that contains information about the group schema.
*/
-  public void writeGroupFields(final ArrayWritable value, final GroupType type) {
+  private void writeGroupFields(final Object value, final StructObjectInspector inspector, final GroupType type) {
    if (value != null) {
+      List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+      List<Object> fieldValuesList = inspector.getStructFieldsDataAsList(value);
+
      for (int i = 0; i < type.getFieldCount(); i++) {
        Type fieldType = type.getType(i);
        String fieldName = fieldType.getName();
-        Writable fieldValue = value.get()[i];
+        Object fieldValue = fieldValuesList.get(i);
-        // Parquet does not write null elements
        if (fieldValue != null) {
+          ObjectInspector fieldInspector = fields.get(i).getFieldObjectInspector();
          recordConsumer.startField(fieldName, i);
-          writeValue(fieldValue, fieldType);
+          writeValue(fieldValue, fieldInspector, fieldType);
recordConsumer.endField(fieldName, i);
}
}
@@ -96,68 +92,93 @@ public class DataWritableWriter {
}
/**
-   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and writes
+   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and calls
   * the correct write function.
   * @param value The writable object that contains the value.
+   * @param inspector The object inspector used to get the correct value type.
   * @param type Type that contains information about the type schema.
   */
-  private void writeValue(final Writable value, final Type type) {
+  private void writeValue(final Object value, final ObjectInspector inspector, final Type type) {
if (type.isPrimitive()) {
-      writePrimitive(value);
-    } else if (value instanceof ArrayWritable) {
+      checkInspectorCategory(inspector, ObjectInspector.Category.PRIMITIVE);
+      writePrimitive(value, (PrimitiveObjectInspector)inspector);
+    } else {
      GroupType groupType = type.asGroupType();
      OriginalType originalType = type.getOriginalType();
      if (originalType != null && originalType.equals(OriginalType.LIST)) {
-        writeArray((ArrayWritable)value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.LIST);
+        writeArray(value, (ListObjectInspector)inspector, groupType);
      } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
-        writeMap((ArrayWritable)value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.MAP);
+        writeMap(value, (MapObjectInspector)inspector, groupType);
      } else {
-        writeGroup((ArrayWritable) value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.STRUCT);
+        writeGroup(value, (StructObjectInspector)inspector, groupType);
      }
-    } else {
-      throw new RuntimeException("Field value is not an ArrayWritable object: " + type);
+    }
+  }
+
+ /**
+ * Checks that an inspector matches the category indicated as a parameter.
+ * @param inspector The object inspector to check
+ * @param category The category to match
+ * @throws IllegalArgumentException if inspector does not match the category
+ */
+  private void checkInspectorCategory(ObjectInspector inspector, ObjectInspector.Category category) {
+    if (!inspector.getCategory().equals(category)) {
+      throw new IllegalArgumentException("Invalid data type: expected " + category
+          + " type, but found: " + inspector.getCategory());
+    }
+  }
/**
* It writes a group type and all its values to the Parquet RecordConsumer.
* This is used only for optional and required groups.
- * @param value ArrayWritable object that contains the group values
- * @param type Type that contains information about the group schema
+ * @param value Object that contains the group values.
+ * @param inspector The object inspector used to get the correct value type.
+ * @param type Type that contains information about the group schema.
*/
- private void writeGroup(final ArrayWritable value, final GroupType type) {
+  private void writeGroup(final Object value, final StructObjectInspector inspector, final GroupType type) {
recordConsumer.startGroup();
- writeGroupFields(value, type);
+ writeGroupFields(value, inspector, type);
recordConsumer.endGroup();
}
/**
-   * It writes a map type and its key-pair values to the Parquet RecordConsumer.
-   * This is called when the original type (MAP) is detected by writeValue()
-   * @param value The list of map values that contains the repeated KEY_PAIR_VALUE group type
-   * @param type Type that contains information about the group schema
+   * It writes a list type and its array elements to the Parquet RecordConsumer.
+   * This is called when the original type (LIST) is detected by writeValue().
+   * This function assumes the following schema:
+   *    optional group arrayCol (LIST) {
+   *      repeated group array {
+   *        optional TYPE array_element;
+   *      }
+   *    }
+   * @param value The object that contains the array values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group (LIST) schema.
   */
-  private void writeMap(final ArrayWritable value, final GroupType type) {
+  private void writeArray(final Object value, final ListObjectInspector inspector, final GroupType type) {
+    // Get the internal array structure
    GroupType repeatedType = type.getType(0).asGroupType();
-    ArrayWritable repeatedValue = (ArrayWritable)value.get()[0];
    recordConsumer.startGroup();
    recordConsumer.startField(repeatedType.getName(), 0);
-    Writable[] map_values = repeatedValue.get();
-    for (int record = 0; record < map_values.length; record++) {
-      Writable key_value_pair = map_values[record];
-      if (key_value_pair != null) {
-        // Hive wraps a map key-pair into an ArrayWritable
-        if (key_value_pair instanceof ArrayWritable) {
-          writeGroup((ArrayWritable)key_value_pair, repeatedType);
-        } else {
-          throw new RuntimeException("Map key-value pair is not an ArrayWritable object on record " + record);
-        }
-      } else {
-        throw new RuntimeException("Map key-value pair is null on record " + record);
+    List<?> arrayValues = inspector.getList(value);
+    ObjectInspector elementInspector = inspector.getListElementObjectInspector();
+
+    Type elementType = repeatedType.getType(0);
+    String elementName = elementType.getName();
+
+    for (Object element : arrayValues) {
+      recordConsumer.startGroup();
+      if (element != null) {
+        recordConsumer.startField(elementName, 0);
+        writeValue(element, elementInspector, elementType);
+        recordConsumer.endField(elementName, 0);
      }
+      recordConsumer.endGroup();
    }
    recordConsumer.endField(repeatedType.getName(), 0);
@@ -165,35 +186,53 @@ public class DataWritableWriter {
}
/**
-   * It writes a list type and its array elements to the Parquet RecordConsumer.
-   * This is called when the original type (LIST) is detected by writeValue()
-   * @param array The list of array values that contains the repeated array group type
-   * @param type Type that contains information about the group schema
+   * It writes a map type and its key-pair values to the Parquet RecordConsumer.
+   * This is called when the original type (MAP) is detected by writeValue().
+   * This function assumes the following schema:
+   *    optional group mapCol (MAP) {
+   *      repeated group map (MAP_KEY_VALUE) {
+   *        required TYPE key;
+   *        optional TYPE value;
+   *      }
+   *    }
+   * @param value The object that contains the map key-values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group (MAP) schema.
   */
-  private void writeArray(final ArrayWritable array, final GroupType type) {
+  private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) {
+ // Get the internal map structure (MAP_KEY_VALUE)
GroupType repeatedType = type.getType(0).asGroupType();
- ArrayWritable repeatedValue = (ArrayWritable)array.get()[0];
recordConsumer.startGroup();
recordConsumer.startField(repeatedType.getName(), 0);
- Writable[] array_values = repeatedValue.get();
- for (int record = 0; record < array_values.length; record++) {
- recordConsumer.startGroup();
+ Map<?, ?> mapValues = inspector.getMap(value);
- // Null values must be wrapped into startGroup/endGroup
- Writable element = array_values[record];
- if (element != null) {
- for (int i = 0; i < type.getFieldCount(); i++) {
- Type fieldType = repeatedType.getType(i);
- String fieldName = fieldType.getName();
+ Type keyType = repeatedType.getType(0);
+ String keyName = keyType.getName();
+ ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
- recordConsumer.startField(fieldName, i);
- writeValue(element, fieldType);
- recordConsumer.endField(fieldName, i);
+ Type valuetype = repeatedType.getType(1);
+ String valueName = valuetype.getName();
+ ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
+
+ for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
+ recordConsumer.startGroup();
+ if (keyValue != null) {
+ // write key element
+ Object keyElement = keyValue.getKey();
+ recordConsumer.startField(keyName, 0);
+ writeValue(keyElement, keyInspector, keyType);
+ recordConsumer.endField(keyName, 0);
+
+ // write value element
+ Object valueElement = keyValue.getValue();
+ if (valueElement != null) {
+ recordConsumer.startField(valueName, 1);
+ writeValue(valueElement, valueInspector, valuetype);
+ recordConsumer.endField(valueName, 1);
}
}
-
recordConsumer.endGroup();
}
@@ -203,36 +242,89 @@ public class DataWritableWriter {
/**
* It writes the primitive value to the Parquet RecordConsumer.
- * @param value The writable object that contains the primitive value.
+ * @param value The object that contains the primitive value.
+ * @param inspector The object inspector used to get the correct value type.
*/
- private void writePrimitive(final Writable value) {
+  private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) {
if (value == null) {
return;
}
- if (value instanceof DoubleWritable) {
- recordConsumer.addDouble(((DoubleWritable) value).get());
- } else if (value instanceof BooleanWritable) {
- recordConsumer.addBoolean(((BooleanWritable) value).get());
- } else if (value instanceof FloatWritable) {
- recordConsumer.addFloat(((FloatWritable) value).get());
- } else if (value instanceof IntWritable) {
- recordConsumer.addInteger(((IntWritable) value).get());
- } else if (value instanceof LongWritable) {
- recordConsumer.addLong(((LongWritable) value).get());
- } else if (value instanceof ShortWritable) {
- recordConsumer.addInteger(((ShortWritable) value).get());
- } else if (value instanceof ByteWritable) {
- recordConsumer.addInteger(((ByteWritable) value).get());
-    } else if (value instanceof HiveDecimalWritable) {
-      throw new UnsupportedOperationException("HiveDecimalWritable writing not implemented");
-    } else if (value instanceof BytesWritable) {
-      recordConsumer.addBinary((Binary.fromByteArray(((BytesWritable) value).getBytes())));
-    } else if (value instanceof TimestampWritable) {
-      Timestamp ts = ((TimestampWritable) value).getTimestamp();
-      NanoTime nt = NanoTimeUtils.getNanoTime(ts, false);
-      nt.writeValue(recordConsumer);
-    } else {
-      throw new IllegalArgumentException("Unknown value type: " + value + " " + value.getClass());
+
+    switch (inspector.getPrimitiveCategory()) {
+    case VOID:
+      return;
+    case DOUBLE:
+      recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value));
+      break;
+    case BOOLEAN:
+      recordConsumer.addBoolean(((BooleanObjectInspector) inspector).get(value));
+      break;
+    case FLOAT:
+      recordConsumer.addFloat(((FloatObjectInspector) inspector).get(value));
+      break;
+    case BYTE:
+      recordConsumer.addInteger(((ByteObjectInspector) inspector).get(value));
+      break;
+    case INT:
+      recordConsumer.addInteger(((IntObjectInspector) inspector).get(value));
+      break;
+    case LONG:
+      recordConsumer.addLong(((LongObjectInspector) inspector).get(value));
+      break;
+    case SHORT:
+      recordConsumer.addInteger(((ShortObjectInspector) inspector).get(value));
+      break;
+    case STRING:
+      String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value);
+      recordConsumer.addBinary(Binary.fromString(v));
+      break;
+    case CHAR:
+      String vChar = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(value).getStrippedValue();
+      recordConsumer.addBinary(Binary.fromString(vChar));
+      break;
+    case VARCHAR:
+      String vVarchar = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(value).getValue();
+      recordConsumer.addBinary(Binary.fromString(vVarchar));
+      break;
+    case BINARY:
+      byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value);
+      recordConsumer.addBinary(Binary.fromByteArray(vBinary));
+      break;
+    case TIMESTAMP:
+      Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value);
+      recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
+      break;
+    case DECIMAL:
+      HiveDecimal vDecimal = ((HiveDecimal)inspector.getPrimitiveJavaObject(value));
+      DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo();
+      recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo));
+      break;
+    default:
+      throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory());
+    }
+  }
+
+  private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) {
+    int prec = decimalTypeInfo.precision();
+    int scale = decimalTypeInfo.scale();
+    byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray();
+
+    // Estimated number of bytes needed.
+    int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
+    if (precToBytes == decimalBytes.length) {
+      // No padding needed.
+      return Binary.fromByteArray(decimalBytes);
+    }
+
+    byte[] tgt = new byte[precToBytes];
+    if (hiveDecimal.signum() == -1) {
+      // For negative number, initializing bits to 1
+      for (int i = 0; i < precToBytes; i++) {
+        tgt[i] |= 0xFF;
+      }
+    }
+
+    System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
+    return Binary.fromByteArray(tgt);
+  }
}
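
To make the decimal padding concrete, here is a worked example of what
decimalToBinary produces (hypothetical values, not from the commit). A
decimal(5,2) column always occupies PRECISION_TO_BYTE_COUNT[5 - 1] = 3 bytes, so
-1.50 must be sign-extended from its minimal two-byte two's-complement form:

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.util.Arrays;

    BigInteger unscaled = new BigDecimal("-1.50").unscaledValue(); // -150
    byte[] src = unscaled.toByteArray();  // { (byte) 0xFF, (byte) 0x6A }, two's complement
    byte[] tgt = new byte[3];             // fixed width for decimal(5,2)
    if (unscaled.signum() == -1) {
      Arrays.fill(tgt, (byte) 0xFF);      // pre-fill with 1-bits so the sign extends
    }
    System.arraycopy(src, 0, tgt, tgt.length - src.length, src.length);
    // tgt == { 0xFF, 0xFF, 0x6A }: the same value, left-padded to the fixed width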
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Thu Feb 12 04:53:51 2015
@@ -20,7 +20,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
@@ -29,22 +28,23 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
import org.apache.hadoop.util.Progressable;
import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.util.ContextUtil;
-public class ParquetRecordWriterWrapper implements RecordWriter<Void, ArrayWritable>,
+public class ParquetRecordWriterWrapper implements RecordWriter<Void, ParquetHiveRecord>,
  org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {

  public static final Log LOG = LogFactory.getLog(ParquetRecordWriterWrapper.class);

-  private final org.apache.hadoop.mapreduce.RecordWriter<Void, ArrayWritable> realWriter;
+  private final org.apache.hadoop.mapreduce.RecordWriter<Void, ParquetHiveRecord> realWriter;
private final TaskAttemptContext taskContext;
public ParquetRecordWriterWrapper(
- final OutputFormat<Void, ArrayWritable> realOutputFormat,
+ final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
final JobConf jobConf,
final String name,
final Progressable progress, Properties tableProperties) throws
@@ -106,7 +106,7 @@ public class ParquetRecordWriterWrapper
}
@Override
-  public void write(final Void key, final ArrayWritable value) throws IOException {
+  public void write(final Void key, final ParquetHiveRecord value) throws IOException {
try {
realWriter.write(key, value);
} catch (final InterruptedException e) {
@@ -121,7 +121,7 @@ public class ParquetRecordWriterWrapper
@Override
public void write(final Writable w) throws IOException {
- write(null, (ArrayWritable) w);
+ write(null, (ParquetHiveRecord) w);
}
}
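
Note how the two write() overloads chain on the Hive file-sink path: the SerDe
output arrives through the Writable-typed method and is cast to the new wrapper.
Sketched (illustrative, not code from the commit):

    Writable serialized = serDe.serialize(row, rowInspector); // a ParquetHiveRecord
    hiveWriter.write(serialized);    // FileSinkOperator.RecordWriter entry point
    // ... which forwards to write(null, (ParquetHiveRecord) serialized), and from
    // there to realWriter.write(key, value) on the wrapped mapreduce writer.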
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java Thu Feb 12 04:53:51 2015
@@ -13,9 +13,27 @@
*/
package org.apache.hadoop.hive.ql.io.parquet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
@@ -27,6 +45,10 @@ import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -62,6 +84,10 @@ public class TestDataWritableWriter {
inOrder.verify(mockRecordConsumer).addInteger(value);
}
+ private void addLong(int value) {
+ inOrder.verify(mockRecordConsumer).addLong(value);
+ }
+
private void addFloat(float value) {
inOrder.verify(mockRecordConsumer).addFloat(value);
}
@@ -88,6 +114,12 @@ public class TestDataWritableWriter {
private Writable createNull() { return null; }
+  private ByteWritable createTinyInt(byte value) { return new ByteWritable(value); }
+
+  private ShortWritable createSmallInt(short value) { return new ShortWritable(value); }
+
+  private LongWritable createBigInt(long value) { return new LongWritable(value); }
+
private IntWritable createInt(int value) {
return new IntWritable(value);
}
@@ -116,20 +148,68 @@ public class TestDataWritableWriter {
return new ArrayWritable(Writable.class, createGroup(values).get());
}
- private void writeParquetRecord(String schemaStr, ArrayWritable record) {
- MessageType schema = MessageTypeParser.parseMessageType(schemaStr);
-    DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, schema);
+ private List<String> createHiveColumnsFrom(final String columnNamesStr) {
+ List<String> columnNames;
+ if (columnNamesStr.length() == 0) {
+ columnNames = new ArrayList<String>();
+ } else {
+ columnNames = Arrays.asList(columnNamesStr.split(","));
+ }
+
+ return columnNames;
+ }
+
+ private List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) {
+ List<TypeInfo> columnTypes;
+
+ if (columnsTypeStr.length() == 0) {
+ columnTypes = new ArrayList<TypeInfo>();
+ } else {
+ columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr);
+ }
+
+ return columnTypes;
+ }
+
+  private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
+    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom(columnTypes);
+    List<String> columnNameList = createHiveColumnsFrom(columnNames);
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+
+    return new ArrayWritableObjectInspector(rowTypeInfo);
+  }
+
+  private ParquetHiveRecord getParquetWritable(String columnNames, String columnTypes, ArrayWritable record) throws SerDeException {
+    Properties recordProperties = new Properties();
+    recordProperties.setProperty("columns", columnNames);
+    recordProperties.setProperty("columns.types", columnTypes);
+
+    ParquetHiveSerDe serDe = new ParquetHiveSerDe();
+    SerDeUtils.initializeSerDe(serDe, new Configuration(), recordProperties, null);
+
+    return new ParquetHiveRecord(serDe.deserialize(record), getObjectInspector(columnNames, columnTypes));
+  }
+
+  private void writeParquetRecord(String schema, ParquetHiveRecord record) throws SerDeException {
+    MessageType fileSchema = MessageTypeParser.parseMessageType(schema);
+    DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, fileSchema);
hiveParquetWriter.write(record);
}
@Test
public void testSimpleType() throws Exception {
- String schemaStr = "message hive_schema {\n"
+    String columnNames = "int,double,boolean,float,string,tinyint,smallint,bigint";
+    String columnTypes = "int,double,boolean,float,string,tinyint,smallint,bigint";
+
+ String fileSchema = "message hive_schema {\n"
+ " optional int32 int;\n"
+ " optional double double;\n"
+ " optional boolean boolean;\n"
+ " optional float float;\n"
- + " optional binary string;\n"
+ + " optional binary string (UTF8);\n"
+ + " optional int32 tinyint;\n"
+ + " optional int32 smallint;\n"
+ + " optional int64 bigint;\n"
+ "}\n";
ArrayWritable hiveRecord = createGroup(
@@ -137,11 +217,14 @@ public class TestDataWritableWriter {
createDouble(1.0),
createBoolean(true),
createFloat(1.0f),
- createString("one")
+ createString("one"),
+ createTinyInt((byte)1),
+ createSmallInt((short)1),
+ createBigInt((long)1)
);
// Write record to Parquet format
- writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
// Verify record was written correctly to Parquet
startMessage();
@@ -160,12 +243,24 @@ public class TestDataWritableWriter {
startField("string", 4);
addString("one");
endField("string", 4);
+ startField("tinyint", 5);
+ addInteger(1);
+ endField("tinyint", 5);
+ startField("smallint", 6);
+ addInteger(1);
+ endField("smallint", 6);
+ startField("bigint", 7);
+ addLong(1);
+ endField("bigint", 7);
endMessage();
}
@Test
public void testStructType() throws Exception {
- String schemaStr = "message hive_schema {\n"
+ String columnNames = "structCol";
+ String columnTypes = "struct<a:int,b:double,c:boolean>";
+
+ String fileSchema = "message hive_schema {\n"
+ " optional group structCol {\n"
+ " optional int32 a;\n"
+ " optional double b;\n"
@@ -182,7 +277,7 @@ public class TestDataWritableWriter {
);
// Write record to Parquet format
- writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
// Verify record was written correctly to Parquet
startMessage();
@@ -204,9 +299,12 @@ public class TestDataWritableWriter {
@Test
public void testArrayType() throws Exception {
- String schemaStr = "message hive_schema {\n"
+ String columnNames = "arrayCol";
+ String columnTypes = "array<int>";
+
+ String fileSchema = "message hive_schema {\n"
+ " optional group arrayCol (LIST) {\n"
- + " repeated group bag {\n"
+ + " repeated group array {\n"
+ " optional int32 array_element;\n"
+ " }\n"
+ " }\n"
@@ -223,13 +321,13 @@ public class TestDataWritableWriter {
);
// Write record to Parquet format
- writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
// Verify record was written correctly to Parquet
startMessage();
startField("arrayCol", 0);
startGroup();
- startField("bag", 0);
+ startField("array", 0);
startGroup();
startField("array_element", 0);
addInteger(1);
@@ -242,7 +340,7 @@ public class TestDataWritableWriter {
addInteger(2);
endField("array_element", 0);
endGroup();
- endField("bag", 0);
+ endField("array", 0);
endGroup();
endField("arrayCol", 0);
endMessage();
@@ -250,7 +348,10 @@ public class TestDataWritableWriter {
@Test
public void testMapType() throws Exception {
- String schemaStr = "message hive_schema {\n"
+ String columnNames = "mapCol";
+ String columnTypes = "map<string,int>";
+
+ String fileSchema = "message hive_schema {\n"
+ " optional group mapCol (MAP) {\n"
+ " repeated group map (MAP_KEY_VALUE) {\n"
+ " required binary key;\n"
@@ -279,7 +380,7 @@ public class TestDataWritableWriter {
);
// Write record to Parquet format
- writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
// Verify record was written correctly to Parquet
startMessage();
@@ -315,12 +416,15 @@ public class TestDataWritableWriter {
@Test
public void testArrayOfArrays() throws Exception {
- String schemaStr = "message hive_schema {\n"
+ String columnNames = "array_of_arrays";
+ String columnTypes = "array<array<int>>";
+
+ String fileSchema = "message hive_schema {\n"
+ " optional group array_of_arrays (LIST) {\n"
+ " repeated group array {\n"
- + " required group element (LIST) {\n"
+ + " optional group array_element (LIST) {\n"
+ " repeated group array {\n"
- + " required int32 element;\n"
+ + " optional int32 array_element;\n"
+ " }\n"
+ " }\n"
+ " }\n"
@@ -341,7 +445,7 @@ public class TestDataWritableWriter {
);
// Write record to Parquet format
- writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
// Verify record was written correctly to Parquet
startMessage();
@@ -349,22 +453,22 @@ public class TestDataWritableWriter {
startGroup();
startField("array", 0);
startGroup();
- startField("element", 0);
+ startField("array_element", 0);
startGroup();
startField("array", 0);
startGroup();
- startField("element", 0);
+ startField("array_element", 0);
addInteger(1);
- endField("element", 0);
+ endField("array_element", 0);
endGroup();
startGroup();
- startField("element", 0);
+ startField("array_element", 0);
addInteger(2);
- endField("element", 0);
+ endField("array_element", 0);
endGroup();
endField("array", 0);
endGroup();
- endField("element", 0);
+ endField("array_element", 0);
endGroup();
endField("array", 0);
endGroup();
@@ -373,124 +477,63 @@ public class TestDataWritableWriter {
}
@Test
- public void testGroupFieldIsNotArrayWritable() throws Exception {
- String schemaStr = "message hive_schema {\n"
- + " optional group a {\n"
- + " optional int32 b;\n"
- + " }\n"
- + "}\n";
+ public void testExpectedStructTypeOnRecord() throws Exception {
+ String columnNames = "structCol";
+ String columnTypes = "int";
ArrayWritable hiveRecord = createGroup(
- createInt(1)
+ createInt(1)
);
+ String fileSchema = "message hive_schema {\n"
+ + " optional group structCol {\n"
+ + " optional int32 int;\n"
+ + " }\n"
+ + "}\n";
+
try {
- // Write record to Parquet format
- writeParquetRecord(schemaStr, hiveRecord);
+      writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
      fail();
    } catch (RuntimeException e) {
-      assertEquals("Parquet record is malformed: Field value is not an ArrayWritable object: " +
-          "optional group a {\n  optional int32 b;\n}", e.getMessage());
+      assertEquals("Parquet record is malformed: Invalid data type: expected STRUCT type, but found: PRIMITIVE", e.getMessage());
}
}
@Test
- public void testArrayGroupElementIsNotArrayWritable() throws Exception {
- String schemaStr = "message hive_schema {\n"
- + " optional group array_of_arrays (LIST) {\n"
- + " repeated group array {\n"
- + " required group element (LIST) {\n"
- + " required int32 element;\n"
- + " }\n"
- + " }\n"
- + " }\n"
- + "}\n";
+ public void testExpectedArrayTypeOnRecord() throws Exception {
+ String columnNames = "arrayCol";
+ String columnTypes = "int";
ArrayWritable hiveRecord = createGroup(
- createGroup(
- createArray(
- createInt(1)
- )
- )
+ createInt(1)
);
- try {
- // Write record to Parquet format
- writeParquetRecord(schemaStr, hiveRecord);
- fail();
- } catch (RuntimeException e) {
- assertEquals("Parquet record is malformed: Field value is not an
ArrayWritable object: " +
- "required group element (LIST) {\n required int32 element;\n}",
e.getMessage());
- }
- }
-
- @Test
- public void testMapElementIsNotArrayWritable() throws Exception {
- String schemaStr = "message hive_schema {\n"
- + " optional group mapCol (MAP) {\n"
- + " repeated group map (MAP_KEY_VALUE) {\n"
- + " required binary key;\n"
- + " optional group value {\n"
- + " required int32 value;"
- + " }\n"
+ String fileSchema = "message hive_schema {\n"
+ + " optional group arrayCol (LIST) {\n"
+ + " repeated group bag {\n"
+ + " optional int32 array_element;\n"
+ " }\n"
+ " }\n"
+ "}\n";
- ArrayWritable hiveRecord = createGroup(
- createGroup(
- createArray(
- createGroup(
- createString("key1"),
- createInt(1)
- )
- )
- )
- );
-
try {
- // Write record to Parquet format
- writeParquetRecord(schemaStr, hiveRecord);
+      writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
fail();
} catch (RuntimeException e) {
-      assertEquals(
-          "Parquet record is malformed: Field value is not an ArrayWritable object: " +
-          "optional group value {\n  required int32 value;\n}", e.getMessage());
+      assertEquals("Parquet record is malformed: Invalid data type: expected LIST type, but found: PRIMITIVE", e.getMessage());
}
}
@Test
- public void testMapKeyValueIsNotArrayWritable() throws Exception {
- String schemaStr = "message hive_schema {\n"
- + " optional group mapCol (MAP) {\n"
- + " repeated group map (MAP_KEY_VALUE) {\n"
- + " required binary key;\n"
- + " optional int32 value;\n"
- + " }\n"
- + " }\n"
- + "}\n";
+ public void testExpectedMapTypeOnRecord() throws Exception {
+ String columnNames = "mapCol";
+ String columnTypes = "int";
ArrayWritable hiveRecord = createGroup(
- createGroup(
- createArray(
- createString("key1"),
- createInt(1)
- )
- )
+ createInt(1)
);
- try {
- // Write record to Parquet format
- writeParquetRecord(schemaStr, hiveRecord);
- fail();
- } catch (RuntimeException e) {
- assertEquals("Parquet record is malformed: Map key-value pair is not an
ArrayWritable object on record 0", e.getMessage());
- }
- }
-
- @Test
- public void testMapKeyValueIsNull() throws Exception {
- String schemaStr = "message hive_schema {\n"
+ String fileSchema = "message hive_schema {\n"
+ " optional group mapCol (MAP) {\n"
+ " repeated group map (MAP_KEY_VALUE) {\n"
+ " required binary key;\n"
@@ -499,20 +542,11 @@ public class TestDataWritableWriter {
+ " }\n"
+ "}\n";
- ArrayWritable hiveRecord = createGroup(
- createGroup(
- createArray(
- createNull()
- )
- )
- );
-
try {
- // Write record to Parquet format
- writeParquetRecord(schemaStr, hiveRecord);
+      writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
fail();
} catch (RuntimeException e) {
- assertEquals("Parquet record is malformed: Map key-value pair is null on
record 0", e.getMessage());
+ assertEquals("Parquet record is malformed: Invalid data type: expected
MAP type, but found: PRIMITIVE", e.getMessage());
}
}
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java Thu Feb 12 04:53:51 2015
@@ -24,7 +24,7 @@ import java.util.Properties;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
-import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Progressable;
import org.junit.Test;
@@ -41,7 +41,7 @@ public class TestMapredParquetOutputForm
@SuppressWarnings("unchecked")
@Test
public void testConstructorWithFormat() {
- new MapredParquetOutputFormat((ParquetOutputFormat<ArrayWritable>)
mock(ParquetOutputFormat.class));
+ new MapredParquetOutputFormat((ParquetOutputFormat<ParquetHiveRecord>)
mock(ParquetOutputFormat.class));
}
@Test
@@ -62,7 +62,7 @@ public class TestMapredParquetOutputForm
tableProps.setProperty("columns.types", "int:int");
final Progressable mockProgress = mock(Progressable.class);
- final ParquetOutputFormat<ArrayWritable> outputFormat =
(ParquetOutputFormat<ArrayWritable>) mock(ParquetOutputFormat.class);
+ final ParquetOutputFormat<ParquetHiveRecord> outputFormat =
(ParquetOutputFormat<ParquetHiveRecord>) mock(ParquetOutputFormat.class);
JobConf jobConf = new JobConf();
@@ -70,7 +70,7 @@ public class TestMapredParquetOutputForm
new MapredParquetOutputFormat(outputFormat) {
@Override
protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
- ParquetOutputFormat<ArrayWritable> realOutputFormat,
+ ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
JobConf jobConf,
String finalOutPath,
Progressable progress,
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java Thu Feb 12 04:53:51 2015
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.ArrayWritable;
@@ -96,9 +97,9 @@ public class TestParquetSerDe extends Te
assertEquals("deserialization gives the wrong object", t, row);
// Serialize
-    final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi);
-    assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(), serializedArr.get().length);
-    assertTrue("serialized object should be equal to starting object", arrayWritableEquals(t, serializedArr));
+    final ParquetHiveRecord serializedArr = (ParquetHiveRecord) serDe.serialize(row, oi);
+    assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(), ((ArrayWritable)serializedArr.getObject()).get().length);
+    assertTrue("serialized object should be equal to starting object", arrayWritableEquals(t, (ArrayWritable)serializedArr.getObject()));
}
private Properties createProperties() {
Added: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/ParquetHiveRecord.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/ParquetHiveRecord.java?rev=1659147&view=auto
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/ParquetHiveRecord.java (added)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/ParquetHiveRecord.java Thu Feb 12 04:53:51 2015
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.io;
+
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * This class wraps the object and object inspector that will be used later
+ * in DataWritableWriter class to get the object values.
+ */
+public class ParquetHiveRecord implements Writable {
+ public Object value;
+ public StructObjectInspector inspector;
+
+ public ParquetHiveRecord() {
+ this(null, null);
+ }
+
+ public ParquetHiveRecord(final Object o, final StructObjectInspector oi) {
+ value = o;
+ inspector = oi;
+ }
+
+ public StructObjectInspector getObjectInspector() {
+ return inspector;
+ }
+
+ public Object getObject() {
+ return value;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ throw new UnsupportedOperationException("Unsupported method call.");
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ throw new UnsupportedOperationException("Unsupported method call.");
+ }
+}
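
ParquetHiveRecord implements Writable only so it can pass through interfaces
typed on Writable; it is never actually serialized, which is why both Writable
methods throw. A small sketch of what the wrapper carries (illustrative; uses the
standard object-inspector factories):

    List<Object> row = Arrays.<Object>asList(1, "one");  // a row as the SerDe sees it
    StructObjectInspector oi = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("a", "b"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.javaIntObjectInspector,
            PrimitiveObjectInspectorFactory.javaStringObjectInspector));
    ParquetHiveRecord record = new ParquetHiveRecord(row, oi);
    // DataWritableWriter later walks the row through the inspector:
    assert oi.getStructFieldsDataAsList(record.getObject()).get(0).equals(1);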