Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Feb 12 04:56:42 2015
@@ -341,7 +341,7 @@ public class FileSinkOperator extends Te
       statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
       statsFromRecordWriter = new boolean[numFiles];
       serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
-      serializer.initialize(null, conf.getTableInfo().getProperties());
+      serializer.initialize(hconf, conf.getTableInfo().getProperties());
       outputClass = serializer.getSerializedClass();
 
       if (isLogInfoEnabled) {

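Note: the FileSinkOperator hunk above now hands the live Hadoop Configuration (hconf) to Serializer.initialize() instead of null, so the table's SerDe can consult job-level settings while it sets itself up. A condensed, hedged sketch of the write path this touches; tableInfo, hconf, row, rowInspector and recordWriter are illustrative stand-ins for the operator's real fields:

    // For Parquet tables the serializer below is ParquetHiveSerDe, and the
    // Writable it hands back is a ParquetHiveRecord (see the serde changes
    // later in this commit).
    Serializer serializer = (Serializer) tableInfo.getDeserializerClass().newInstance();
    serializer.initialize(hconf, tableInfo.getProperties());   // was: initialize(null, ...)
    Writable serialized = serializer.serialize(row, rowInspector);
    recordWriter.write(serialized);   // ParquetRecordWriterWrapper casts this to ParquetHiveRecord
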
Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Thu Feb 12 04:56:42 2015
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.t
 import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
 import static org.fusesource.jansi.Ansi.ansi;
 import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
 import static org.fusesource.jansi.internal.CLibrary.isatty;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -167,6 +168,9 @@ public class TezJobMonitor {
       if (isatty(STDOUT_FILENO) == 0) {
         return false;
       }
+      if (isatty(STDERR_FILENO) == 0) {
+        return false;
+      }
     } catch (NoClassDefFoundError ignore) {
       // These errors happen if the JNI lib is not available for your platform.
       return false;

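Note: the TezJobMonitor change tightens the guard around in-place (ANSI) progress updates: both stdout and stderr must now be terminals, so that redirecting either stream (for example piping query output to a file) falls back to plain scrolling logs. Roughly, the guard behaves like the sketch below; the method name is illustrative, not the monitor's actual API:

    // True only when both standard streams are attached to a TTY; jansi's isatty()
    // comes from a JNI library, so its absence also disables in-place rendering.
    private static boolean bothStreamsAreTerminals() {
      try {
        return isatty(STDOUT_FILENO) != 0 && isatty(STDERR_FILENO) != 0;
      } catch (NoClassDefFoundError ignore) {
        // The JNI lib is not available for this platform.
        return false;
      }
    }
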
Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java (original)
+++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java Thu Feb 12 04:56:42 2015
@@ -21,7 +21,6 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -29,10 +28,10 @@ import org.apache.hadoop.hive.ql.io.IOCo
 import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -41,27 +40,25 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.util.ContextUtil;
 
 /**
  *
  * A Parquet OutputFormat for Hive (with the deprecated package mapred)
  *
  */
-public class MapredParquetOutputFormat extends FileOutputFormat<Void, ArrayWritable> implements
-  HiveOutputFormat<Void, ArrayWritable> {
+public class MapredParquetOutputFormat extends FileOutputFormat<Void, ParquetHiveRecord> implements
+  HiveOutputFormat<Void, ParquetHiveRecord> {
 
   private static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class);
 
-  protected ParquetOutputFormat<ArrayWritable> realOutputFormat;
+  protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;
 
   public MapredParquetOutputFormat() {
-    realOutputFormat = new ParquetOutputFormat<ArrayWritable>(new DataWritableWriteSupport());
+    realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
   }
 
-  public MapredParquetOutputFormat(final OutputFormat<Void, ArrayWritable> mapreduceOutputFormat) {
-    realOutputFormat = (ParquetOutputFormat<ArrayWritable>) mapreduceOutputFormat;
+  public MapredParquetOutputFormat(final OutputFormat<Void, ParquetHiveRecord> mapreduceOutputFormat) {
+    realOutputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mapreduceOutputFormat;
   }
 
   @Override
@@ -70,7 +67,7 @@ public class MapredParquetOutputFormat e
   }
 
   @Override
-  public RecordWriter<Void, ArrayWritable> getRecordWriter(
+  public RecordWriter<Void, ParquetHiveRecord> getRecordWriter(
       final FileSystem ignored,
       final JobConf job,
       final String name,
@@ -119,7 +116,7 @@ public class MapredParquetOutputFormat e
   }
 
   protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
-      ParquetOutputFormat<ArrayWritable> realOutputFormat,
+      ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
       JobConf jobConf,
       String finalOutPath,
       Progressable progress,

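Note: from this file onward the Parquet write path is typed on ParquetHiveRecord instead of ArrayWritable. The class itself is not part of this diff (it lives under org.apache.hadoop.hive.serde2.io), but the way the patch uses it (public value/inspector fields, getObject()/getObjectInspector() accessors, a no-arg and a two-arg constructor, and being handed around where a Writable is expected) implies roughly the following thin wrapper. Treat this as an inferred sketch, not the committed source:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.io.Writable;

    // Inferred sketch of ParquetHiveRecord, reconstructed from how this patch
    // populates and reads it.
    public class ParquetHiveRecord implements Writable {
      public Object value;                     // the row object produced by Hive
      public StructObjectInspector inspector;  // how to walk that object

      public ParquetHiveRecord() {
        this(null, null);
      }

      public ParquetHiveRecord(Object value, StructObjectInspector inspector) {
        this.value = value;
        this.inspector = inspector;
      }

      public Object getObject() { return value; }

      public StructObjectInspector getObjectInspector() { return inspector; }

      // Writable is (presumably) implemented only to satisfy Hive's RecordWriter
      // contract; the record is consumed in memory, not serialized through these.
      public void write(DataOutput out) throws IOException {
        throw new UnsupportedOperationException("not meant to be serialized");
      }

      public void readFields(DataInput in) throws IOException {
        throw new UnsupportedOperationException("not meant to be deserialized");
      }
    }
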
Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Thu Feb 12 04:56:42 2015
@@ -13,61 +13,31 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.serde;
 
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeSpec;
 import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.ParquetWriter;
-import parquet.io.api.Binary;
 
 /**
  *
@@ -110,6 +80,13 @@ public class ParquetHiveSerDe extends Ab
   private long deserializedSize;
   private String compressionType;
 
+  private ParquetHiveRecord parquetRow;
+
+  public ParquetHiveSerDe() {
+    parquetRow = new ParquetHiveRecord();
+    stats = new SerDeStats();
+  }
+
   @Override
   public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
 
@@ -144,7 +121,6 @@ public class ParquetHiveSerDe extends Ab
     this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
 
     // Stats part
-    stats = new SerDeStats();
     serializedSize = 0;
     deserializedSize = 0;
     status = LAST_OPERATION.UNKNOWN;
@@ -169,7 +145,7 @@ public class ParquetHiveSerDe extends Ab
 
   @Override
   public Class<? extends Writable> getSerializedClass() {
-    return ArrayWritable.class;
+    return ParquetHiveRecord.class;
   }
 
   @Override
@@ -178,154 +154,11 @@ public class ParquetHiveSerDe extends Ab
     if (!objInspector.getCategory().equals(Category.STRUCT)) {
       throw new SerDeException("Cannot serialize " + objInspector.getCategory() + ". Can only serialize a struct");
     }
-    final ArrayWritable serializeData = createStruct(obj, (StructObjectInspector) objInspector);
-    serializedSize = serializeData.get().length;
+    serializedSize = ((StructObjectInspector)objInspector).getAllStructFieldRefs().size();
     status = LAST_OPERATION.SERIALIZE;
-    return serializeData;
-  }
-
-  private ArrayWritable createStruct(final Object obj, final StructObjectInspector inspector)
-      throws SerDeException {
-    final List<? extends StructField> fields = inspector.getAllStructFieldRefs();
-    final Writable[] arr = new Writable[fields.size()];
-    for (int i = 0; i < fields.size(); i++) {
-      final StructField field = fields.get(i);
-      final Object subObj = inspector.getStructFieldData(obj, field);
-      final ObjectInspector subInspector = field.getFieldObjectInspector();
-      arr[i] = createObject(subObj, subInspector);
-    }
-    return new ArrayWritable(Writable.class, arr);
-  }
-
-  private Writable createMap(final Object obj, final MapObjectInspector inspector)
-      throws SerDeException {
-    final Map<?, ?> sourceMap = inspector.getMap(obj);
-    final ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
-    final ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
-    final List<ArrayWritable> array = new ArrayList<ArrayWritable>();
-
-    if (sourceMap != null) {
-      for (final Entry<?, ?> keyValue : sourceMap.entrySet()) {
-        final Writable key = createObject(keyValue.getKey(), keyInspector);
-        final Writable value = createObject(keyValue.getValue(), valueInspector);
-        if (key != null) {
-          Writable[] arr = new Writable[2];
-          arr[0] = key;
-          arr[1] = value;
-          array.add(new ArrayWritable(Writable.class, arr));
-        }
-      }
-    }
-    if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(ArrayWritable.class,
-          array.toArray(new ArrayWritable[array.size()]));
-      return new ArrayWritable(Writable.class, new Writable[] {subArray});
-    } else {
-      return null;
-    }
-  }
-
-  private ArrayWritable createArray(final Object obj, final ListObjectInspector inspector)
-      throws SerDeException {
-    final List<?> sourceArray = inspector.getList(obj);
-    final ObjectInspector subInspector = inspector.getListElementObjectInspector();
-    final List<Writable> array = new ArrayList<Writable>();
-    if (sourceArray != null) {
-      for (final Object curObj : sourceArray) {
-        array.add(createObject(curObj, subInspector));
-      }
-    }
-    if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(Writable.class,
-          array.toArray(new Writable[array.size()]));
-      return new ArrayWritable(Writable.class, new Writable[] {subArray});
-    } else {
-      return null;
-    }
-  }
-
-  private Writable createPrimitive(final Object obj, final PrimitiveObjectInspector inspector)
-      throws SerDeException {
-    if (obj == null) {
-      return null;
-    }
-    switch (inspector.getPrimitiveCategory()) {
-    case VOID:
-      return null;
-    case BOOLEAN:
-      return new BooleanWritable(((BooleanObjectInspector) inspector).get(obj) ? Boolean.TRUE : Boolean.FALSE);
-    case BYTE:
-      return new ByteWritable(((ByteObjectInspector) inspector).get(obj));
-    case DOUBLE:
-      return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj));
-    case FLOAT:
-      return new FloatWritable(((FloatObjectInspector) inspector).get(obj));
-    case INT:
-      return new IntWritable(((IntObjectInspector) inspector).get(obj));
-    case LONG:
-      return new LongWritable(((LongObjectInspector) inspector).get(obj));
-    case SHORT:
-      return new ShortWritable(((ShortObjectInspector) inspector).get(obj));
-    case STRING:
-      String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(obj);
-      try {
-        return new BytesWritable(v.getBytes("UTF-8"));
-      } catch (UnsupportedEncodingException e) {
-        throw new SerDeException("Failed to encode string in UTF-8", e);
-      }
-    case DECIMAL:
-      HiveDecimal hd = (HiveDecimal)inspector.getPrimitiveJavaObject(obj);
-      DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) inspector.getTypeInfo();
-      int prec = decTypeInfo.precision();
-      int scale = decTypeInfo.scale();
-      byte[] src = hd.setScale(scale).unscaledValue().toByteArray();
-      // Estimated number of bytes needed.
-      int bytes =  PRECISION_TO_BYTE_COUNT[prec - 1];
-      if (bytes == src.length) {
-        // No padding needed.
-        return new BytesWritable(src);
-      }
-      byte[] tgt = new byte[bytes];
-      if ( hd.signum() == -1) {
-        // For negative number, initializing bits to 1
-        for (int i = 0; i < bytes; i++) {
-          tgt[i] |= 0xFF;
-        }
-      }
-      System.arraycopy(src, 0, tgt, bytes - src.length, src.length); // Padding leading zeroes/ones.
-      return new BytesWritable(tgt);
-    case TIMESTAMP:
-      return new TimestampWritable(((TimestampObjectInspector) inspector).getPrimitiveJavaObject(obj));
-    case CHAR:
-      String strippedValue = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(obj).getStrippedValue();
-      return new BytesWritable(Binary.fromString(strippedValue).getBytes());
-    case VARCHAR:
-      String value = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(obj).getValue();
-      return new BytesWritable(Binary.fromString(value).getBytes());
-    case BINARY:
-      return new BytesWritable(((BinaryObjectInspector) inspector).getPrimitiveJavaObject(obj));
-    default:
-      throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
-    }
-  }
-
-  private Writable createObject(final Object obj, final ObjectInspector inspector) throws SerDeException {
-    if (obj == null) {
-      return null;
-    }
-
-    switch (inspector.getCategory()) {
-    case STRUCT:
-      return createStruct(obj, (StructObjectInspector) inspector);
-    case LIST:
-      return createArray(obj, (ListObjectInspector) inspector);
-    case MAP:
-      return createMap(obj, (MapObjectInspector) inspector);
-    case PRIMITIVE:
-      return createPrimitive(obj, (PrimitiveObjectInspector) inspector);
-    default:
-      throw new SerDeException("Unknown data type" + inspector.getCategory());
-    }
+    parquetRow.value = obj;
+    parquetRow.inspector= (StructObjectInspector)objInspector;
+    return parquetRow;
   }
 
   @Override

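Note: with the conversion helpers removed, ParquetHiveSerDe.serialize() no longer deep-copies each row into a tree of ArrayWritable objects. It points a single, reused ParquetHiveRecord at the row object and its inspector (the field traversal now happens later, in DataWritableWriter), and serializedSize is reported as the struct's field count rather than the length of a materialized array. Because that one record instance is mutated on every call, a caller has to consume it before calling serialize() again; a hedged usage sketch, with serDe, rowObject, rowInspector and writer as illustrative names:

    ParquetHiveRecord record = (ParquetHiveRecord) serDe.serialize(rowObject, rowInspector);
    writer.write(null, record);   // write immediately; the next serialize() reuses the instance
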
Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java (original)
+++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java Thu Feb 12 04:56:42 2015
@@ -16,7 +16,7 @@ package org.apache.hadoop.hive.ql.io.par
 import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 
 import parquet.hadoop.api.WriteSupport;
 import parquet.io.api.RecordConsumer;
@@ -28,7 +28,7 @@ import parquet.schema.MessageTypeParser;
  * DataWritableWriteSupport is a WriteSupport for the DataWritableWriter
  *
  */
-public class DataWritableWriteSupport extends WriteSupport<ArrayWritable> {
+public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
 
   public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
 
@@ -55,7 +55,7 @@ public class DataWritableWriteSupport ex
   }
 
   @Override
-  public void write(final ArrayWritable record) {
+  public void write(final ParquetHiveRecord record) {
     writer.write(record);
   }
 }

Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java (original)
+++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java Thu Feb 12 04:56:42 2015
@@ -13,37 +13,29 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.write;
 
-import java.sql.Timestamp;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import parquet.io.api.Binary;
 import parquet.io.api.RecordConsumer;
 import parquet.schema.GroupType;
 import parquet.schema.OriginalType;
 import parquet.schema.Type;
 
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
 /**
  *
- * DataWritableWriter is a writer,
- * that will read an ArrayWritable and give the data to parquet
- * with the expected schema
- * This is a helper class used by DataWritableWriteSupport class.
+ * DataWritableWriter is a writer that reads a ParquetWritable object and send the data to the Parquet
+ * API with the expected schema. This class is only used through DataWritableWriteSupport class.
  */
 public class DataWritableWriter {
   private static final Log LOG = LogFactory.getLog(DataWritableWriter.class);
@@ -57,13 +49,13 @@ public class DataWritableWriter {
 
   /**
    * It writes all record values to the Parquet RecordConsumer.
-   * @param record Contains the record of values that are going to be written
+   * @param record Contains the record that are going to be written.
    */
-  public void write(final ArrayWritable record) {
+  public void write(final ParquetHiveRecord record) {
     if (record != null) {
       recordConsumer.startMessage();
       try {
-        writeGroupFields(record, schema);
+        writeGroupFields(record.getObject(), record.getObjectInspector(), schema);
       } catch (RuntimeException e) {
         String errorMessage = "Parquet record is malformed: " + e.getMessage();
         LOG.error(errorMessage, e);
@@ -76,19 +68,23 @@ public class DataWritableWriter {
   /**
    * It writes all the fields contained inside a group to the RecordConsumer.
    * @param value The list of values contained in the group.
+   * @param inspector The object inspector used to get the correct value type.
    * @param type Type that contains information about the group schema.
    */
-  public void writeGroupFields(final ArrayWritable value, final GroupType type) {
+  private void writeGroupFields(final Object value, final StructObjectInspector inspector, final GroupType type) {
     if (value != null) {
+      List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+      List<Object> fieldValuesList = inspector.getStructFieldsDataAsList(value);
+
       for (int i = 0; i < type.getFieldCount(); i++) {
         Type fieldType = type.getType(i);
         String fieldName = fieldType.getName();
-        Writable fieldValue = value.get()[i];
+        Object fieldValue = fieldValuesList.get(i);
 
-        // Parquet does not write null elements
         if (fieldValue != null) {
+          ObjectInspector fieldInspector = fields.get(i).getFieldObjectInspector();
           recordConsumer.startField(fieldName, i);
-          writeValue(fieldValue, fieldType);
+          writeValue(fieldValue, fieldInspector, fieldType);
           recordConsumer.endField(fieldName, i);
         }
       }
@@ -96,68 +92,93 @@ public class DataWritableWriter {
   }
 
   /**
-   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and writes
+   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and calls
    * the correct write function.
    * @param value The writable object that contains the value.
+   * @param inspector The object inspector used to get the correct value type.
    * @param type Type that contains information about the type schema.
    */
-  private void writeValue(final Writable value, final Type type) {
+  private void writeValue(final Object value, final ObjectInspector inspector, final Type type) {
     if (type.isPrimitive()) {
-      writePrimitive(value);
-    } else if (value instanceof ArrayWritable) {
+      checkInspectorCategory(inspector, ObjectInspector.Category.PRIMITIVE);
+      writePrimitive(value, (PrimitiveObjectInspector)inspector);
+    } else {
       GroupType groupType = type.asGroupType();
       OriginalType originalType = type.getOriginalType();
 
       if (originalType != null && originalType.equals(OriginalType.LIST)) {
-        writeArray((ArrayWritable)value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.LIST);
+        writeArray(value, (ListObjectInspector)inspector, groupType);
       } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
-        writeMap((ArrayWritable)value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.MAP);
+        writeMap(value, (MapObjectInspector)inspector, groupType);
       } else {
-        writeGroup((ArrayWritable) value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.STRUCT);
+        writeGroup(value, (StructObjectInspector)inspector, groupType);
       }
-    } else {
-      throw new RuntimeException("Field value is not an ArrayWritable object: " + type);
+    }
+  }
+
+  /**
+   * Checks that an inspector matches the category indicated as a parameter.
+   * @param inspector The object inspector to check
+   * @param category The category to match
+   * @throws IllegalArgumentException if inspector does not match the category
+   */
+  private void checkInspectorCategory(ObjectInspector inspector, ObjectInspector.Category category) {
+    if (!inspector.getCategory().equals(category)) {
+      throw new IllegalArgumentException("Invalid data type: expected " + category
+          + " type, but found: " + inspector.getCategory());
     }
   }
 
   /**
    * It writes a group type and all its values to the Parquet RecordConsumer.
    * This is used only for optional and required groups.
-   * @param value ArrayWritable object that contains the group values
-   * @param type Type that contains information about the group schema
+   * @param value Object that contains the group values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group schema.
    */
-  private void writeGroup(final ArrayWritable value, final GroupType type) {
+  private void writeGroup(final Object value, final StructObjectInspector inspector, final GroupType type) {
     recordConsumer.startGroup();
-    writeGroupFields(value, type);
+    writeGroupFields(value, inspector, type);
     recordConsumer.endGroup();
   }
 
   /**
-   * It writes a map type and its key-pair values to the Parquet RecordConsumer.
-   * This is called when the original type (MAP) is detected by writeValue()
-   * @param value The list of map values that contains the repeated KEY_PAIR_VALUE group type
-   * @param type Type that contains information about the group schema
+   * It writes a list type and its array elements to the Parquet RecordConsumer.
+   * This is called when the original type (LIST) is detected by writeValue()/
+   * This function assumes the following schema:
+   *    optional group arrayCol (LIST) {
+   *      repeated group array {
+   *        optional TYPE array_element;
+   *      }
+   *    }
+   * @param value The object that contains the array values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group (LIST) schema.
    */
-  private void writeMap(final ArrayWritable value, final GroupType type) {
+  private void writeArray(final Object value, final ListObjectInspector inspector, final GroupType type) {
+    // Get the internal array structure
     GroupType repeatedType = type.getType(0).asGroupType();
-    ArrayWritable repeatedValue = (ArrayWritable)value.get()[0];
 
     recordConsumer.startGroup();
     recordConsumer.startField(repeatedType.getName(), 0);
 
-    Writable[] map_values = repeatedValue.get();
-    for (int record = 0; record < map_values.length; record++) {
-      Writable key_value_pair = map_values[record];
-      if (key_value_pair != null) {
-        // Hive wraps a map key-pair into an ArrayWritable
-        if (key_value_pair instanceof ArrayWritable) {
-          writeGroup((ArrayWritable)key_value_pair, repeatedType);
-        } else {
-          throw new RuntimeException("Map key-value pair is not an ArrayWritable object on record " + record);
-        }
-      } else {
-        throw new RuntimeException("Map key-value pair is null on record " + record);
+    List<?> arrayValues = inspector.getList(value);
+    ObjectInspector elementInspector = inspector.getListElementObjectInspector();
+
+    Type elementType = repeatedType.getType(0);
+    String elementName = elementType.getName();
+
+    for (Object element : arrayValues) {
+      recordConsumer.startGroup();
+      if (element != null) {
+        recordConsumer.startField(elementName, 0);
+        writeValue(element, elementInspector, elementType);
+        recordConsumer.endField(elementName, 0);
       }
+      recordConsumer.endGroup();
     }
 
     recordConsumer.endField(repeatedType.getName(), 0);
@@ -165,35 +186,53 @@ public class DataWritableWriter {
   }
 
   /**
-   * It writes a list type and its array elements to the Parquet RecordConsumer.
-   * This is called when the original type (LIST) is detected by writeValue()
-   * @param array The list of array values that contains the repeated array group type
-   * @param type Type that contains information about the group schema
+   * It writes a map type and its key-pair values to the Parquet RecordConsumer.
+   * This is called when the original type (MAP) is detected by writeValue().
+   * This function assumes the following schema:
+   *    optional group mapCol (MAP) {
+   *      repeated group map (MAP_KEY_VALUE) {
+   *        required TYPE key;
+   *        optional TYPE value;
+   *      }
+   *    }
+   * @param value The object that contains the map key-values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group (MAP) schema.
    */
-  private void writeArray(final ArrayWritable array, final GroupType type) {
+  private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) {
+    // Get the internal map structure (MAP_KEY_VALUE)
     GroupType repeatedType = type.getType(0).asGroupType();
-    ArrayWritable repeatedValue = (ArrayWritable)array.get()[0];
 
     recordConsumer.startGroup();
     recordConsumer.startField(repeatedType.getName(), 0);
 
-    Writable[] array_values = repeatedValue.get();
-    for (int record = 0; record < array_values.length; record++) {
-      recordConsumer.startGroup();
+    Map<?, ?> mapValues = inspector.getMap(value);
 
-      // Null values must be wrapped into startGroup/endGroup
-      Writable element = array_values[record];
-      if (element != null) {
-        for (int i = 0; i < type.getFieldCount(); i++) {
-          Type fieldType = repeatedType.getType(i);
-          String fieldName = fieldType.getName();
+    Type keyType = repeatedType.getType(0);
+    String keyName = keyType.getName();
+    ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
 
-          recordConsumer.startField(fieldName, i);
-          writeValue(element, fieldType);
-          recordConsumer.endField(fieldName, i);
+    Type valuetype = repeatedType.getType(1);
+    String valueName = valuetype.getName();
+    ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
+
+    for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
+      recordConsumer.startGroup();
+      if (keyValue != null) {
+        // write key element
+        Object keyElement = keyValue.getKey();
+        recordConsumer.startField(keyName, 0);
+        writeValue(keyElement, keyInspector, keyType);
+        recordConsumer.endField(keyName, 0);
+
+        // write value element
+        Object valueElement = keyValue.getValue();
+        if (valueElement != null) {
+          recordConsumer.startField(valueName, 1);
+          writeValue(valueElement, valueInspector, valuetype);
+          recordConsumer.endField(valueName, 1);
         }
       }
-
       recordConsumer.endGroup();
     }
 
@@ -203,36 +242,89 @@ public class DataWritableWriter {
 
   /**
    * It writes the primitive value to the Parquet RecordConsumer.
-   * @param value The writable object that contains the primitive value.
+   * @param value The object that contains the primitive value.
+   * @param inspector The object inspector used to get the correct value type.
    */
-  private void writePrimitive(final Writable value) {
+  private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) {
     if (value == null) {
       return;
     }
-    if (value instanceof DoubleWritable) {
-      recordConsumer.addDouble(((DoubleWritable) value).get());
-    } else if (value instanceof BooleanWritable) {
-      recordConsumer.addBoolean(((BooleanWritable) value).get());
-    } else if (value instanceof FloatWritable) {
-      recordConsumer.addFloat(((FloatWritable) value).get());
-    } else if (value instanceof IntWritable) {
-      recordConsumer.addInteger(((IntWritable) value).get());
-    } else if (value instanceof LongWritable) {
-      recordConsumer.addLong(((LongWritable) value).get());
-    } else if (value instanceof ShortWritable) {
-      recordConsumer.addInteger(((ShortWritable) value).get());
-    } else if (value instanceof ByteWritable) {
-      recordConsumer.addInteger(((ByteWritable) value).get());
-    } else if (value instanceof HiveDecimalWritable) {
-      throw new UnsupportedOperationException("HiveDecimalWritable writing not implemented");
-    } else if (value instanceof BytesWritable) {
-      recordConsumer.addBinary((Binary.fromByteArray(((BytesWritable) value).getBytes())));
-    } else if (value instanceof TimestampWritable) {
-      Timestamp ts = ((TimestampWritable) value).getTimestamp();
-      NanoTime nt = NanoTimeUtils.getNanoTime(ts, false);
-      nt.writeValue(recordConsumer);
-    } else {
-      throw new IllegalArgumentException("Unknown value type: " + value + " " + value.getClass());
+
+    switch (inspector.getPrimitiveCategory()) {
+      case VOID:
+        return;
+      case DOUBLE:
+        recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value));
+        break;
+      case BOOLEAN:
+        recordConsumer.addBoolean(((BooleanObjectInspector) inspector).get(value));
+        break;
+      case FLOAT:
+        recordConsumer.addFloat(((FloatObjectInspector) inspector).get(value));
+        break;
+      case BYTE:
+        recordConsumer.addInteger(((ByteObjectInspector) inspector).get(value));
+        break;
+      case INT:
+        recordConsumer.addInteger(((IntObjectInspector) inspector).get(value));
+        break;
+      case LONG:
+        recordConsumer.addLong(((LongObjectInspector) inspector).get(value));
+        break;
+      case SHORT:
+        recordConsumer.addInteger(((ShortObjectInspector) inspector).get(value));
+        break;
+      case STRING:
+        String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(Binary.fromString(v));
+        break;
+      case CHAR:
+        String vChar = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(value).getStrippedValue();
+        recordConsumer.addBinary(Binary.fromString(vChar));
+        break;
+      case VARCHAR:
+        String vVarchar = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(value).getValue();
+        recordConsumer.addBinary(Binary.fromString(vVarchar));
+        break;
+      case BINARY:
+        byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(Binary.fromByteArray(vBinary));
+        break;
+      case TIMESTAMP:
+        Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
+        break;
+      case DECIMAL:
+        HiveDecimal vDecimal = ((HiveDecimal)inspector.getPrimitiveJavaObject(value));
+        DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo();
+        recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo));
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory());
+    }
+  }
+
+  private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) {
+    int prec = decimalTypeInfo.precision();
+    int scale = decimalTypeInfo.scale();
+    byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray();
+
+    // Estimated number of bytes needed.
+    int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
+    if (precToBytes == decimalBytes.length) {
+      // No padding needed.
+      return Binary.fromByteArray(decimalBytes);
     }
+
+    byte[] tgt = new byte[precToBytes];
+      if (hiveDecimal.signum() == -1) {
+      // For negative number, initializing bits to 1
+      for (int i = 0; i < precToBytes; i++) {
+        tgt[i] |= 0xFF;
+      }
+    }
+
+    System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
+    return Binary.fromByteArray(tgt);
   }
 }

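Note: decimalToBinary() pads the decimal's unscaled two's-complement bytes out to the fixed width that ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT prescribes for the column's precision, filling with 0xFF for negative values and 0x00 otherwise so the sign is preserved. A standalone sketch of the same padding rule plus a worked example; the 3-byte width assumed here for decimal(5,2) is illustrative:

    // Pads a two's-complement unscaled value to a fixed byte width, sign-extending
    // negative numbers with 0xFF bytes and positive ones with 0x00 bytes.
    static byte[] padToWidth(java.math.BigInteger unscaled, int width) {
      byte[] src = unscaled.toByteArray();
      if (src.length == width) {
        return src;                               // already the right size
      }
      byte[] tgt = new byte[width];
      if (unscaled.signum() == -1) {
        java.util.Arrays.fill(tgt, (byte) 0xFF);  // initialize bits to 1 for negatives
      }
      System.arraycopy(src, 0, tgt, width - src.length, src.length);
      return tgt;
    }
    // padToWidth(BigInteger.valueOf(100), 3)  -> {0x00, 0x00, 0x64}   (1.00 at scale 2)
    // padToWidth(BigInteger.valueOf(-100), 3) -> {0xFF, 0xFF, 0x9C}   (-1.00 at scale 2)
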
Modified: hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/branches/parquet/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Thu Feb 12 04:56:42 2015
@@ -20,7 +20,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -29,22 +28,23 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.util.ContextUtil;
 
-public class ParquetRecordWriterWrapper implements RecordWriter<Void, ArrayWritable>,
+public class ParquetRecordWriterWrapper implements RecordWriter<Void, ParquetHiveRecord>,
   org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
 
   public static final Log LOG = LogFactory.getLog(ParquetRecordWriterWrapper.class);
 
-  private final org.apache.hadoop.mapreduce.RecordWriter<Void, ArrayWritable> realWriter;
+  private final org.apache.hadoop.mapreduce.RecordWriter<Void, ParquetHiveRecord> realWriter;
   private final TaskAttemptContext taskContext;
 
   public ParquetRecordWriterWrapper(
-      final OutputFormat<Void, ArrayWritable> realOutputFormat,
+      final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
       final JobConf jobConf,
       final String name,
       final Progressable progress, Properties tableProperties) throws
@@ -106,7 +106,7 @@ public class ParquetRecordWriterWrapper
   }
 
   @Override
-  public void write(final Void key, final ArrayWritable value) throws IOException {
+  public void write(final Void key, final ParquetHiveRecord value) throws IOException {
     try {
       realWriter.write(key, value);
     } catch (final InterruptedException e) {
@@ -121,7 +121,7 @@ public class ParquetRecordWriterWrapper
 
   @Override
   public void write(final Writable w) throws IOException {
-    write(null, (ArrayWritable) w);
+    write(null, (ParquetHiveRecord) w);
   }
 
 }

Modified: hive/branches/parquet/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- hive/branches/parquet/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto (original)
+++ hive/branches/parquet/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto Thu Feb 12 04:56:42 2015
@@ -16,7 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hive.ql.io.orc;
+package orc.proto;
+
+option java_package = "org.apache.hadoop.hive.ql.io.orc";
 
 message IntegerStatistics  {
   optional sint64 minimum = 1;
@@ -108,7 +110,7 @@ message Stream {
     ROW_INDEX = 6;
     BLOOM_FILTER = 7;
   }
-  required Kind kind = 1;
+  optional Kind kind = 1;
   optional uint32 column = 2;
   optional uint64 length = 3;
 }
@@ -120,7 +122,7 @@ message ColumnEncoding {
     DIRECT_V2 = 2;
     DICTIONARY_V2 = 3;
   }
-  required Kind kind = 1;
+  optional Kind kind = 1;
   optional uint32 dictionarySize = 2;
 }
 
@@ -150,7 +152,7 @@ message Type {
     VARCHAR = 16;
     CHAR = 17;
   }
-  required Kind kind = 1;
+  optional Kind kind = 1;
   repeated uint32 subtypes = 2 [packed=true];
   repeated string fieldNames = 3;
   optional uint32 maximumLength = 4;
@@ -167,8 +169,8 @@ message StripeInformation {
 }
 
 message UserMetadataItem {
-  required string name = 1;
-  required bytes value = 2;
+  optional string name = 1;
+  optional bytes value = 2;
 }
 
 message StripeStatistics {

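Note: in the ORC protobuf definition the wire package (orc.proto) is now separated from the generated Java package, and several previously required fields (Stream.kind, ColumnEncoding.kind, Type.kind, and both UserMetadataItem fields) become optional, which is the usual way to keep readers and writers compatible as the format evolves. With optional fields, readers are expected to test for presence before use; a small hedged sketch against the protoc-generated OrcProto classes, assuming the standard hasX()/getX() accessors protoc emits:

    // Guard on presence now that "kind" is optional rather than required.
    static boolean isBloomFilterStream(OrcProto.Stream stream) {
      return stream.hasKind() && stream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER;
    }
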
Modified: hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java (original)
+++ hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java Thu Feb 12 04:56:42 2015
@@ -13,9 +13,27 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
@@ -27,6 +45,10 @@ import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
 import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -62,6 +84,10 @@ public class TestDataWritableWriter {
     inOrder.verify(mockRecordConsumer).addInteger(value);
   }
 
+  private void addLong(int value) {
+    inOrder.verify(mockRecordConsumer).addLong(value);
+  }
+
   private void addFloat(float value) {
     inOrder.verify(mockRecordConsumer).addFloat(value);
   }
@@ -88,6 +114,12 @@ public class TestDataWritableWriter {
 
   private Writable createNull() { return null; }
 
+  private ByteWritable createTinyInt(byte value) { return new ByteWritable(value); }
+
+  private ShortWritable createSmallInt(short value) { return new ShortWritable(value); }
+
+  private LongWritable createBigInt(long value) { return new LongWritable(value); }
+
   private IntWritable createInt(int value) {
     return new IntWritable(value);
   }
@@ -116,20 +148,68 @@ public class TestDataWritableWriter {
     return new ArrayWritable(Writable.class, createGroup(values).get());
   }
 
-  private void writeParquetRecord(String schemaStr, ArrayWritable record) {
-    MessageType schema = MessageTypeParser.parseMessageType(schemaStr);
-    DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, schema);
+  private List<String> createHiveColumnsFrom(final String columnNamesStr) {
+    List<String> columnNames;
+    if (columnNamesStr.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNamesStr.split(","));
+    }
+
+    return columnNames;
+  }
+
+  private List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) {
+    List<TypeInfo> columnTypes;
+
+    if (columnsTypeStr.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr);
+    }
+
+    return columnTypes;
+  }
+
+  private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
+    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom(columnTypes);
+    List<String> columnNameList = createHiveColumnsFrom(columnNames);
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+
+    return new ArrayWritableObjectInspector(rowTypeInfo);
+  }
+
+  private ParquetHiveRecord getParquetWritable(String columnNames, String columnTypes, ArrayWritable record) throws SerDeException {
+    Properties recordProperties = new Properties();
+    recordProperties.setProperty("columns", columnNames);
+    recordProperties.setProperty("columns.types", columnTypes);
+
+    ParquetHiveSerDe serDe = new ParquetHiveSerDe();
+    SerDeUtils.initializeSerDe(serDe, new Configuration(), recordProperties, null);
+
+    return new ParquetHiveRecord(serDe.deserialize(record), getObjectInspector(columnNames, columnTypes));
+  }
+
+  private void writeParquetRecord(String schema, ParquetHiveRecord record) throws SerDeException {
+    MessageType fileSchema = MessageTypeParser.parseMessageType(schema);
+    DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, fileSchema);
     hiveParquetWriter.write(record);
   }
 
   @Test
   public void testSimpleType() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String columnNames = "int,double,boolean,float,string,tinyint,smallint,bigint";
+    String columnTypes = "int,double,boolean,float,string,tinyint,smallint,bigint";
+
+    String fileSchema = "message hive_schema {\n"
         + "  optional int32 int;\n"
         + "  optional double double;\n"
         + "  optional boolean boolean;\n"
         + "  optional float float;\n"
-        + "  optional binary string;\n"
+        + "  optional binary string (UTF8);\n"
+        + "  optional int32 tinyint;\n"
+        + "  optional int32 smallint;\n"
+        + "  optional int64 bigint;\n"
         + "}\n";
 
     ArrayWritable hiveRecord = createGroup(
@@ -137,11 +217,14 @@ public class TestDataWritableWriter {
         createDouble(1.0),
         createBoolean(true),
         createFloat(1.0f),
-        createString("one")
+        createString("one"),
+        createTinyInt((byte)1),
+        createSmallInt((short)1),
+        createBigInt((long)1)
     );
 
     // Write record to Parquet format
-    writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
 
     // Verify record was written correctly to Parquet
     startMessage();
@@ -160,12 +243,24 @@ public class TestDataWritableWriter {
       startField("string", 4);
         addString("one");
       endField("string", 4);
+      startField("tinyint", 5);
+        addInteger(1);
+      endField("tinyint", 5);
+      startField("smallint", 6);
+        addInteger(1);
+      endField("smallint", 6);
+      startField("bigint", 7);
+        addLong(1);
+      endField("bigint", 7);
     endMessage();
   }
 
   @Test
   public void testStructType() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String columnNames = "structCol";
+    String columnTypes = "struct<a:int,b:double,c:boolean>";
+
+    String fileSchema = "message hive_schema {\n"
         + "  optional group structCol {\n"
         + "    optional int32 a;\n"
         + "    optional double b;\n"
@@ -182,7 +277,7 @@ public class TestDataWritableWriter {
     );
 
     // Write record to Parquet format
-    writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
 
     // Verify record was written correctly to Parquet
     startMessage();
@@ -204,9 +299,12 @@ public class TestDataWritableWriter {
 
   @Test
   public void testArrayType() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String columnNames = "arrayCol";
+    String columnTypes = "array<int>";
+
+    String fileSchema = "message hive_schema {\n"
         + "  optional group arrayCol (LIST) {\n"
-        + "    repeated group bag {\n"
+        + "    repeated group array {\n"
         + "      optional int32 array_element;\n"
         + "    }\n"
         + "  }\n"
@@ -223,13 +321,13 @@ public class TestDataWritableWriter {
     );
 
     // Write record to Parquet format
-    writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
 
     // Verify record was written correctly to Parquet
     startMessage();
       startField("arrayCol", 0);
         startGroup();
-          startField("bag", 0);
+          startField("array", 0);
             startGroup();
               startField("array_element", 0);
                 addInteger(1);
@@ -242,7 +340,7 @@ public class TestDataWritableWriter {
               addInteger(2);
             endField("array_element", 0);
             endGroup();
-          endField("bag", 0);
+          endField("array", 0);
         endGroup();
       endField("arrayCol", 0);
     endMessage();
@@ -250,7 +348,10 @@ public class TestDataWritableWriter {
 
   @Test
   public void testMapType() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String columnNames = "mapCol";
+    String columnTypes = "map<string,int>";
+
+    String fileSchema = "message hive_schema {\n"
         + "  optional group mapCol (MAP) {\n"
         + "    repeated group map (MAP_KEY_VALUE) {\n"
         + "      required binary key;\n"
@@ -279,7 +380,7 @@ public class TestDataWritableWriter {
     );
 
     // Write record to Parquet format
-    writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
 
     // Verify record was written correctly to Parquet
     startMessage();
@@ -315,12 +416,15 @@ public class TestDataWritableWriter {
 
   @Test
   public void testArrayOfArrays() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String columnNames = "array_of_arrays";
+    String columnTypes = "array<array<int>>";
+
+    String fileSchema = "message hive_schema {\n"
         + "  optional group array_of_arrays (LIST) {\n"
         + "    repeated group array {\n"
-        + "      required group element (LIST) {\n"
+        + "      optional group array_element (LIST) {\n"
         + "        repeated group array {\n"
-        + "          required int32 element;\n"
+        + "          optional int32 array_element;\n"
         + "        }\n"
         + "      }\n"
         + "    }\n"
@@ -341,7 +445,7 @@ public class TestDataWritableWriter {
     );
 
     // Write record to Parquet format
-    writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
 
     // Verify record was written correctly to Parquet
     startMessage();
@@ -349,22 +453,22 @@ public class TestDataWritableWriter {
         startGroup();
           startField("array", 0);
             startGroup();
-              startField("element", 0);
+              startField("array_element", 0);
                 startGroup();
                   startField("array", 0);
                     startGroup();
-                      startField("element", 0);
+                      startField("array_element", 0);
                         addInteger(1);
-                      endField("element", 0);
+                      endField("array_element", 0);
                     endGroup();
                     startGroup();
-                      startField("element", 0);
+                      startField("array_element", 0);
                         addInteger(2);
-                      endField("element", 0);
+                      endField("array_element", 0);
                     endGroup();
                   endField("array", 0);
                 endGroup();
-              endField("element", 0);
+              endField("array_element", 0);
             endGroup();
           endField("array", 0);
         endGroup();
@@ -373,124 +477,63 @@ public class TestDataWritableWriter {
   }
 
   @Test
-  public void testGroupFieldIsNotArrayWritable() throws Exception {
-    String schemaStr = "message hive_schema {\n"
-        + "  optional group a {\n"
-        + "    optional int32 b;\n"
-        + "  }\n"
-        + "}\n";
+  public void testExpectedStructTypeOnRecord() throws Exception {
+    String columnNames = "structCol";
+    String columnTypes = "int";
 
     ArrayWritable hiveRecord = createGroup(
-          createInt(1)
+        createInt(1)
     );
 
+    String fileSchema = "message hive_schema {\n"
+        + "  optional group structCol {\n"
+      + "      optional int32 int;\n"
+      + "    }\n"
+        + "}\n";
+
     try {
-      // Write record to Parquet format
-      writeParquetRecord(schemaStr, hiveRecord);
+      writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
       fail();
     } catch (RuntimeException e) {
-      assertEquals("Parquet record is malformed: Field value is not an ArrayWritable object: " +
-          "optional group a {\n  optional int32 b;\n}", e.getMessage());
+      assertEquals("Parquet record is malformed: Invalid data type: expected STRUCT type, but found: PRIMITIVE", e.getMessage());
     }
   }
 
   @Test
-  public void testArrayGroupElementIsNotArrayWritable() throws Exception {
-    String schemaStr = "message hive_schema {\n"
-        + "  optional group array_of_arrays (LIST) {\n"
-        + "    repeated group array {\n"
-        + "      required group element (LIST) {\n"
-        + "        required int32 element;\n"
-        + "      }\n"
-        + "    }\n"
-        + "  }\n"
-        + "}\n";
+  public void testExpectedArrayTypeOnRecord() throws Exception {
+    String columnNames = "arrayCol";
+    String columnTypes = "int";
 
     ArrayWritable hiveRecord = createGroup(
-        createGroup(
-            createArray(
-                createInt(1)
-            )
-        )
+        createInt(1)
     );
 
-    try {
-      // Write record to Parquet format
-      writeParquetRecord(schemaStr, hiveRecord);
-      fail();
-    } catch (RuntimeException e) {
-      assertEquals("Parquet record is malformed: Field value is not an 
ArrayWritable object: " +
-          "required group element (LIST) {\n  required int32 element;\n}", 
e.getMessage());
-    }
-  }
-
-  @Test
-  public void testMapElementIsNotArrayWritable() throws Exception {
-    String schemaStr = "message hive_schema {\n"
-        + "  optional group mapCol (MAP) {\n"
-        + "    repeated group map (MAP_KEY_VALUE) {\n"
-        + "      required binary key;\n"
-        + "      optional group value {\n"
-        + "        required int32 value;"
-        + "      }\n"
+    String fileSchema = "message hive_schema {\n"
+        + "  optional group arrayCol (LIST) {\n"
+        + "    repeated group bag {\n"
+        + "      optional int32 array_element;\n"
         + "    }\n"
         + "  }\n"
         + "}\n";
 
-    ArrayWritable hiveRecord = createGroup(
-        createGroup(
-            createArray(
-                createGroup(
-                    createString("key1"),
-                    createInt(1)
-                )
-            )
-        )
-    );
-
     try {
-      // Write record to Parquet format
-      writeParquetRecord(schemaStr, hiveRecord);
+      writeParquetRecord(fileSchema, getParquetWritable(columnNames, 
columnTypes, hiveRecord));
       fail();
     } catch (RuntimeException e) {
-      assertEquals(
-          "Parquet record is malformed: Field value is not an ArrayWritable 
object: " +
-              "optional group value {\n  required int32 value;\n}", 
e.getMessage());
+      assertEquals("Parquet record is malformed: Invalid data type: expected 
LIST type, but found: PRIMITIVE", e.getMessage());
     }
   }
 
   @Test
-  public void testMapKeyValueIsNotArrayWritable() throws Exception {
-    String schemaStr = "message hive_schema {\n"
-        + "  optional group mapCol (MAP) {\n"
-        + "    repeated group map (MAP_KEY_VALUE) {\n"
-        + "      required binary key;\n"
-        + "      optional int32 value;\n"
-        + "    }\n"
-        + "  }\n"
-        + "}\n";
+  public void testExpectedMapTypeOnRecord() throws Exception {
+    String columnNames = "mapCol";
+    String columnTypes = "int";
 
     ArrayWritable hiveRecord = createGroup(
-        createGroup(
-            createArray(
-                createString("key1"),
-                createInt(1)
-            )
-        )
+        createInt(1)
     );
 
-    try {
-      // Write record to Parquet format
-      writeParquetRecord(schemaStr, hiveRecord);
-      fail();
-    } catch (RuntimeException e) {
-      assertEquals("Parquet record is malformed: Map key-value pair is not an 
ArrayWritable object on record 0", e.getMessage());
-    }
-  }
-
-  @Test
-  public void testMapKeyValueIsNull() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String fileSchema = "message hive_schema {\n"
         + "  optional group mapCol (MAP) {\n"
         + "    repeated group map (MAP_KEY_VALUE) {\n"
         + "      required binary key;\n"
@@ -499,20 +542,11 @@ public class TestDataWritableWriter {
         + "  }\n"
         + "}\n";
 
-    ArrayWritable hiveRecord = createGroup(
-        createGroup(
-            createArray(
-                createNull()
-            )
-        )
-    );
-
     try {
-      // Write record to Parquet format
-      writeParquetRecord(schemaStr, hiveRecord);
+      writeParquetRecord(fileSchema, getParquetWritable(columnNames, 
columnTypes, hiveRecord));
       fail();
     } catch (RuntimeException e) {
-      assertEquals("Parquet record is malformed: Map key-value pair is null on 
record 0", e.getMessage());
+      assertEquals("Parquet record is malformed: Invalid data type: expected 
MAP type, but found: PRIMITIVE", e.getMessage());
     }
   }
 }

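(A minimal sketch, not part of the patch, of the Hive type-category distinction these rewritten tests exercise. The class name and the literal type strings are illustrative; getParquetWritable is the test helper referenced in the hunks above. The writer's new error messages, e.g. "expected LIST type, but found: PRIMITIVE", report exactly this kind of mismatch between the declared column type and the Parquet group it is paired with.)

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class TypeCategoryDemo {
  public static void main(String[] args) {
    // Category of a nested array type, as declared by the array_of_arrays test.
    TypeInfo listType = TypeInfoUtils.getTypeInfoFromTypeString("array<array<int>>");
    // Category of a plain int, as declared by the negative tests above.
    TypeInfo intType = TypeInfoUtils.getTypeInfoFromTypeString("int");

    System.out.println(listType.getTypeName() + " -> " + listType.getCategory()); // ... -> LIST
    System.out.println(intType.getTypeName() + " -> " + intType.getCategory());   // int -> PRIMITIVE
    // Declaring a column as "int" while the file schema expects a LIST, MAP, or STRUCT
    // group is what produces "expected LIST type, but found: PRIMITIVE" and its variants.
  }
}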
Modified: 
hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- 
hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
 (original)
+++ 
hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
 Thu Feb 12 04:56:42 2015
@@ -24,7 +24,7 @@ import java.util.Properties;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
-import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Test;
@@ -41,7 +41,7 @@ public class TestMapredParquetOutputForm
   @SuppressWarnings("unchecked")
   @Test
   public void testConstructorWithFormat() {
-    new MapredParquetOutputFormat((ParquetOutputFormat<ArrayWritable>) 
mock(ParquetOutputFormat.class));
+    new MapredParquetOutputFormat((ParquetOutputFormat<ParquetHiveRecord>) 
mock(ParquetOutputFormat.class));
   }
 
   @Test
@@ -62,7 +62,7 @@ public class TestMapredParquetOutputForm
     tableProps.setProperty("columns.types", "int:int");
 
     final Progressable mockProgress = mock(Progressable.class);
-    final ParquetOutputFormat<ArrayWritable> outputFormat = 
(ParquetOutputFormat<ArrayWritable>) mock(ParquetOutputFormat.class);
+    final ParquetOutputFormat<ParquetHiveRecord> outputFormat = 
(ParquetOutputFormat<ParquetHiveRecord>) mock(ParquetOutputFormat.class);
 
     JobConf jobConf = new JobConf();
 
@@ -70,7 +70,7 @@ public class TestMapredParquetOutputForm
       new MapredParquetOutputFormat(outputFormat) {
         @Override
         protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
-            ParquetOutputFormat<ArrayWritable> realOutputFormat,
+            ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
             JobConf jobConf,
             String finalOutPath,
             Progressable progress,

Modified: 
hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java
URL: 
http://svn.apache.org/viewvc/hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- 
hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java
 (original)
+++ 
hive/branches/parquet/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java
 Thu Feb 12 04:56:42 2015
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.ArrayWritable;
@@ -96,9 +97,9 @@ public class TestParquetSerDe extends Te
     assertEquals("deserialization gives the wrong object", t, row);
 
     // Serialize
-    final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, 
oi);
-    assertEquals("size correct after serialization", 
serDe.getSerDeStats().getRawDataSize(), serializedArr.get().length);
-    assertTrue("serialized object should be equal to starting object", 
arrayWritableEquals(t, serializedArr));
+    final ParquetHiveRecord serializedArr = (ParquetHiveRecord) 
serDe.serialize(row, oi);
+    assertEquals("size correct after serialization", 
serDe.getSerDeStats().getRawDataSize(), 
((ArrayWritable)serializedArr.getObject()).get().length);
+    assertTrue("serialized object should be equal to starting object", 
arrayWritableEquals(t, (ArrayWritable)serializedArr.getObject()));
   }
 
   private Properties createProperties() {

Modified: 
hive/branches/parquet/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: 
http://svn.apache.org/viewvc/hive/branches/parquet/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- 
hive/branches/parquet/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
 (original)
+++ 
hive/branches/parquet/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
 Thu Feb 12 04:56:42 2015
@@ -20,7 +20,7 @@ package org.apache.hive.service.cli.thri
 
 import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -63,9 +63,10 @@ public class ThriftHttpCLIService extend
       httpServer = new org.eclipse.jetty.server.Server();
 
       // Server thread pool
+      // Start with minWorkerThreads, expand till maxWorkerThreads and reject 
subsequent requests
       String threadPoolName = "HiveServer2-HttpHandler-Pool";
       ExecutorService executorService = new 
ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
-          workerKeepAliveTime, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(),
+          workerKeepAliveTime, TimeUnit.SECONDS, new 
SynchronousQueue<Runnable>(),
           new ThreadFactoryWithGarbageCleanup(threadPoolName));
       ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
       httpServer.setThreadPool(threadPool);

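(A minimal sketch, not part of the patch, of the queueing behavior the new comment describes; the pool sizes here are illustrative stand-ins for minWorkerThreads/maxWorkerThreads. A SynchronousQueue has no capacity, so ThreadPoolExecutor grows to maximumPoolSize and then rejects further submissions instead of buffering them the way the replaced LinkedBlockingQueue would.)

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SynchronousQueuePoolDemo {
  public static void main(String[] args) {
    // Illustrative sizes: core pool of 1, maximum of 2.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2,
        60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

    Runnable busyWork = new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(2000); // keep the worker occupied
        } catch (InterruptedException ignored) {
        }
      }
    };

    pool.execute(busyWork); // handled by the core thread
    pool.execute(busyWork); // no idle consumer on the queue, so the pool grows to 2
    try {
      pool.execute(busyWork); // pool at maximum, queue cannot buffer -> rejected
    } catch (RejectedExecutionException e) {
      System.out.println("third task rejected: " + e.getMessage());
    }
    pool.shutdown();
  }
}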
Modified: 
hive/branches/parquet/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: 
http://svn.apache.org/viewvc/hive/branches/parquet/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1659148&r1=1659147&r2=1659148&view=diff
==============================================================================
--- 
hive/branches/parquet/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
 (original)
+++ 
hive/branches/parquet/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
 Thu Feb 12 04:56:42 2015
@@ -39,6 +39,7 @@ import org.apache.commons.lang.StringUti
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProvider.Options;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.DefaultFileAccess;
@@ -495,8 +496,10 @@ public class Hadoop23Shims extends Hadoo
 
     // Need to set the client's KeyProvider to the NN's for JKS,
     // else the updates do not get flushed properly
-    miniDFSCluster.getFileSystem().getClient().setKeyProvider(
-        miniDFSCluster.getNameNode().getNamesystem().getProvider());
+    KeyProviderCryptoExtension keyProvider =  
miniDFSCluster.getNameNode().getNamesystem().getProvider();
+    if (keyProvider != null) {
+      miniDFSCluster.getFileSystem().getClient().setKeyProvider(keyProvider);
+    }
 
     cluster = new MiniDFSShim(miniDFSCluster);
     return cluster;

