Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original) +++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Feb 12 04:56:42 2015 @@ -341,7 +341,7 @@ public class FileSinkOperator extends Te statsCollectRawDataSize = conf.isStatsCollectRawDataSize(); statsFromRecordWriter = new boolean[numFiles]; serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance(); - serializer.initialize(null, conf.getTableInfo().getProperties()); + serializer.initialize(hconf, conf.getTableInfo().getProperties()); outputClass = serializer.getSerializedClass(); if (isLogInfoEnabled) {
Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original) +++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Thu Feb 12 04:56:42 2015 @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.t import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; import static org.fusesource.jansi.Ansi.ansi; import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO; +import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO; import static org.fusesource.jansi.internal.CLibrary.isatty; import org.apache.hadoop.hive.conf.HiveConf; @@ -167,6 +168,9 @@ public class TezJobMonitor { if (isatty(STDOUT_FILENO) == 0) { return false; } + if (isatty(STDERR_FILENO) == 0) { + return false; + } } catch (NoClassDefFoundError ignore) { // These errors happen if the JNI lib is not available for your platform. return false; Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java (original) +++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java Thu Feb 12 04:56:42 2015 @@ -21,7 +21,6 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; @@ -29,10 +28,10 @@ import org.apache.hadoop.hive.ql.io.IOCo import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport; import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; @@ -41,27 +40,25 @@ import org.apache.hadoop.mapreduce.Outpu import org.apache.hadoop.util.Progressable; import parquet.hadoop.ParquetOutputFormat; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.hadoop.util.ContextUtil; /** * * A Parquet OutputFormat for Hive (with the deprecated package mapred) * */ -public class MapredParquetOutputFormat extends FileOutputFormat<Void, ArrayWritable> implements - HiveOutputFormat<Void, ArrayWritable> { +public class MapredParquetOutputFormat extends FileOutputFormat<Void, ParquetHiveRecord> implements + HiveOutputFormat<Void, ParquetHiveRecord> { private static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class); - protected ParquetOutputFormat<ArrayWritable> realOutputFormat; + protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat; public MapredParquetOutputFormat() { - realOutputFormat = new ParquetOutputFormat<ArrayWritable>(new DataWritableWriteSupport()); + realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport()); } - public MapredParquetOutputFormat(final OutputFormat<Void, ArrayWritable> mapreduceOutputFormat) { - realOutputFormat = (ParquetOutputFormat<ArrayWritable>) mapreduceOutputFormat; + public MapredParquetOutputFormat(final OutputFormat<Void, ParquetHiveRecord> mapreduceOutputFormat) { + realOutputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mapreduceOutputFormat; } @Override @@ -70,7 +67,7 @@ public class MapredParquetOutputFormat e } @Override - public RecordWriter<Void, ArrayWritable> getRecordWriter( + public RecordWriter<Void, ParquetHiveRecord> getRecordWriter( final FileSystem ignored, final JobConf job, final String name, @@ -119,7 +116,7 @@ public class MapredParquetOutputFormat e } protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper( - ParquetOutputFormat<ArrayWritable> realOutputFormat, + ParquetOutputFormat<ParquetHiveRecord> realOutputFormat, JobConf jobConf, String finalOutPath, Progressable progress, Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original) +++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Thu Feb 12 04:56:42 2015 @@ -13,61 +13,31 @@ */ package org.apache.hadoop.hive.ql.io.parquet.serde; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import parquet.hadoop.ParquetOutputFormat; import parquet.hadoop.ParquetWriter; -import parquet.io.api.Binary; /** * @@ -110,6 +80,13 @@ public class ParquetHiveSerDe extends Ab private long deserializedSize; private String compressionType; + private ParquetHiveRecord parquetRow; + + public ParquetHiveSerDe() { + parquetRow = new ParquetHiveRecord(); + stats = new SerDeStats(); + } + @Override public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException { @@ -144,7 +121,6 @@ public class ParquetHiveSerDe extends Ab this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); // Stats part - stats = new SerDeStats(); serializedSize = 0; deserializedSize = 0; status = LAST_OPERATION.UNKNOWN; @@ -169,7 +145,7 @@ public class ParquetHiveSerDe extends Ab @Override public Class<? extends Writable> getSerializedClass() { - return ArrayWritable.class; + return ParquetHiveRecord.class; } @Override @@ -178,154 +154,11 @@ public class ParquetHiveSerDe extends Ab if (!objInspector.getCategory().equals(Category.STRUCT)) { throw new SerDeException("Cannot serialize " + objInspector.getCategory() + ". Can only serialize a struct"); } - final ArrayWritable serializeData = createStruct(obj, (StructObjectInspector) objInspector); - serializedSize = serializeData.get().length; + serializedSize = ((StructObjectInspector)objInspector).getAllStructFieldRefs().size(); status = LAST_OPERATION.SERIALIZE; - return serializeData; - } - - private ArrayWritable createStruct(final Object obj, final StructObjectInspector inspector) - throws SerDeException { - final List<? extends StructField> fields = inspector.getAllStructFieldRefs(); - final Writable[] arr = new Writable[fields.size()]; - for (int i = 0; i < fields.size(); i++) { - final StructField field = fields.get(i); - final Object subObj = inspector.getStructFieldData(obj, field); - final ObjectInspector subInspector = field.getFieldObjectInspector(); - arr[i] = createObject(subObj, subInspector); - } - return new ArrayWritable(Writable.class, arr); - } - - private Writable createMap(final Object obj, final MapObjectInspector inspector) - throws SerDeException { - final Map<?, ?> sourceMap = inspector.getMap(obj); - final ObjectInspector keyInspector = inspector.getMapKeyObjectInspector(); - final ObjectInspector valueInspector = inspector.getMapValueObjectInspector(); - final List<ArrayWritable> array = new ArrayList<ArrayWritable>(); - - if (sourceMap != null) { - for (final Entry<?, ?> keyValue : sourceMap.entrySet()) { - final Writable key = createObject(keyValue.getKey(), keyInspector); - final Writable value = createObject(keyValue.getValue(), valueInspector); - if (key != null) { - Writable[] arr = new Writable[2]; - arr[0] = key; - arr[1] = value; - array.add(new ArrayWritable(Writable.class, arr)); - } - } - } - if (array.size() > 0) { - final ArrayWritable subArray = new ArrayWritable(ArrayWritable.class, - array.toArray(new ArrayWritable[array.size()])); - return new ArrayWritable(Writable.class, new Writable[] {subArray}); - } else { - return null; - } - } - - private ArrayWritable createArray(final Object obj, final ListObjectInspector inspector) - throws SerDeException { - final List<?> sourceArray = inspector.getList(obj); - final ObjectInspector subInspector = inspector.getListElementObjectInspector(); - final List<Writable> array = new ArrayList<Writable>(); - if (sourceArray != null) { - for (final Object curObj : sourceArray) { - array.add(createObject(curObj, subInspector)); - } - } - if (array.size() > 0) { - final ArrayWritable subArray = new ArrayWritable(Writable.class, - array.toArray(new Writable[array.size()])); - return new ArrayWritable(Writable.class, new Writable[] {subArray}); - } else { - return null; - } - } - - private Writable createPrimitive(final Object obj, final PrimitiveObjectInspector inspector) - throws SerDeException { - if (obj == null) { - return null; - } - switch (inspector.getPrimitiveCategory()) { - case VOID: - return null; - case BOOLEAN: - return new BooleanWritable(((BooleanObjectInspector) inspector).get(obj) ? Boolean.TRUE : Boolean.FALSE); - case BYTE: - return new ByteWritable(((ByteObjectInspector) inspector).get(obj)); - case DOUBLE: - return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj)); - case FLOAT: - return new FloatWritable(((FloatObjectInspector) inspector).get(obj)); - case INT: - return new IntWritable(((IntObjectInspector) inspector).get(obj)); - case LONG: - return new LongWritable(((LongObjectInspector) inspector).get(obj)); - case SHORT: - return new ShortWritable(((ShortObjectInspector) inspector).get(obj)); - case STRING: - String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(obj); - try { - return new BytesWritable(v.getBytes("UTF-8")); - } catch (UnsupportedEncodingException e) { - throw new SerDeException("Failed to encode string in UTF-8", e); - } - case DECIMAL: - HiveDecimal hd = (HiveDecimal)inspector.getPrimitiveJavaObject(obj); - DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) inspector.getTypeInfo(); - int prec = decTypeInfo.precision(); - int scale = decTypeInfo.scale(); - byte[] src = hd.setScale(scale).unscaledValue().toByteArray(); - // Estimated number of bytes needed. - int bytes = PRECISION_TO_BYTE_COUNT[prec - 1]; - if (bytes == src.length) { - // No padding needed. - return new BytesWritable(src); - } - byte[] tgt = new byte[bytes]; - if ( hd.signum() == -1) { - // For negative number, initializing bits to 1 - for (int i = 0; i < bytes; i++) { - tgt[i] |= 0xFF; - } - } - System.arraycopy(src, 0, tgt, bytes - src.length, src.length); // Padding leading zeroes/ones. - return new BytesWritable(tgt); - case TIMESTAMP: - return new TimestampWritable(((TimestampObjectInspector) inspector).getPrimitiveJavaObject(obj)); - case CHAR: - String strippedValue = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(obj).getStrippedValue(); - return new BytesWritable(Binary.fromString(strippedValue).getBytes()); - case VARCHAR: - String value = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(obj).getValue(); - return new BytesWritable(Binary.fromString(value).getBytes()); - case BINARY: - return new BytesWritable(((BinaryObjectInspector) inspector).getPrimitiveJavaObject(obj)); - default: - throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory()); - } - } - - private Writable createObject(final Object obj, final ObjectInspector inspector) throws SerDeException { - if (obj == null) { - return null; - } - - switch (inspector.getCategory()) { - case STRUCT: - return createStruct(obj, (StructObjectInspector) inspector); - case LIST: - return createArray(obj, (ListObjectInspector) inspector); - case MAP: - return createMap(obj, (MapObjectInspector) inspector); - case PRIMITIVE: - return createPrimitive(obj, (PrimitiveObjectInspector) inspector); - default: - throw new SerDeException("Unknown data type" + inspector.getCategory()); - } + parquetRow.value = obj; + parquetRow.inspector= (StructObjectInspector)objInspector; + return parquetRow; } @Override Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java (original) +++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java Thu Feb 12 04:56:42 2015 @@ -16,7 +16,7 @@ package org.apache.hadoop.hive.ql.io.par import java.util.HashMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import parquet.hadoop.api.WriteSupport; import parquet.io.api.RecordConsumer; @@ -28,7 +28,7 @@ import parquet.schema.MessageTypeParser; * DataWritableWriteSupport is a WriteSupport for the DataWritableWriter * */ -public class DataWritableWriteSupport extends WriteSupport<ArrayWritable> { +public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> { public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema"; @@ -55,7 +55,7 @@ public class DataWritableWriteSupport ex } @Override - public void write(final ArrayWritable record) { + public void write(final ParquetHiveRecord record) { writer.write(record); } } Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java (original) +++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java Thu Feb 12 04:56:42 2015 @@ -13,37 +13,29 @@ */ package org.apache.hadoop.hive.ql.io.parquet.write; -import java.sql.Timestamp; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; - +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; +import org.apache.hadoop.hive.serde2.objectinspector.*; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import parquet.io.api.Binary; import parquet.io.api.RecordConsumer; import parquet.schema.GroupType; import parquet.schema.OriginalType; import parquet.schema.Type; +import java.sql.Timestamp; +import java.util.List; +import java.util.Map; + /** * - * DataWritableWriter is a writer, - * that will read an ArrayWritable and give the data to parquet - * with the expected schema - * This is a helper class used by DataWritableWriteSupport class. + * DataWritableWriter is a writer that reads a ParquetWritable object and send the data to the Parquet + * API with the expected schema. This class is only used through DataWritableWriteSupport class. */ public class DataWritableWriter { private static final Log LOG = LogFactory.getLog(DataWritableWriter.class); @@ -57,13 +49,13 @@ public class DataWritableWriter { /** * It writes all record values to the Parquet RecordConsumer. - * @param record Contains the record of values that are going to be written + * @param record Contains the record that are going to be written. */ - public void write(final ArrayWritable record) { + public void write(final ParquetHiveRecord record) { if (record != null) { recordConsumer.startMessage(); try { - writeGroupFields(record, schema); + writeGroupFields(record.getObject(), record.getObjectInspector(), schema); } catch (RuntimeException e) { String errorMessage = "Parquet record is malformed: " + e.getMessage(); LOG.error(errorMessage, e); @@ -76,19 +68,23 @@ public class DataWritableWriter { /** * It writes all the fields contained inside a group to the RecordConsumer. * @param value The list of values contained in the group. + * @param inspector The object inspector used to get the correct value type. * @param type Type that contains information about the group schema. */ - public void writeGroupFields(final ArrayWritable value, final GroupType type) { + private void writeGroupFields(final Object value, final StructObjectInspector inspector, final GroupType type) { if (value != null) { + List<? extends StructField> fields = inspector.getAllStructFieldRefs(); + List<Object> fieldValuesList = inspector.getStructFieldsDataAsList(value); + for (int i = 0; i < type.getFieldCount(); i++) { Type fieldType = type.getType(i); String fieldName = fieldType.getName(); - Writable fieldValue = value.get()[i]; + Object fieldValue = fieldValuesList.get(i); - // Parquet does not write null elements if (fieldValue != null) { + ObjectInspector fieldInspector = fields.get(i).getFieldObjectInspector(); recordConsumer.startField(fieldName, i); - writeValue(fieldValue, fieldType); + writeValue(fieldValue, fieldInspector, fieldType); recordConsumer.endField(fieldName, i); } } @@ -96,68 +92,93 @@ public class DataWritableWriter { } /** - * It writes the field value to the Parquet RecordConsumer. It detects the field type, and writes + * It writes the field value to the Parquet RecordConsumer. It detects the field type, and calls * the correct write function. * @param value The writable object that contains the value. + * @param inspector The object inspector used to get the correct value type. * @param type Type that contains information about the type schema. */ - private void writeValue(final Writable value, final Type type) { + private void writeValue(final Object value, final ObjectInspector inspector, final Type type) { if (type.isPrimitive()) { - writePrimitive(value); - } else if (value instanceof ArrayWritable) { + checkInspectorCategory(inspector, ObjectInspector.Category.PRIMITIVE); + writePrimitive(value, (PrimitiveObjectInspector)inspector); + } else { GroupType groupType = type.asGroupType(); OriginalType originalType = type.getOriginalType(); if (originalType != null && originalType.equals(OriginalType.LIST)) { - writeArray((ArrayWritable)value, groupType); + checkInspectorCategory(inspector, ObjectInspector.Category.LIST); + writeArray(value, (ListObjectInspector)inspector, groupType); } else if (originalType != null && originalType.equals(OriginalType.MAP)) { - writeMap((ArrayWritable)value, groupType); + checkInspectorCategory(inspector, ObjectInspector.Category.MAP); + writeMap(value, (MapObjectInspector)inspector, groupType); } else { - writeGroup((ArrayWritable) value, groupType); + checkInspectorCategory(inspector, ObjectInspector.Category.STRUCT); + writeGroup(value, (StructObjectInspector)inspector, groupType); } - } else { - throw new RuntimeException("Field value is not an ArrayWritable object: " + type); + } + } + + /** + * Checks that an inspector matches the category indicated as a parameter. + * @param inspector The object inspector to check + * @param category The category to match + * @throws IllegalArgumentException if inspector does not match the category + */ + private void checkInspectorCategory(ObjectInspector inspector, ObjectInspector.Category category) { + if (!inspector.getCategory().equals(category)) { + throw new IllegalArgumentException("Invalid data type: expected " + category + + " type, but found: " + inspector.getCategory()); } } /** * It writes a group type and all its values to the Parquet RecordConsumer. * This is used only for optional and required groups. - * @param value ArrayWritable object that contains the group values - * @param type Type that contains information about the group schema + * @param value Object that contains the group values. + * @param inspector The object inspector used to get the correct value type. + * @param type Type that contains information about the group schema. */ - private void writeGroup(final ArrayWritable value, final GroupType type) { + private void writeGroup(final Object value, final StructObjectInspector inspector, final GroupType type) { recordConsumer.startGroup(); - writeGroupFields(value, type); + writeGroupFields(value, inspector, type); recordConsumer.endGroup(); } /** - * It writes a map type and its key-pair values to the Parquet RecordConsumer. - * This is called when the original type (MAP) is detected by writeValue() - * @param value The list of map values that contains the repeated KEY_PAIR_VALUE group type - * @param type Type that contains information about the group schema + * It writes a list type and its array elements to the Parquet RecordConsumer. + * This is called when the original type (LIST) is detected by writeValue()/ + * This function assumes the following schema: + * optional group arrayCol (LIST) { + * repeated group array { + * optional TYPE array_element; + * } + * } + * @param value The object that contains the array values. + * @param inspector The object inspector used to get the correct value type. + * @param type Type that contains information about the group (LIST) schema. */ - private void writeMap(final ArrayWritable value, final GroupType type) { + private void writeArray(final Object value, final ListObjectInspector inspector, final GroupType type) { + // Get the internal array structure GroupType repeatedType = type.getType(0).asGroupType(); - ArrayWritable repeatedValue = (ArrayWritable)value.get()[0]; recordConsumer.startGroup(); recordConsumer.startField(repeatedType.getName(), 0); - Writable[] map_values = repeatedValue.get(); - for (int record = 0; record < map_values.length; record++) { - Writable key_value_pair = map_values[record]; - if (key_value_pair != null) { - // Hive wraps a map key-pair into an ArrayWritable - if (key_value_pair instanceof ArrayWritable) { - writeGroup((ArrayWritable)key_value_pair, repeatedType); - } else { - throw new RuntimeException("Map key-value pair is not an ArrayWritable object on record " + record); - } - } else { - throw new RuntimeException("Map key-value pair is null on record " + record); + List<?> arrayValues = inspector.getList(value); + ObjectInspector elementInspector = inspector.getListElementObjectInspector(); + + Type elementType = repeatedType.getType(0); + String elementName = elementType.getName(); + + for (Object element : arrayValues) { + recordConsumer.startGroup(); + if (element != null) { + recordConsumer.startField(elementName, 0); + writeValue(element, elementInspector, elementType); + recordConsumer.endField(elementName, 0); } + recordConsumer.endGroup(); } recordConsumer.endField(repeatedType.getName(), 0); @@ -165,35 +186,53 @@ public class DataWritableWriter { } /** - * It writes a list type and its array elements to the Parquet RecordConsumer. - * This is called when the original type (LIST) is detected by writeValue() - * @param array The list of array values that contains the repeated array group type - * @param type Type that contains information about the group schema + * It writes a map type and its key-pair values to the Parquet RecordConsumer. + * This is called when the original type (MAP) is detected by writeValue(). + * This function assumes the following schema: + * optional group mapCol (MAP) { + * repeated group map (MAP_KEY_VALUE) { + * required TYPE key; + * optional TYPE value; + * } + * } + * @param value The object that contains the map key-values. + * @param inspector The object inspector used to get the correct value type. + * @param type Type that contains information about the group (MAP) schema. */ - private void writeArray(final ArrayWritable array, final GroupType type) { + private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) { + // Get the internal map structure (MAP_KEY_VALUE) GroupType repeatedType = type.getType(0).asGroupType(); - ArrayWritable repeatedValue = (ArrayWritable)array.get()[0]; recordConsumer.startGroup(); recordConsumer.startField(repeatedType.getName(), 0); - Writable[] array_values = repeatedValue.get(); - for (int record = 0; record < array_values.length; record++) { - recordConsumer.startGroup(); + Map<?, ?> mapValues = inspector.getMap(value); - // Null values must be wrapped into startGroup/endGroup - Writable element = array_values[record]; - if (element != null) { - for (int i = 0; i < type.getFieldCount(); i++) { - Type fieldType = repeatedType.getType(i); - String fieldName = fieldType.getName(); + Type keyType = repeatedType.getType(0); + String keyName = keyType.getName(); + ObjectInspector keyInspector = inspector.getMapKeyObjectInspector(); - recordConsumer.startField(fieldName, i); - writeValue(element, fieldType); - recordConsumer.endField(fieldName, i); + Type valuetype = repeatedType.getType(1); + String valueName = valuetype.getName(); + ObjectInspector valueInspector = inspector.getMapValueObjectInspector(); + + for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) { + recordConsumer.startGroup(); + if (keyValue != null) { + // write key element + Object keyElement = keyValue.getKey(); + recordConsumer.startField(keyName, 0); + writeValue(keyElement, keyInspector, keyType); + recordConsumer.endField(keyName, 0); + + // write value element + Object valueElement = keyValue.getValue(); + if (valueElement != null) { + recordConsumer.startField(valueName, 1); + writeValue(valueElement, valueInspector, valuetype); + recordConsumer.endField(valueName, 1); } } - recordConsumer.endGroup(); } @@ -203,36 +242,89 @@ public class DataWritableWriter { /** * It writes the primitive value to the Parquet RecordConsumer. - * @param value The writable object that contains the primitive value. + * @param value The object that contains the primitive value. + * @param inspector The object inspector used to get the correct value type. */ - private void writePrimitive(final Writable value) { + private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) { if (value == null) { return; } - if (value instanceof DoubleWritable) { - recordConsumer.addDouble(((DoubleWritable) value).get()); - } else if (value instanceof BooleanWritable) { - recordConsumer.addBoolean(((BooleanWritable) value).get()); - } else if (value instanceof FloatWritable) { - recordConsumer.addFloat(((FloatWritable) value).get()); - } else if (value instanceof IntWritable) { - recordConsumer.addInteger(((IntWritable) value).get()); - } else if (value instanceof LongWritable) { - recordConsumer.addLong(((LongWritable) value).get()); - } else if (value instanceof ShortWritable) { - recordConsumer.addInteger(((ShortWritable) value).get()); - } else if (value instanceof ByteWritable) { - recordConsumer.addInteger(((ByteWritable) value).get()); - } else if (value instanceof HiveDecimalWritable) { - throw new UnsupportedOperationException("HiveDecimalWritable writing not implemented"); - } else if (value instanceof BytesWritable) { - recordConsumer.addBinary((Binary.fromByteArray(((BytesWritable) value).getBytes()))); - } else if (value instanceof TimestampWritable) { - Timestamp ts = ((TimestampWritable) value).getTimestamp(); - NanoTime nt = NanoTimeUtils.getNanoTime(ts, false); - nt.writeValue(recordConsumer); - } else { - throw new IllegalArgumentException("Unknown value type: " + value + " " + value.getClass()); + + switch (inspector.getPrimitiveCategory()) { + case VOID: + return; + case DOUBLE: + recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value)); + break; + case BOOLEAN: + recordConsumer.addBoolean(((BooleanObjectInspector) inspector).get(value)); + break; + case FLOAT: + recordConsumer.addFloat(((FloatObjectInspector) inspector).get(value)); + break; + case BYTE: + recordConsumer.addInteger(((ByteObjectInspector) inspector).get(value)); + break; + case INT: + recordConsumer.addInteger(((IntObjectInspector) inspector).get(value)); + break; + case LONG: + recordConsumer.addLong(((LongObjectInspector) inspector).get(value)); + break; + case SHORT: + recordConsumer.addInteger(((ShortObjectInspector) inspector).get(value)); + break; + case STRING: + String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value); + recordConsumer.addBinary(Binary.fromString(v)); + break; + case CHAR: + String vChar = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(value).getStrippedValue(); + recordConsumer.addBinary(Binary.fromString(vChar)); + break; + case VARCHAR: + String vVarchar = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(value).getValue(); + recordConsumer.addBinary(Binary.fromString(vVarchar)); + break; + case BINARY: + byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value); + recordConsumer.addBinary(Binary.fromByteArray(vBinary)); + break; + case TIMESTAMP: + Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value); + recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary()); + break; + case DECIMAL: + HiveDecimal vDecimal = ((HiveDecimal)inspector.getPrimitiveJavaObject(value)); + DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo(); + recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo)); + break; + default: + throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory()); + } + } + + private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) { + int prec = decimalTypeInfo.precision(); + int scale = decimalTypeInfo.scale(); + byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray(); + + // Estimated number of bytes needed. + int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1]; + if (precToBytes == decimalBytes.length) { + // No padding needed. + return Binary.fromByteArray(decimalBytes); } + + byte[] tgt = new byte[precToBytes]; + if (hiveDecimal.signum() == -1) { + // For negative number, initializing bits to 1 + for (int i = 0; i < precToBytes; i++) { + tgt[i] |= 0xFF; + } + } + + System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones. + return Binary.fromByteArray(tgt); } } Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original) +++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Thu Feb 12 04:56:42 2015 @@ -20,7 +20,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; @@ -29,22 +28,23 @@ import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.util.Progressable; import parquet.hadoop.ParquetOutputFormat; import parquet.hadoop.metadata.CompressionCodecName; import parquet.hadoop.util.ContextUtil; -public class ParquetRecordWriterWrapper implements RecordWriter<Void, ArrayWritable>, +public class ParquetRecordWriterWrapper implements RecordWriter<Void, ParquetHiveRecord>, org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter { public static final Log LOG = LogFactory.getLog(ParquetRecordWriterWrapper.class); - private final org.apache.hadoop.mapreduce.RecordWriter<Void, ArrayWritable> realWriter; + private final org.apache.hadoop.mapreduce.RecordWriter<Void, ParquetHiveRecord> realWriter; private final TaskAttemptContext taskContext; public ParquetRecordWriterWrapper( - final OutputFormat<Void, ArrayWritable> realOutputFormat, + final OutputFormat<Void, ParquetHiveRecord> realOutputFormat, final JobConf jobConf, final String name, final Progressable progress, Properties tableProperties) throws @@ -106,7 +106,7 @@ public class ParquetRecordWriterWrapper } @Override - public void write(final Void key, final ArrayWritable value) throws IOException { + public void write(final Void key, final ParquetHiveRecord value) throws IOException { try { realWriter.write(key, value); } catch (final InterruptedException e) { @@ -121,7 +121,7 @@ public class ParquetRecordWriterWrapper @Override public void write(final Writable w) throws IOException { - write(null, (ArrayWritable) w); + write(null, (ParquetHiveRecord) w); } } Modified: hive/branches/parquet/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto (original) +++ hive/branches/parquet/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto Thu Feb 12 04:56:42 2015 @@ -16,7 +16,9 @@ * limitations under the License. */ -package org.apache.hadoop.hive.ql.io.orc; +package orc.proto; + +option java_package = "org.apache.hadoop.hive.ql.io.orc"; message IntegerStatistics { optional sint64 minimum = 1; @@ -108,7 +110,7 @@ message Stream { ROW_INDEX = 6; BLOOM_FILTER = 7; } - required Kind kind = 1; + optional Kind kind = 1; optional uint32 column = 2; optional uint64 length = 3; } @@ -120,7 +122,7 @@ message ColumnEncoding { DIRECT_V2 = 2; DICTIONARY_V2 = 3; } - required Kind kind = 1; + optional Kind kind = 1; optional uint32 dictionarySize = 2; } @@ -150,7 +152,7 @@ message Type { VARCHAR = 16; CHAR = 17; } - required Kind kind = 1; + optional Kind kind = 1; repeated uint32 subtypes = 2 [packed=true]; repeated string fieldNames = 3; optional uint32 maximumLength = 4; @@ -167,8 +169,8 @@ message StripeInformation { } message UserMetadataItem { - required string name = 1; - required bytes value = 2; + optional string name = 1; + optional bytes value = 2; } message StripeStatistics { Modified: hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java (original) +++ hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java Thu Feb 12 04:56:42 2015 @@ -13,9 +13,27 @@ */ package org.apache.hadoop.hive.ql.io.parquet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.io.*; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; @@ -27,6 +45,10 @@ import parquet.schema.MessageType; import parquet.schema.MessageTypeParser; import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -62,6 +84,10 @@ public class TestDataWritableWriter { inOrder.verify(mockRecordConsumer).addInteger(value); } + private void addLong(int value) { + inOrder.verify(mockRecordConsumer).addLong(value); + } + private void addFloat(float value) { inOrder.verify(mockRecordConsumer).addFloat(value); } @@ -88,6 +114,12 @@ public class TestDataWritableWriter { private Writable createNull() { return null; } + private ByteWritable createTinyInt(byte value) { return new ByteWritable(value); } + + private ShortWritable createSmallInt(short value) { return new ShortWritable(value); } + + private LongWritable createBigInt(long value) { return new LongWritable(value); } + private IntWritable createInt(int value) { return new IntWritable(value); } @@ -116,20 +148,68 @@ public class TestDataWritableWriter { return new ArrayWritable(Writable.class, createGroup(values).get()); } - private void writeParquetRecord(String schemaStr, ArrayWritable record) { - MessageType schema = MessageTypeParser.parseMessageType(schemaStr); - DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, schema); + private List<String> createHiveColumnsFrom(final String columnNamesStr) { + List<String> columnNames; + if (columnNamesStr.length() == 0) { + columnNames = new ArrayList<String>(); + } else { + columnNames = Arrays.asList(columnNamesStr.split(",")); + } + + return columnNames; + } + + private List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) { + List<TypeInfo> columnTypes; + + if (columnsTypeStr.length() == 0) { + columnTypes = new ArrayList<TypeInfo>(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr); + } + + return columnTypes; + } + + private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) { + List<TypeInfo> columnTypeList = createHiveTypeInfoFrom(columnTypes); + List<String> columnNameList = createHiveColumnsFrom(columnNames); + StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); + + return new ArrayWritableObjectInspector(rowTypeInfo); + } + + private ParquetHiveRecord getParquetWritable(String columnNames, String columnTypes, ArrayWritable record) throws SerDeException { + Properties recordProperties = new Properties(); + recordProperties.setProperty("columns", columnNames); + recordProperties.setProperty("columns.types", columnTypes); + + ParquetHiveSerDe serDe = new ParquetHiveSerDe(); + SerDeUtils.initializeSerDe(serDe, new Configuration(), recordProperties, null); + + return new ParquetHiveRecord(serDe.deserialize(record), getObjectInspector(columnNames, columnTypes)); + } + + private void writeParquetRecord(String schema, ParquetHiveRecord record) throws SerDeException { + MessageType fileSchema = MessageTypeParser.parseMessageType(schema); + DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, fileSchema); hiveParquetWriter.write(record); } @Test public void testSimpleType() throws Exception { - String schemaStr = "message hive_schema {\n" + String columnNames = "int,double,boolean,float,string,tinyint,smallint,bigint"; + String columnTypes = "int,double,boolean,float,string,tinyint,smallint,bigint"; + + String fileSchema = "message hive_schema {\n" + " optional int32 int;\n" + " optional double double;\n" + " optional boolean boolean;\n" + " optional float float;\n" - + " optional binary string;\n" + + " optional binary string (UTF8);\n" + + " optional int32 tinyint;\n" + + " optional int32 smallint;\n" + + " optional int64 bigint;\n" + "}\n"; ArrayWritable hiveRecord = createGroup( @@ -137,11 +217,14 @@ public class TestDataWritableWriter { createDouble(1.0), createBoolean(true), createFloat(1.0f), - createString("one") + createString("one"), + createTinyInt((byte)1), + createSmallInt((short)1), + createBigInt((long)1) ); // Write record to Parquet format - writeParquetRecord(schemaStr, hiveRecord); + writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord)); // Verify record was written correctly to Parquet startMessage(); @@ -160,12 +243,24 @@ public class TestDataWritableWriter { startField("string", 4); addString("one"); endField("string", 4); + startField("tinyint", 5); + addInteger(1); + endField("tinyint", 5); + startField("smallint", 6); + addInteger(1); + endField("smallint", 6); + startField("bigint", 7); + addLong(1); + endField("bigint", 7); endMessage(); } @Test public void testStructType() throws Exception { - String schemaStr = "message hive_schema {\n" + String columnNames = "structCol"; + String columnTypes = "struct<a:int,b:double,c:boolean>"; + + String fileSchema = "message hive_schema {\n" + " optional group structCol {\n" + " optional int32 a;\n" + " optional double b;\n" @@ -182,7 +277,7 @@ public class TestDataWritableWriter { ); // Write record to Parquet format - writeParquetRecord(schemaStr, hiveRecord); + writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord)); // Verify record was written correctly to Parquet startMessage(); @@ -204,9 +299,12 @@ public class TestDataWritableWriter { @Test public void testArrayType() throws Exception { - String schemaStr = "message hive_schema {\n" + String columnNames = "arrayCol"; + String columnTypes = "array<int>"; + + String fileSchema = "message hive_schema {\n" + " optional group arrayCol (LIST) {\n" - + " repeated group bag {\n" + + " repeated group array {\n" + " optional int32 array_element;\n" + " }\n" + " }\n" @@ -223,13 +321,13 @@ public class TestDataWritableWriter { ); // Write record to Parquet format - writeParquetRecord(schemaStr, hiveRecord); + writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord)); // Verify record was written correctly to Parquet startMessage(); startField("arrayCol", 0); startGroup(); - startField("bag", 0); + startField("array", 0); startGroup(); startField("array_element", 0); addInteger(1); @@ -242,7 +340,7 @@ public class TestDataWritableWriter { addInteger(2); endField("array_element", 0); endGroup(); - endField("bag", 0); + endField("array", 0); endGroup(); endField("arrayCol", 0); endMessage(); @@ -250,7 +348,10 @@ public class TestDataWritableWriter { @Test public void testMapType() throws Exception { - String schemaStr = "message hive_schema {\n" + String columnNames = "mapCol"; + String columnTypes = "map<string,int>"; + + String fileSchema = "message hive_schema {\n" + " optional group mapCol (MAP) {\n" + " repeated group map (MAP_KEY_VALUE) {\n" + " required binary key;\n" @@ -279,7 +380,7 @@ public class TestDataWritableWriter { ); // Write record to Parquet format - writeParquetRecord(schemaStr, hiveRecord); + writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord)); // Verify record was written correctly to Parquet startMessage(); @@ -315,12 +416,15 @@ public class TestDataWritableWriter { @Test public void testArrayOfArrays() throws Exception { - String schemaStr = "message hive_schema {\n" + String columnNames = "array_of_arrays"; + String columnTypes = "array<array<int>>"; + + String fileSchema = "message hive_schema {\n" + " optional group array_of_arrays (LIST) {\n" + " repeated group array {\n" - + " required group element (LIST) {\n" + + " optional group array_element (LIST) {\n" + " repeated group array {\n" - + " required int32 element;\n" + + " optional int32 array_element;\n" + " }\n" + " }\n" + " }\n" @@ -341,7 +445,7 @@ public class TestDataWritableWriter { ); // Write record to Parquet format - writeParquetRecord(schemaStr, hiveRecord); + writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord)); // Verify record was written correctly to Parquet startMessage(); @@ -349,22 +453,22 @@ public class TestDataWritableWriter { startGroup(); startField("array", 0); startGroup(); - startField("element", 0); + startField("array_element", 0); startGroup(); startField("array", 0); startGroup(); - startField("element", 0); + startField("array_element", 0); addInteger(1); - endField("element", 0); + endField("array_element", 0); endGroup(); startGroup(); - startField("element", 0); + startField("array_element", 0); addInteger(2); - endField("element", 0); + endField("array_element", 0); endGroup(); endField("array", 0); endGroup(); - endField("element", 0); + endField("array_element", 0); endGroup(); endField("array", 0); endGroup(); @@ -373,124 +477,63 @@ public class TestDataWritableWriter { } @Test - public void testGroupFieldIsNotArrayWritable() throws Exception { - String schemaStr = "message hive_schema {\n" - + " optional group a {\n" - + " optional int32 b;\n" - + " }\n" - + "}\n"; + public void testExpectedStructTypeOnRecord() throws Exception { + String columnNames = "structCol"; + String columnTypes = "int"; ArrayWritable hiveRecord = createGroup( - createInt(1) + createInt(1) ); + String fileSchema = "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 int;\n" + + " }\n" + + "}\n"; + try { - // Write record to Parquet format - writeParquetRecord(schemaStr, hiveRecord); + writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord)); fail(); } catch (RuntimeException e) { - assertEquals("Parquet record is malformed: Field value is not an ArrayWritable object: " + - "optional group a {\n optional int32 b;\n}", e.getMessage()); + assertEquals("Parquet record is malformed: Invalid data type: expected STRUCT type, but found: PRIMITIVE", e.getMessage()); } } @Test - public void testArrayGroupElementIsNotArrayWritable() throws Exception { - String schemaStr = "message hive_schema {\n" - + " optional group array_of_arrays (LIST) {\n" - + " repeated group array {\n" - + " required group element (LIST) {\n" - + " required int32 element;\n" - + " }\n" - + " }\n" - + " }\n" - + "}\n"; + public void testExpectedArrayTypeOnRecord() throws Exception { + String columnNames = "arrayCol"; + String columnTypes = "int"; ArrayWritable hiveRecord = createGroup( - createGroup( - createArray( - createInt(1) - ) - ) + createInt(1) ); - try { - // Write record to Parquet format - writeParquetRecord(schemaStr, hiveRecord); - fail(); - } catch (RuntimeException e) { - assertEquals("Parquet record is malformed: Field value is not an ArrayWritable object: " + - "required group element (LIST) {\n required int32 element;\n}", e.getMessage()); - } - } - - @Test - public void testMapElementIsNotArrayWritable() throws Exception { - String schemaStr = "message hive_schema {\n" - + " optional group mapCol (MAP) {\n" - + " repeated group map (MAP_KEY_VALUE) {\n" - + " required binary key;\n" - + " optional group value {\n" - + " required int32 value;" - + " }\n" + String fileSchema = "message hive_schema {\n" + + " optional group arrayCol (LIST) {\n" + + " repeated group bag {\n" + + " optional int32 array_element;\n" + " }\n" + " }\n" + "}\n"; - ArrayWritable hiveRecord = createGroup( - createGroup( - createArray( - createGroup( - createString("key1"), - createInt(1) - ) - ) - ) - ); - try { - // Write record to Parquet format - writeParquetRecord(schemaStr, hiveRecord); + writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord)); fail(); } catch (RuntimeException e) { - assertEquals( - "Parquet record is malformed: Field value is not an ArrayWritable object: " + - "optional group value {\n required int32 value;\n}", e.getMessage()); + assertEquals("Parquet record is malformed: Invalid data type: expected LIST type, but found: PRIMITIVE", e.getMessage()); } } @Test - public void testMapKeyValueIsNotArrayWritable() throws Exception { - String schemaStr = "message hive_schema {\n" - + " optional group mapCol (MAP) {\n" - + " repeated group map (MAP_KEY_VALUE) {\n" - + " required binary key;\n" - + " optional int32 value;\n" - + " }\n" - + " }\n" - + "}\n"; + public void testExpectedMapTypeOnRecord() throws Exception { + String columnNames = "mapCol"; + String columnTypes = "int"; ArrayWritable hiveRecord = createGroup( - createGroup( - createArray( - createString("key1"), - createInt(1) - ) - ) + createInt(1) ); - try { - // Write record to Parquet format - writeParquetRecord(schemaStr, hiveRecord); - fail(); - } catch (RuntimeException e) { - assertEquals("Parquet record is malformed: Map key-value pair is not an ArrayWritable object on record 0", e.getMessage()); - } - } - - @Test - public void testMapKeyValueIsNull() throws Exception { - String schemaStr = "message hive_schema {\n" + String fileSchema = "message hive_schema {\n" + " optional group mapCol (MAP) {\n" + " repeated group map (MAP_KEY_VALUE) {\n" + " required binary key;\n" @@ -499,20 +542,11 @@ public class TestDataWritableWriter { + " }\n" + "}\n"; - ArrayWritable hiveRecord = createGroup( - createGroup( - createArray( - createNull() - ) - ) - ); - try { - // Write record to Parquet format - writeParquetRecord(schemaStr, hiveRecord); + writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord)); fail(); } catch (RuntimeException e) { - assertEquals("Parquet record is malformed: Map key-value pair is null on record 0", e.getMessage()); + assertEquals("Parquet record is malformed: Invalid data type: expected MAP type, but found: PRIMITIVE", e.getMessage()); } } } Modified: hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java (original) +++ hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java Thu Feb 12 04:56:42 2015 @@ -24,7 +24,7 @@ import java.util.Properties; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport; import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper; -import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.Progressable; import org.junit.Test; @@ -41,7 +41,7 @@ public class TestMapredParquetOutputForm @SuppressWarnings("unchecked") @Test public void testConstructorWithFormat() { - new MapredParquetOutputFormat((ParquetOutputFormat<ArrayWritable>) mock(ParquetOutputFormat.class)); + new MapredParquetOutputFormat((ParquetOutputFormat<ParquetHiveRecord>) mock(ParquetOutputFormat.class)); } @Test @@ -62,7 +62,7 @@ public class TestMapredParquetOutputForm tableProps.setProperty("columns.types", "int:int"); final Progressable mockProgress = mock(Progressable.class); - final ParquetOutputFormat<ArrayWritable> outputFormat = (ParquetOutputFormat<ArrayWritable>) mock(ParquetOutputFormat.class); + final ParquetOutputFormat<ParquetHiveRecord> outputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mock(ParquetOutputFormat.class); JobConf jobConf = new JobConf(); @@ -70,7 +70,7 @@ public class TestMapredParquetOutputForm new MapredParquetOutputFormat(outputFormat) { @Override protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper( - ParquetOutputFormat<ArrayWritable> realOutputFormat, + ParquetOutputFormat<ParquetHiveRecord> realOutputFormat, JobConf jobConf, String finalOutPath, Progressable progress, Modified: hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java (original) +++ hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java Thu Feb 12 04:56:42 2015 @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.serde2.Ser import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.ArrayWritable; @@ -96,9 +97,9 @@ public class TestParquetSerDe extends Te assertEquals("deserialization gives the wrong object", t, row); // Serialize - final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi); - assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(), serializedArr.get().length); - assertTrue("serialized object should be equal to starting object", arrayWritableEquals(t, serializedArr)); + final ParquetHiveRecord serializedArr = (ParquetHiveRecord) serDe.serialize(row, oi); + assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(), ((ArrayWritable)serializedArr.getObject()).get().length); + assertTrue("serialized object should be equal to starting object", arrayWritableEquals(t, (ArrayWritable)serializedArr.getObject())); } private Properties createProperties() { Modified: hive/branches/parquet/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original) +++ hive/branches/parquet/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Thu Feb 12 04:56:42 2015 @@ -20,7 +20,7 @@ package org.apache.hive.service.cli.thri import java.util.Arrays; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -63,9 +63,10 @@ public class ThriftHttpCLIService extend httpServer = new org.eclipse.jetty.server.Server(); // Server thread pool + // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests String threadPoolName = "HiveServer2-HttpHandler-Pool"; ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, - workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), + workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryWithGarbageCleanup(threadPoolName)); ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService); httpServer.setThreadPool(threadPool); Modified: hive/branches/parquet/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java URL: http://svn.apache.org/viewvc/hive/branches/parquet/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1659148&r1=1659147&r2=1659148&view=diff ============================================================================== --- hive/branches/parquet/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original) +++ hive/branches/parquet/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Feb 12 04:56:42 2015 @@ -39,6 +39,7 @@ import org.apache.commons.lang.StringUti import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider.Options; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.DefaultFileAccess; @@ -495,8 +496,10 @@ public class Hadoop23Shims extends Hadoo // Need to set the client's KeyProvider to the NN's for JKS, // else the updates do not get flushed properly - miniDFSCluster.getFileSystem().getClient().setKeyProvider( - miniDFSCluster.getNameNode().getNamesystem().getProvider()); + KeyProviderCryptoExtension keyProvider = miniDFSCluster.getNameNode().getNamesystem().getProvider(); + if (keyProvider != null) { + miniDFSCluster.getFileSystem().getClient().setKeyProvider(keyProvider); + } cluster = new MiniDFSShim(miniDFSCluster); return cluster;
