DRILL-1252: Implement Complex parquet and json writers

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/49d238ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/49d238ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/49d238ee

Branch: refs/heads/master
Commit: 49d238eea601a6764129b9d64cf9bca440209e12
Parents: 450d891
Author: Steven Phillips <sphill...@maprtech.com>
Authored: Thu Jul 31 23:46:26 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Wed Aug 6 16:44:23 2014 -0700

----------------------------------------------------------------------
 .../codegen/templates/AbstractRecordWriter.java |  21 +-
 .../templates/EventBasedRecordWriter.java       | 128 ++++++------
 .../templates/JsonOutputRecordWriter.java       | 171 ++++++++++++++++
 .../templates/ParquetOutputRecordWriter.java    | 119 ++++++-----
 .../codegen/templates/RecordValueAccessor.java  |   6 +
 .../main/codegen/templates/RecordWriter.java    |   9 +-
 .../templates/StringOutputRecordWriter.java     |  73 ++++---
 .../exec/physical/impl/WriterRecordBatch.java   |   5 +-
 .../drill/exec/record/MaterializedField.java    |   4 +
 .../exec/store/easy/json/JSONFormatPlugin.java  |  24 ++-
 .../exec/store/easy/json/JsonRecordWriter.java  | 205 +++++++++++++++++++
 .../exec/store/parquet/ParquetRecordWriter.java | 112 +++++++++-
 .../exec/store/text/DrillTextRecordWriter.java  |  30 +++
 .../exec/vector/complex/RepeatedMapVector.java  |   4 +
 .../complex/impl/RepeatedMapReaderImpl.java     |   5 +-
 .../physical/impl/writer/TestParquetWriter.java |   9 +-
 exec/java-exec/src/test/resources/donuts.json   | 132 ++++++++++++
 .../apache/drill/common/types/MinorType.java    |   4 +-
 18 files changed, 905 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
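
A note before the diff: this patch replaces the generated add${mode.prefix}${minor.class}Holder(...) methods on RecordWriter with per-field FieldConverter objects that pull values through the complex-reader API (FieldReader), which is what lets the Parquet and JSON writers descend into maps and repeated types. A minimal sketch of a custom converter against the new API (the class name and the println body are illustrative only, not part of the patch):

    import java.io.IOException;
    import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
    import org.apache.drill.exec.vector.complex.reader.FieldReader;

    // Illustrative converter: prints each value instead of writing it to a file.
    // It relies only on the FieldConverter base class and FieldReader.readObject(),
    // both visible in the diff below.
    public class PrintingFieldConverter extends FieldConverter {

      public PrintingFieldConverter(int fieldId, String fieldName, FieldReader reader) {
        super(fieldId, fieldName, reader);
      }

      @Override
      public void writeField() throws IOException {
        // The driver has already called setPosition(recordIndex) on the reader.
        System.out.println(fieldName + " = " + reader.readObject());
      }
    }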


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
index 6a7fb86..6b6065f 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
@@ -23,18 +23,35 @@
 package org.apache.drill.exec.store;
 
 import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 import java.io.IOException;
 import java.lang.UnsupportedOperationException;
 
 public abstract class AbstractRecordWriter implements RecordWriter {
 
+  @Override
+  public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    throw new UnsupportedOperationException("Doesn't support writing Map");
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    throw new UnsupportedOperationException("Doesn't support writing RepeatedMap");
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+    throw new UnsupportedOperationException("Doesn't support writing RepeatedList");
+  }
+
 <#list vv.types as type>
   <#list type.minor as minor>
     <#list vv.modes as mode>
   @Override
-  public void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException {
-    throw new UnsupportedOperationException("Doesn't support writing '${mode.prefix}${minor.class}Holder'");
+  public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
+    throw new UnsupportedOperationException("Doesn't support writing '${mode.prefix}${minor.class}'");
   }
     </#list>
   </#list>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
index b58f24c..e76178a 100644
--- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
@@ -25,11 +25,9 @@ package org.apache.drill.exec.store;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.expr.holders.*;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordValueAccessor;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 import java.io.IOException;
 import java.util.List;
@@ -40,61 +38,44 @@ import java.util.Map;
 public class EventBasedRecordWriter {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EventBasedRecordWriter.class);
 
-  private BatchSchema schema;
-  private RecordValueAccessor rva;
+  private VectorAccessible batch;
   private RecordWriter recordWriter;
-  private List<FieldWriter> fieldWriters;
+  private List<FieldConverter> fieldConverters;
 
-  static private Map<MajorType, Class<? extends FieldWriter>> typeClassMap;
-
-  static {
-    typeClassMap = Maps.newHashMap();
-
-<#list vv.types as type>
-  <#list type.minor as minor>
-    <#list vv.modes as mode>
-    typeClassMap.put(${mode.prefix}${minor.class}Holder.TYPE, ${mode.prefix}${minor.class}FieldWriter.class);
-    </#list>
-  </#list>
-</#list>
-  }
-
-  public EventBasedRecordWriter(BatchSchema schema, RecordValueAccessor rva, RecordWriter recordWriter)
-      throws IOException {
-    this.schema = schema;
-    this.rva = rva;
+  public EventBasedRecordWriter(VectorAccessible batch, RecordWriter recordWriter)
+          throws IOException {
+    this.batch = batch;
     this.recordWriter = recordWriter;
 
     initFieldWriters();
   }
 
-  public int write() throws IOException {
+  public int write(int recordCount) throws IOException {
     int counter = 0;
 
-    rva.resetIterator();
-    while(rva.next()) {
+    for (; counter < recordCount; counter++) {
       recordWriter.startRecord();
       // write the current record
-      int fieldId = 0;
-      for (MaterializedField field : schema) {
-        fieldWriters.get(fieldId).writeField();
-        fieldId++;
+      for (FieldConverter converter : fieldConverters) {
+        converter.setPosition(counter);
+        converter.startField();
+        converter.writeField();
+        converter.endField();
       }
       recordWriter.endRecord();
-      counter++;
     }
 
     return counter;
   }
 
   private void initFieldWriters() throws IOException {
-    fieldWriters = Lists.newArrayList();
+    fieldConverters = Lists.newArrayList();
     try {
-      for (int i = 0; i < schema.getFieldCount(); i++) {
-        MajorType mt = schema.getColumn(i).getType();
-        MajorType newMt = MajorType.newBuilder().setMinorType(mt.getMinorType()).setMode(mt.getMode()).build();
-        fieldWriters.add(i, typeClassMap.get(newMt)
-                .getConstructor(EventBasedRecordWriter.class, int.class).newInstance(this, i));
+      int fieldId = 0;
+      for (VectorWrapper w : batch) {
+        FieldReader reader = w.getValueVector().getAccessor().getReader();
+        FieldConverter converter = getConverter(recordWriter, fieldId++, w.getField().getLastName(), reader);
+        fieldConverters.add(converter);
       }
     } catch(Exception e) {
       logger.error("Failed to create FieldWriter.", e);
@@ -102,33 +83,64 @@ public class EventBasedRecordWriter {
     }
   }
 
-  abstract class FieldWriter {
+  public static abstract class FieldConverter {
     protected int fieldId;
+    protected String fieldName;
+    protected FieldReader reader;
 
-    public FieldWriter(int fieldId) {
+    public FieldConverter(int fieldId, String fieldName, FieldReader reader) {
       this.fieldId = fieldId;
+      this.fieldName = fieldName;
+      this.reader = reader;
     }
 
-    public abstract void writeField() throws IOException;
-  }
-
-<#list vv.types as type>
-  <#list type.minor as minor>
-    <#list vv.modes as mode>
-  class ${mode.prefix}${minor.class}FieldWriter extends FieldWriter {
-    private ${mode.prefix}${minor.class}Holder holder = new ${mode.prefix}${minor.class}Holder();
+    public void setPosition(int index) {
+      reader.setPosition(index);
+    }
 
-    public ${mode.prefix}${minor.class}FieldWriter(int fieldId) {
-      super(fieldId);
+    public void startField() throws IOException {
+      // no op
     }
 
-    public void writeField() throws IOException {
-      rva.getFieldById(fieldId, holder);
-      recordWriter.add${mode.prefix}${minor.class}Holder(fieldId, holder);
+    public void endField() throws IOException {
+      // no op
     }
+
+    public abstract void writeField() throws IOException;
   }
 
-    </#list>
-  </#list>
-</#list>
+  public static FieldConverter getConverter(RecordWriter recordWriter, int fieldId, String fieldName, FieldReader reader) {
+    switch (reader.getType().getMinorType()) {
+      case MAP:
+        switch (reader.getType().getMode()) {
+          case REQUIRED:
+          case OPTIONAL:
+            return recordWriter.getNewMapConverter(fieldId, fieldName, reader);
+          case REPEATED:
+            return recordWriter.getNewRepeatedMapConverter(fieldId, fieldName, reader);
+        }
+
+      case LIST:
+        switch (reader.getType().getMode()) {
+          case REPEATED:
+            return recordWriter.getNewRepeatedListConverter(fieldId, fieldName, reader);
+        }
+
+        <#list vv.types as type>
+        <#list type.minor as minor>
+      case ${minor.class?upper_case}:
+      switch (reader.getType().getMode()) {
+        case REQUIRED:
+          return recordWriter.getNew${minor.class}Converter(fieldId, fieldName, reader);
+        case OPTIONAL:
+          return recordWriter.getNewNullable${minor.class}Converter(fieldId, fieldName, reader);
+        case REPEATED:
+          return recordWriter.getNewRepeated${minor.class}Converter(fieldId, fieldName, reader);
+      }
+      </#list>
+      </#list>
+
+    }
+    throw new UnsupportedOperationException();
+  }
 }
\ No newline at end of file
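
A note on how this class is now driven: the writer no longer iterates a RecordValueAccessor; the operator hands it the batch and a record count. A sketch of the call site, mirroring the WriterRecordBatch change further down in this commit (method name is illustrative):

    import java.io.IOException;
    import org.apache.drill.exec.record.VectorAccessible;
    import org.apache.drill.exec.store.EventBasedRecordWriter;
    import org.apache.drill.exec.store.RecordWriter;

    // Sketch: drive a RecordWriter over one incoming batch. The batch and
    // recordWriter setup is assumed to happen elsewhere (see WriterRecordBatch).
    static int writeBatch(VectorAccessible batch, RecordWriter recordWriter, int recordCount)
        throws IOException {
      EventBasedRecordWriter writer = new EventBasedRecordWriter(batch, recordWriter);
      return writer.write(recordCount);
    }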

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java
new file mode 100644
index 0000000..d1a6d4e
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.joda.time.DateTimeUtils;
+import parquet.io.api.Binary;
+
+import java.lang.Override;
+import java.lang.RuntimeException;
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="org/apache/drill/exec/store/JSONOutputRecordWriter.java" />
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.store;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.parquet.ParquetTypeHelper;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.io.api.Binary;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+
+
+import org.apache.drill.common.types.TypeProtos;
+
+import org.joda.time.DateTimeUtils;
+
+import java.io.IOException;
+import java.lang.UnsupportedOperationException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract implementation of RecordWriter interface which writes each field
+ * through a Jackson JsonGenerator instead of implementing a converter for
+ * each type holder.
+ *
+ * This is useful for JSON format writers.
+ */
+public abstract class JSONOutputRecordWriter extends AbstractRecordWriter implements RecordWriter {
+
+  protected JsonGenerator gen;
+
+<#list vv.types as type>
+  <#list type.minor as minor>
+    <#list vv.modes as mode>
+  <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) />
+  @Override
+  public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ${mode.prefix}${minor.class}JsonConverter(fieldId, fieldName, reader);
+  }
+
+  public class ${mode.prefix}${minor.class}JsonConverter extends FieldConverter {
+
+    public ${mode.prefix}${minor.class}JsonConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+    }
+
+    @Override
+    public void startField() throws IOException {
+      gen.writeFieldName(fieldName);
+    }
+
+    @Override
+    public void writeField() throws IOException {
+  <#if mode.prefix == "Nullable" >
+    if (!reader.isSet()) {
+      gen.writeNull();
+      return;
+    }
+  <#elseif mode.prefix == "Repeated" >
+    // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements
+    if (reader.size() == 0) {
+      return;
+    }
+    gen.writeStartArray();
+    for (int i = 0; i < reader.size(); i++) {
+  <#else>
+  </#if>
+
+  <#if  minor.class == "TinyInt" ||
+        minor.class == "UInt1" ||
+        minor.class == "UInt2" ||
+        minor.class == "SmallInt" ||
+        minor.class == "Int" ||
+        minor.class == "Decimal9" ||
+        minor.class == "Float4" ||
+        minor.class == "BigInt" ||
+        minor.class == "Decimal18" ||
+        minor.class == "UInt8" ||
+        minor.class == "UInt4" ||
+        minor.class == "Float8" ||
+        minor.class == "Decimal28Sparse" ||
+        minor.class == "Decimal28Dense" ||
+        minor.class == "Decimal38Dense" ||
+        minor.class == "Decimal38Sparse">
+    <#if mode.prefix == "Repeated" >
+      gen.writeNumber(reader.read${friendlyType}(i));
+    <#else>
+      gen.writeNumber(reader.read${friendlyType}());
+    </#if>
+  <#elseif minor.class == "Date" ||
+              minor.class == "Time" ||
+              minor.class == "TimeStamp" ||
+              minor.class == "TimeTZ" ||
+              minor.class == "TimeStampTZ" ||
+              minor.class == "IntervalDay" ||
+              minor.class == "Interval" ||
+              minor.class == "VarChar" ||
+              minor.class == "Var16Char" ||
+              minor.class == "IntervalYear">
+    <#if mode.prefix == "Repeated" >
+              gen.writeString(reader.read${friendlyType}(i).toString());
+    <#else>
+      gen.writeString(reader.read${friendlyType}().toString());
+    </#if>
+  <#elseif
+        minor.class == "Bit">
+      <#if mode.prefix == "Repeated" >
+              gen.writeBoolean(reader.read${friendlyType}(i));
+      <#else>
+      gen.writeBoolean(reader.read${friendlyType}());
+      </#if>
+  <#elseif
+            minor.class == "VarBinary">
+      <#if mode.prefix == "Repeated" >
+              gen.writeBinary(reader.readByteArray(i));
+      <#else>
+      gen.writeBinary(reader.readByteArray());
+      </#if>
+  </#if>
+  <#if mode.prefix == "Repeated">
+    }
+      gen.writeEndArray();
+  </#if>
+    }
+  }
+    </#list>
+  </#list>
+</#list>
+
+}
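
To make the template concrete: after FreeMarker expansion, the Nullable BigInt instantiation of the converter above comes out roughly like this (a sketch; it assumes friendlyType resolves to Long for BigInt):

    @Override
    public FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
      return new NullableBigIntJsonConverter(fieldId, fieldName, reader);
    }

    public class NullableBigIntJsonConverter extends FieldConverter {

      public NullableBigIntJsonConverter(int fieldId, String fieldName, FieldReader reader) {
        super(fieldId, fieldName, reader);
      }

      @Override
      public void startField() throws IOException {
        gen.writeFieldName(fieldName);
      }

      @Override
      public void writeField() throws IOException {
        // NULL values become JSON null rather than being skipped.
        if (!reader.isSet()) {
          gen.writeNull();
          return;
        }
        gen.writeNumber(reader.readLong());
      }
    }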

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index 5284199..aa25d1a 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -33,9 +33,11 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.parquet.ParquetTypeHelper;
 import org.apache.drill.exec.vector.*;
 import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import parquet.io.api.RecordConsumer;
 import parquet.schema.MessageType;
 import parquet.io.api.Binary;
@@ -63,7 +65,7 @@ import java.util.Map;
  *
  * This is useful for text format writers such as CSV, TSV etc.
  */
-public abstract class ParquetOutputRecordWriter implements RecordWriter {
+public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter implements RecordWriter {
 
   private RecordConsumer consumer;
   private MessageType schema;
@@ -78,17 +80,30 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
   <#list type.minor as minor>
     <#list vv.modes as mode>
   @Override
-  public void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException {
+  public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ${mode.prefix}${minor.class}ParquetConverter(fieldId, fieldName, reader);
+  }
+
+  public class ${mode.prefix}${minor.class}ParquetConverter extends FieldConverter {
+    private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder();
+
+    public ${mode.prefix}${minor.class}ParquetConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() throws IOException {
   <#if mode.prefix == "Nullable" >
-    if (valueHolder.isSet == 0) {
+    if (!reader.isSet()) {
       return;
     }
   <#elseif mode.prefix == "Repeated" >
     // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements
-    if (valueHolder.start == valueHolder.end)
+    if (reader.size() == 0) {
       return;
-    consumer.startField(schema.getFieldName(fieldId), fieldId);
-    for (int i = valueHolder.start; i < valueHolder.end; i++) {
+    }
+    consumer.startField(fieldName, fieldId);
+    for (int i = 0; i < reader.size(); i++) {
   </#if>
 
   <#if  minor.class == "TinyInt" ||
@@ -101,20 +116,24 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
         minor.class == "Decimal9" ||
         minor.class == "UInt4">
     <#if mode.prefix == "Repeated" >
-            consumer.addInteger(valueHolder.vector.getAccessor().get(i));
+            reader.read(i, holder);
+            consumer.addInteger(holder.value);
     <#else>
-    consumer.startField(schema.getFieldName(fieldId), fieldId);
-    consumer.addInteger(valueHolder.value);
-    consumer.endField(schema.getFieldName(fieldId), fieldId);
+    consumer.startField(fieldName, fieldId);
+    reader.read(holder);
+    consumer.addInteger(holder.value);
+    consumer.endField(fieldName, fieldId);
     </#if>
   <#elseif
         minor.class == "Float4">
       <#if mode.prefix == "Repeated" >
-              consumer.addFloat(valueHolder.vector.getAccessor().get(i));
+              reader.read(i, holder);
+              consumer.addFloat(holder.value);
       <#else>
-    consumer.startField(schema.getFieldName(fieldId), fieldId);
-    consumer.addFloat(valueHolder.value);
-    consumer.endField(schema.getFieldName(fieldId), fieldId);
+    consumer.startField(fieldName, fieldId);
+    reader.read(holder);
+    consumer.addFloat(holder.value);
+    consumer.endField(fieldName, fieldId);
       </#if>
   <#elseif
         minor.class == "BigInt" ||
@@ -122,59 +141,64 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
         minor.class == "TimeStamp" ||
         minor.class == "UInt8">
       <#if mode.prefix == "Repeated" >
-              consumer.addLong(valueHolder.vector.getAccessor().get(i));
+              reader.read(i, holder);
+              consumer.addLong(holder.value);
       <#else>
-    consumer.startField(schema.getFieldName(fieldId), fieldId);
-    consumer.addLong(valueHolder.value);
-    consumer.endField(schema.getFieldName(fieldId), fieldId);
+    consumer.startField(fieldName, fieldId);
+    reader.read(holder);
+    consumer.addLong(holder.value);
+    consumer.endField(fieldName, fieldId);
       </#if>
   <#elseif minor.class == "Date">
     <#if mode.prefix == "Repeated" >
-      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.vector.getAccessor().get(i)) + JULIAN_DAY_EPOC));
+      reader.read(i, holder);
+      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) + JULIAN_DAY_EPOC));
     <#else>
-      consumer.startField(schema.getFieldName(fieldId), fieldId);
+      consumer.startField(fieldName, fieldId);
+      reader.read(holder);
       // convert from internal Drill date format to Julian Day centered around Unix Epoc
-      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.value) + JULIAN_DAY_EPOC));
-      consumer.endField(schema.getFieldName(fieldId), fieldId);
+      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) + JULIAN_DAY_EPOC));
+      consumer.endField(fieldName, fieldId);
     </#if>
   <#elseif
         minor.class == "Float8">
       <#if mode.prefix == "Repeated" >
-              consumer.addDouble(valueHolder.vector.getAccessor().get(i));
+              reader.read(i, holder);
+              consumer.addDouble(holder.value);
       <#else>
-    consumer.startField(schema.getFieldName(fieldId), fieldId);
-    consumer.addDouble(valueHolder.value);
-    consumer.endField(schema.getFieldName(fieldId), fieldId);
+    consumer.startField(fieldName, fieldId);
+    reader.read(holder);
+    consumer.addDouble(holder.value);
+    consumer.endField(fieldName, fieldId);
       </#if>
   <#elseif
         minor.class == "Bit">
       <#if mode.prefix == "Repeated" >
-              consumer.addBoolean(valueHolder.vector.getAccessor().get(i) == 1);
+              reader.read(i, holder);
+              consumer.addBoolean(holder.value == 1);
       <#else>
-    consumer.startField(schema.getFieldName(fieldId), fieldId);
-    consumer.addBoolean(valueHolder.value == 1);
-    consumer.endField(schema.getFieldName(fieldId), fieldId);
+    consumer.startField(fieldName, fieldId);
+    reader.read(holder);
+    consumer.addBoolean(holder.value == 1);
+    consumer.endField(fieldName, fieldId);
       </#if>
   <#elseif
         minor.class == "Decimal28Sparse" ||
         minor.class == "Decimal38Sparse">
       <#if mode.prefix == "Repeated" >
       <#else>
-      consumer.startField(schema.getFieldName(fieldId), fieldId);
-      ${minor.class}Vector tempVec = new ${minor.class}Vector(MaterializedField.create("", TypeProtos.MajorType.getDefaultInstance()), new TopLevelAllocator());
-      tempVec.allocateNew(10);
-      tempVec.getMutator().setSafe(0, valueHolder);
+      consumer.startField(fieldName, fieldId);
+      reader.read(holder);
       byte[] bytes = DecimalUtility.getBigDecimalFromSparse(
-              valueHolder.buffer, valueHolder.start, ${minor.class}Holder.nDecimalDigits, valueHolder.scale).unscaledValue().toByteArray();
+              holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray();
       byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];
-      if (valueHolder.getSign()) {
+      if (holder.getSign()) {
         Arrays.fill(output, 0, output.length - bytes.length, (byte)0xFF);
       } else {
         Arrays.fill(output, 0, output.length - bytes.length, (byte)0x0);
       }
       System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length);
       consumer.addBinary(Binary.fromByteArray(output));
-      consumer.endField(schema.getFieldName(fieldId), fieldId);
+      consumer.endField(fieldName, fieldId);
       </#if>
   <#elseif
         minor.class == "TimeTZ" ||
@@ -190,22 +214,23 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
       </#if>
  <#elseif minor.class == "VarChar" || minor.class == "Var16Char" || minor.class == "VarBinary">
    <#if mode.prefix == "Repeated">
-      ${minor.class}Holder singleHolder = new ${minor.class}Holder();
-      valueHolder.vector.getAccessor().get(i, singleHolder);
-      consumer.startField(schema.getFieldName(fieldId), fieldId);
-      consumer.addBinary(Binary.fromByteBuffer(singleHolder.buffer.nioBuffer(singleHolder.start, singleHolder.end - singleHolder.start)));
-      consumer.endField(schema.getFieldName(fieldId), fieldId);
+      reader.read(i, holder);
+      consumer.startField(fieldName, fieldId);
+      consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
+      consumer.endField(fieldName, fieldId);
    <#else>
-    ByteBuf buf = valueHolder.buffer;
-    consumer.startField(schema.getFieldName(fieldId), fieldId);
-    consumer.addBinary(Binary.fromByteBuffer(valueHolder.buffer.nioBuffer(valueHolder.start, valueHolder.end - valueHolder.start)));
-    consumer.endField(schema.getFieldName(fieldId), fieldId);
+    reader.read(holder);
+    ByteBuf buf = holder.buffer;
+    consumer.startField(fieldName, fieldId);
+    consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
+    consumer.endField(fieldName, fieldId);
     </#if>
   </#if>
   <#if mode.prefix == "Repeated">
     }
-    consumer.endField(schema.getFieldName(fieldId), fieldId);
+    consumer.endField(fieldName, fieldId);
   </#if>
+    }
   }
     </#list>
   </#list>
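
The Decimal28Sparse/Decimal38Sparse branch above has to sign-extend the two's-complement unscaled value into Parquet's fixed-length binary. The padding logic in isolation, as plain Java (self-contained sketch; typeLength stands in for ParquetTypeHelper.getLengthForMinorType(...)):

    import java.math.BigInteger;
    import java.util.Arrays;

    // Sign-extend a two's-complement unscaled decimal into a fixed-length,
    // big-endian buffer: negative values pad with 0xFF, non-negative with 0x00.
    static byte[] toFixedLengthBinary(BigInteger unscaled, int typeLength) {
      byte[] bytes = unscaled.toByteArray();
      byte[] output = new byte[typeLength];
      byte pad = unscaled.signum() < 0 ? (byte) 0xFF : (byte) 0x00;
      Arrays.fill(output, 0, output.length - bytes.length, pad);
      System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length);
      return output;
    }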

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java b/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java
index d4c6817..4719731 100644
--- a/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java
+++ b/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java
@@ -62,6 +62,12 @@ public class RecordValueAccessor {
     return ++currentIndex < batch.getRecordCount();
   }
 
+  public void getFieldById(int fieldId, ComplexHolder holder) {
+    holder.isSet = vectors[fieldId].getAccessor().isNull(currentIndex) ? 1 : 0;
+    holder.reader = (vectors[fieldId]).getAccessor().getReader();
+    holder.reader.setPosition(currentIndex);
+  }
+
 <#list vv.types as type>
   <#list type.minor as minor>
     <#list vv.modes as mode>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/codegen/templates/RecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RecordWriter.java b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
index 2334a14..c6325fd 100644
--- a/exec/java-exec/src/main/codegen/templates/RecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
@@ -23,6 +23,8 @@ package org.apache.drill.exec.store;
 
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 import java.io.IOException;
 import java.lang.UnsupportedOperationException;
@@ -52,11 +54,16 @@ public interface RecordWriter {
    */
   void startRecord() throws IOException;
 
+  /** Get a converter that can write a field of the given complex type through this writer. */
+  public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader);
+  public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader);
+  public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader);
+
 <#list vv.types as type>
   <#list type.minor as minor>
     <#list vv.modes as mode>
   /** Add the field value given in <code>valueHolder</code> at the given column number <code>fieldId</code>. */
-  void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException;
+  public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader);
 
     </#list>
   </#list>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
index 9f0d701..070ed7b 100644
--- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
@@ -16,6 +16,9 @@
  * limitations under the License.
  */
 
+import java.lang.Override;
+import java.lang.UnsupportedOperationException;
+
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="org/apache/drill/exec/store/StringOutputRecordWriter.java" />
 <#include "/@includes/license.ftl" />
@@ -28,7 +31,9 @@ import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 import java.io.IOException;
 import java.lang.UnsupportedOperationException;
@@ -45,45 +50,65 @@ import java.util.Map;
  */
 public abstract class StringOutputRecordWriter implements RecordWriter {
 
-  private ValueVector[] columnVectors;
   private final BufferAllocator allocator;
   protected StringOutputRecordWriter(BufferAllocator allocator){
     this.allocator = allocator;
   }
   
   public void updateSchema(BatchSchema schema) throws IOException {
-    cleanupColumnVectors();
-    columnVectors = new ValueVector[schema.getFieldCount()];
-
     List<String> columnNames = Lists.newArrayList();
-    for (int i=0; i<columnVectors.length; i++) {
-      columnNames.add(schema.getColumn(i).getAsSchemaPath().getAsUnescapedPath());
+    for (int i=0; i < schema.getFieldCount(); i++) {
+      columnNames.add(schema.getColumn(i).getLastName());
     }
 
     startNewSchema(columnNames);
+  }
 
-    for (int i=0; i<columnVectors.length; i++) {
-      columnVectors[i] = TypeHelper.getNewVector(schema.getColumn(i), allocator);
-      AllocationHelper.allocate(columnVectors[i], 1, TypeHelper.getSize(schema.getColumn(i).getType()));
-    }
+  @Override
+  public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    throw new UnsupportedOperationException();
+  }
+  public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    throw new UnsupportedOperationException();
+  }
+  public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+    throw new UnsupportedOperationException();
   }
 
 <#list vv.types as type>
   <#list type.minor as minor>
     <#list vv.modes as mode>
   @Override
-  public void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException {
+  public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ${mode.prefix}${minor.class}StringFieldConverter(fieldId, fieldName, reader);
+  }
+
+  public class ${mode.prefix}${minor.class}StringFieldConverter extends FieldConverter {
+    <#if mode.prefix == "Repeated">
+    private Repeated${minor.class}Holder holder = new Repeated${minor.class}Holder();
+    <#else>
+    private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder();
+    </#if>
+
+    public ${mode.prefix}${minor.class}StringFieldConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() throws IOException {
   <#if mode.prefix == "Nullable" >
-    if (valueHolder.isSet == 0) {
+    if (!reader.isSet()) {
       addField(fieldId, null);
       return;
     }
   <#elseif mode.prefix == "Repeated" >
     throw new UnsupportedOperationException("Repeated types are not 
supported.");
+    }
   }
     <#break>
   </#if>
 
+    reader.read(holder);
   <#if  minor.class == "TinyInt" ||
         minor.class == "UInt1" ||
         minor.class == "UInt2" ||
@@ -94,9 +119,9 @@ public abstract class StringOutputRecordWriter implements RecordWriter {
         minor.class == "BigInt" ||
         minor.class == "UInt8" ||
         minor.class == "Float8">
-    addField(fieldId, String.valueOf(valueHolder.value));
+    addField(fieldId, String.valueOf(holder.value));
   <#elseif minor.class == "Bit">
-    addField(fieldId, valueHolder.value == 0 ? "false" : "true");
+    addField(fieldId, holder.value == 0 ? "false" : "true");
   <#elseif
         minor.class == "Date" ||
         minor.class == "Time" ||
@@ -114,33 +139,21 @@ public abstract class StringOutputRecordWriter implements RecordWriter {
         minor.class == "Decimal38Sparse">
 
     // TODO: error check
-    ((${mode.prefix}${minor.class}Vector)columnVectors[fieldId]).getMutator().setSafe(0, valueHolder);
-    Object obj = ((${mode.prefix}${minor.class}Vector)columnVectors[fieldId]).getAccessor().getObject(0);
-    addField(fieldId, obj.toString());
+    addField(fieldId, reader.readObject().toString());
 
  <#elseif minor.class == "VarChar" || minor.class == "Var16Char" || minor.class == "VarBinary">
-    addField(fieldId, valueHolder.toString());
+    addField(fieldId, reader.readObject().toString());
   <#else>
    throw new UnsupportedOperationException(String.format("Unsupported field type: %s"),
-      valueHolder.getCanonicalClass());
+      holder.getCanonicalClass());
    </#if>
+    }
   }
     </#list>
   </#list>
 </#list>
 
   public void cleanup() throws IOException {
-    cleanupColumnVectors();
-  }
-
-  private void cleanupColumnVectors() {
-    if (columnVectors != null){
-      for(ValueVector vector : columnVectors){
-        if(vector!=null){
-          vector.clear();
-        }
-      }
-    }
   }
 
   public abstract void startNewSchema(List<String> columnNames) throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 29b346d..ef4db2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -108,7 +108,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
           // fall through.
         case OK:
           try {
-            counter += eventBasedRecordWriter.write();
+            counter += eventBasedRecordWriter.write(incoming.getRecordCount());
             logger.debug("Total records written so far: {}", counter);
           } catch(IOException ex) {
             throw new RuntimeException(ex);
@@ -162,8 +162,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
       stats.stopSetup();
     }
 
-    eventBasedRecordWriter = new EventBasedRecordWriter(incoming.getSchema(),
-        new RecordValueAccessor(incoming), recordWriter);
+    eventBasedRecordWriter = new EventBasedRecordWriter(incoming, recordWriter);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index a2d22cf..540977d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -50,6 +50,10 @@ public class MaterializedField {
         .setNamePart(key.path.getAsNamePart());
   }
 
+  public List<MaterializedField> getChildren() {
+    return children;
+  }
+
   public void addChild(MaterializedField field){
     children.add(field);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 7fbb9c7..8b5577c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -19,20 +19,24 @@ package org.apache.drill.exec.store.easy.json;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Lists;
 
+import com.google.common.collect.Maps;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
@@ -40,6 +44,8 @@ import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.store.text.DrillTextRecordWriter;
+import org.apache.hadoop.fs.FileSystem;
 
 public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
 
@@ -59,7 +65,23 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
 
   @Override
   public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
-    throw new UnsupportedOperationException("Json Writer is not supported currently.");
+    Map<String, String> options = Maps.newHashMap();
+
+    options.put("location", writer.getLocation());
+
+    FragmentHandle handle = context.getHandle();
+    String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+    options.put("prefix", fragmentId);
+
+    options.put("separator", " ");
+    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)writer.getStorageConfig()).connection);
+
+    options.put("extension", "json");
+
+    RecordWriter recordWriter = new JsonRecordWriter();
+    recordWriter.init(options);
+
+    return recordWriter;
   }
 
   @JsonTypeName("json")
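
For reference, the option map handed to JsonRecordWriter.init() (next file) carries exactly these keys; a sketch with hypothetical values for one minor fragment writing to local disk:

    import com.google.common.collect.Maps;
    import java.util.Map;

    // Hypothetical option values; JsonRecordWriter.init() reads exactly these keys.
    static Map<String, String> exampleJsonWriterOptions() {
      Map<String, String> options = Maps.newHashMap();
      options.put("location", "/tmp/drill/out");   // writer.getLocation()
      options.put("prefix", "1_0");                // "<major>_<minor>" fragment id
      options.put("separator", " ");
      options.put("fs.default.name", "file:///"); // FileSystem.FS_DEFAULT_NAME_KEY
      options.put("extension", "json");
      return options;
    }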

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
new file mode 100644
index 0000000..da9f48b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.JSONOutputRecordWriter;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWriter {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class);
+
+  private String location;
+  private String prefix;
+
+  private String fieldDelimiter;
+  private String extension;
+
+  private int index;
+  private FileSystem fs = null;
+  private FSDataOutputStream stream = null;
+
+  private final JsonFactory factory = new JsonFactory();
+
+  // Record write status
+  private boolean fRecordStarted = false; // true from startRecord() until endRecord() is called
+
+  public JsonRecordWriter(){
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) throws IOException {
+    this.location = writerOptions.get("location");
+    this.prefix = writerOptions.get("prefix");
+    this.fieldDelimiter = writerOptions.get("separator");
+    this.extension = writerOptions.get("extension");
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
+    this.fs = FileSystem.get(conf);
+
+    Path fileName = new Path(location, prefix + "_" + index + "." + extension);
+    try {
+      stream = fs.create(fileName);
+      gen = factory.createGenerator(stream).useDefaultPrettyPrinter();
+      logger.debug("Created file: {}", fileName);
+    } catch (IOException ex) {
+      logger.error("Unable to create file: " + fileName, ex);
+      throw ex;
+    }
+  }
+
+  @Override
+  public void updateSchema(BatchSchema schema) throws IOException {
+    // no op
+  }
+
+  @Override
+  public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new MapJsonConverter(fieldId, fieldName, reader);
+  }
+
+  public class MapJsonConverter extends FieldConverter {
+    List<FieldConverter> converters = Lists.newArrayList();
+
+    public MapJsonConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+      int i = 0;
+      for (String name : reader) {
+        FieldConverter converter = EventBasedRecordWriter.getConverter(JsonRecordWriter.this, i++, name, reader.reader(name));
+        converters.add(converter);
+      }
+    }
+
+    @Override
+    public void startField() throws IOException {
+      gen.writeFieldName(fieldName);
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      gen.writeStartObject();
+      for (FieldConverter converter : converters) {
+        converter.startField();
+        converter.writeField();
+      }
+      gen.writeEndObject();
+    }
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new RepeatedMapJsonConverter(fieldId, fieldName, reader);
+  }
+
+  public class RepeatedMapJsonConverter extends FieldConverter {
+    List<FieldConverter> converters = Lists.newArrayList();
+
+    public RepeatedMapJsonConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+      int i = 0;
+      for (String name : reader) {
+        FieldConverter converter = EventBasedRecordWriter.getConverter(JsonRecordWriter.this, i++, name, reader.reader(name));
+        converters.add(converter);
+      }
+    }
+
+    @Override
+    public void startField() throws IOException {
+      gen.writeFieldName(fieldName);
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      gen.writeStartArray();
+      while (reader.next()) {
+        gen.writeStartObject();
+        for (FieldConverter converter : converters) {
+          converter.startField();
+          converter.writeField();
+        }
+        gen.writeEndObject();
+      }
+      gen.writeEndArray();
+    }
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new RepeatedListJsonConverter(fieldId, fieldName, reader);
+  }
+
+  public class RepeatedListJsonConverter extends FieldConverter {
+    FieldConverter converter;
+
+    public RepeatedListJsonConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+      converter = EventBasedRecordWriter.getConverter(JsonRecordWriter.this, fieldId, fieldName, reader.reader());
+    }
+
+    @Override
+    public void startField() throws IOException {
+      gen.writeFieldName(fieldName);
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      gen.writeStartArray();
+      while (reader.next()) {
+        converter.writeField();
+      }
+      gen.writeEndArray();
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    gen.writeStartObject();
+    fRecordStarted = true;
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    gen.writeEndObject();
+    fRecordStarted = false;
+  }
+
+  @Override
+  public void abort() throws IOException {
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    gen.flush();
+    stream.close();
+  }
+}
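
The converters above reduce each record to a plain Jackson event stream; the sequence for one record with a scalar, a repeated scalar and a map looks roughly like this standalone sketch (field names are hypothetical, echoing donuts.json):

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonGenerator;
    import java.io.IOException;

    public class JsonEventSketch {
      public static void main(String[] args) throws IOException {
        JsonGenerator gen = new JsonFactory().createGenerator(System.out).useDefaultPrettyPrinter();
        gen.writeStartObject();            // startRecord()
        gen.writeFieldName("id");          // scalar converter: startField()
        gen.writeNumber(1);                //   ... then writeField()
        gen.writeFieldName("topping");     // repeated scalar converter
        gen.writeStartArray();
        gen.writeString("glazed");
        gen.writeEndArray();
        gen.writeFieldName("batter");      // MapJsonConverter
        gen.writeStartObject();
        gen.writeFieldName("type");
        gen.writeString("chocolate");
        gen.writeEndObject();
        gen.writeEndObject();              // endRecord()
        gen.flush();                       // cleanup()
      }
    }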

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index a336316..94ccc13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -18,11 +18,18 @@
 package org.apache.drill.exec.store.parquet;
 
 import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.holders.ComplexHolder;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.EventBasedRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,8 +43,10 @@ import parquet.io.ColumnIOFactory;
 import parquet.io.MessageColumnIO;
 import parquet.io.api.RecordConsumer;
 import parquet.schema.DecimalMetadata;
+import parquet.schema.GroupType;
 import parquet.schema.MessageType;
 import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 import parquet.schema.Type;
 import parquet.schema.Type.Repetition;
@@ -107,15 +116,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private void newSchema() throws IOException {
     List<Type> types = Lists.newArrayList();
     for (MaterializedField field : batchSchema) {
-      String name = field.getAsSchemaPath().getAsUnescapedPath();
-      MinorType minorType = field.getType().getMinorType();
-      PrimitiveTypeName primitiveTypeName = ParquetTypeHelper.getPrimitiveTypeNameForMinorType(minorType);
-      Repetition repetition = ParquetTypeHelper.getRepetitionForDataMode(field.getDataMode());
-      OriginalType originalType = ParquetTypeHelper.getOriginalTypeForMinorType(minorType);
-      DecimalMetadata decimalMetadata = ParquetTypeHelper.getDecimalMetadataForField(field);
-      int length = ParquetTypeHelper.getLengthForMinorType(minorType);
-      parquet.schema.Type type = new parquet.schema.PrimitiveType(repetition, primitiveTypeName, length, name, originalType, decimalMetadata);
-      types.add(type);
+      types.add(getType(field));
     }
     schema = new MessageType("root", types);
 
@@ -132,6 +133,34 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     setUp(schema, consumer);
   }
 
+  private PrimitiveType getPrimitiveType(MaterializedField field) {
+    MinorType minorType = field.getType().getMinorType();
+    String name = field.getLastName();
+    PrimitiveTypeName primitiveTypeName = ParquetTypeHelper.getPrimitiveTypeNameForMinorType(minorType);
+    Repetition repetition = ParquetTypeHelper.getRepetitionForDataMode(field.getDataMode());
+    OriginalType originalType = ParquetTypeHelper.getOriginalTypeForMinorType(minorType);
+    DecimalMetadata decimalMetadata = ParquetTypeHelper.getDecimalMetadataForField(field);
+    int length = ParquetTypeHelper.getLengthForMinorType(minorType);
+    return new PrimitiveType(repetition, primitiveTypeName, length, name, originalType, decimalMetadata);
+  }
+
+  private parquet.schema.Type getType(MaterializedField field) {
+    MinorType minorType = field.getType().getMinorType();
+    DataMode dataMode = field.getType().getMode();
+    switch(minorType) {
+      case MAP:
+        List<parquet.schema.Type> types = Lists.newArrayList();
+        for (MaterializedField childField : field.getChildren()) {
+          types.add(getType(childField));
+        }
+        return new GroupType(dataMode == DataMode.REPEATED ? Repetition.REPEATED : Repetition.OPTIONAL, field.getLastName(), types);
+      case LIST:
+        throw new UnsupportedOperationException("Unsupported type " + minorType);
+      default:
+        return getPrimitiveType(field);
+    }
+  }
+
   private void flush() throws IOException {
     w.startBlock(recordCount);
     store.flush();
@@ -163,6 +192,70 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   }
 
   @Override
+  public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new MapParquetConverter(fieldId, fieldName, reader);
+  }
+
+  public class MapParquetConverter extends FieldConverter {
+    List<FieldConverter> converters = Lists.newArrayList();
+
+    public MapParquetConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+      int i = 0;
+      for (String name : reader) {
+        FieldConverter converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, i++, name, reader.reader(name));
+        converters.add(converter);
+      }
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      consumer.startField(fieldName, fieldId);
+      consumer.startGroup();
+      for (FieldConverter converter : converters) {
+        converter.writeField();
+      }
+      consumer.endGroup();
+      consumer.endField(fieldName, fieldId);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new RepeatedMapParquetConverter(fieldId, fieldName, reader);
+  }
+
+  public class RepeatedMapParquetConverter extends FieldConverter {
+    List<FieldConverter> converters = Lists.newArrayList();
+
+    public RepeatedMapParquetConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+      int i = 0;
+      for (String name : reader) {
+        FieldConverter converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, i++, name, reader.reader(name));
+        converters.add(converter);
+      }
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      if (reader.size() == 0) {
+        return;
+      }
+      consumer.startField(fieldName, fieldId);
+      while (reader.next()) {
+        consumer.startGroup();
+        for (FieldConverter converter : converters) {
+          converter.writeField();
+        }
+        consumer.endGroup();
+      }
+      consumer.endField(fieldName, fieldId);
+    }
+  }
+
+
+  @Override
   public void startRecord() throws IOException {
     consumer.startMessage();
   }
@@ -176,7 +269,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   @Override
   public void abort() throws IOException {
-    //To change body of implemented methods use File | Settings | File Templates.
   }
 
   @Override
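
The converter code above follows one pattern throughout: a FieldConverter is built once per column from a FieldReader, and each writeField() call replays the current value as Parquet consumer events. A map wraps its children in startField/startGroup; a repeated map emits one group per element inside a single field, and skips the field entirely when reader.size() is zero. Below is a minimal, self-contained sketch of that event order; LoggingConsumer and Converter are hypothetical stand-ins for Parquet's RecordConsumer and Drill's FieldConverter, not the real classes.

import java.util.ArrayList;
import java.util.List;

public class ConverterFlowSketch {
  // Stand-in for Parquet's RecordConsumer: just logs the event stream.
  static class LoggingConsumer {
    void startField(String name, int id) { System.out.println("startField " + name + "/" + id); }
    void startGroup()                    { System.out.println("startGroup"); }
    void endGroup()                      { System.out.println("endGroup"); }
    void endField(String name, int id)   { System.out.println("endField " + name + "/" + id); }
    void addValue(Object v)              { System.out.println("  value " + v); }
  }

  // Stand-in for FieldConverter: one per child column, built once per schema.
  interface Converter { void writeField(); }

  public static void main(String[] args) {
    LoggingConsumer consumer = new LoggingConsumer();
    // Child converters for a map field {id, type}, as MapParquetConverter builds them.
    List<Converter> children = new ArrayList<>();
    children.add(() -> consumer.addValue("1001"));
    children.add(() -> consumer.addValue("Regular"));

    // The equivalent of MapParquetConverter.writeField() for a field "batter" at index 0:
    consumer.startField("batter", 0);
    consumer.startGroup();
    for (Converter c : children) {
      c.writeField();
    }
    consumer.endGroup();
    consumer.endField("batter", 0);
  }
}

For the repeated case, the same group block simply repeats under a while (reader.next()) loop, which is the only structural difference between the two converters above.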

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 55f2b72..23d95b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -23,8 +23,11 @@ import java.io.PrintStream;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.exec.expr.holders.ComplexHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.StringOutputRecordWriter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -117,6 +120,33 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
   }
 
   @Override
+  public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexStringFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexStringFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexStringFieldConverter(fieldId, fieldName, reader);
+  }
+
+  public class ComplexStringFieldConverter extends FieldConverter {
+
+    public ComplexStringFieldConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      addField(fieldId, reader.readObject().toString());
+    }
+  }
+
+  @Override
   public void cleanup() throws IOException {
     super.cleanup();
     if (stream != null) {
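
The text writer takes the opposite approach to the Parquet converters: rather than walking the value tree, it flattens any complex field into a single string via reader.readObject().toString(). A rough sketch of what that produces for a nested column, using plain Java collections as a stand-in for the JSON-style objects Drill's readers return (the exact rendering is an assumption, not taken from the Drill sources):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class ComplexToStringSketch {
  // Insertion-ordered map so toString() output is deterministic.
  static Map<String, Object> entry(String id, String type) {
    Map<String, Object> m = new LinkedHashMap<>();
    m.put("id", id);
    m.put("type", type);
    return m;
  }

  public static void main(String[] args) {
    // Approximates the object readObject() hands back for a nested column.
    Map<String, Object> batters = new LinkedHashMap<>();
    batters.put("batter", Arrays.asList(entry("1001", "Regular"), entry("1002", "Chocolate")));
    // The text writer emits this single string as the field's value:
    // {batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}]}
    System.out.println(batters);
  }
}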

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index cb77032..c67c047 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
 import org.apache.drill.exec.vector.UInt4Vector;
@@ -79,6 +80,9 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     clear();
     offsets.allocateNew(parentValueCount+1);
     offsets.zeroVector();
+    for(ValueVector v : vectors.values()){
+      AllocationHelper.allocate(v, parentValueCount, 50, childValueCount);
+    }
     mutator.reset();
     accessor.reset();
   }
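
allocateNew() previously sized only the offsets vector; the loop added above now pre-allocates every child vector as well. Reading the call AllocationHelper.allocate(v, parentValueCount, 50, childValueCount) with the assumption that 50 is a per-value byte-width hint for variable-length children, the reserved sizes work out roughly as below (illustrative arithmetic only, not the helper's actual implementation):

public class AllocationSizingSketch {
  public static void main(String[] args) {
    int parentValueCount = 4096;  // rows in the batch (example value)
    int bytesPerValueHint = 50;   // assumed width hint for variable-length children
    long dataBytes = (long) parentValueCount * bytesPerValueHint;
    long offsetBytes = (long) (parentValueCount + 1) * Integer.BYTES; // one trailing offset
    System.out.println("data ~" + dataBytes + " B, offsets ~" + offsetBytes + " B");
  }
}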

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
index 5350662..b89d26d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
@@ -76,7 +76,10 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{
   private int maxOffset;
 
   public int size(){
-    return maxOffset - currentOffset;
+    if (isNull()) {
+      return 0;
+    }
+    return maxOffset - (currentOffset < 0 ? 0 : currentOffset);
   }
 
   public void setPosition(int index){
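
The size() fix above does two things: a null repeated-map entry now reports zero elements, and a negative (not yet positioned) currentOffset no longer inflates the count. The same logic restated with plain ints; treating -1 as the "unset" marker is an assumption about the reader's internal convention:

public class RepeatedSizeSketch {
  // Mirrors the corrected size() above: null entries are empty, and an
  // unset offset is clamped to zero before subtracting.
  static int size(boolean isNull, int currentOffset, int maxOffset) {
    if (isNull) {
      return 0;
    }
    return maxOffset - Math.max(currentOffset, 0);
  }

  public static void main(String[] args) {
    System.out.println(size(true, -1, 7));   // 0: null entry
    System.out.println(size(false, -1, 7));  // 7: offset not yet advanced
    System.out.println(size(false, 3, 7));   // 4: mid-iteration
  }
}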

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 1cb0d06..89beeb0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -63,6 +63,13 @@ public class TestParquetWriter extends BaseTestQuery {
   }
 
   @Test
+  public void testComplex() throws Exception {
+    String selection = "*";
+    String inputTable = "cp.`donuts.json`";
+    runTestAndValidate(selection, selection, inputTable, "donuts_json");
+  }
+
+  @Test
   public void testCastProjectBug_Drill_929() throws Exception {
    String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
        "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as COMMITDATE, cast(L_RECEIPTDATE as DATE) AS RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
@@ -175,7 +182,6 @@ public class TestParquetWriter extends BaseTestQuery {
     runTestAndValidate("*", "*", inputTable, "nullable_test");
   }
 
-  @Ignore // fails intermittenly when being run with other tests, a patch in DRILL
   @Test
   public void testDecimal() throws Exception {
     String selection = "cast(salary as decimal(8,2)) as decimal8, cast(salary 
as decimal(15,2)) as decimal15, " +
@@ -285,7 +291,6 @@ public class TestParquetWriter extends BaseTestQuery {
           continue;
        if ( (actualRecords.get(i).get(column) == null && record.get(column) == null) || ! actualRecords.get(i).get(column).equals(record.get(column))) {
           missmatch++;
-          System.out.println( counter + " " + column + "[ex: " + record.get(column) + ", actual:" + actualRecords.get(i).get(column) + "]");
         }
       }
       if ( ! actualRecords.remove(record)) {
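
The new testComplex case round-trips donuts.json through the Parquet writer. A hypothetical reconstruction of what runTestAndValidate("*", "*", "cp.`donuts.json`", "donuts_json") executes, assuming it performs a CTAS into a scratch workspace and selects the result back for comparison, as the other tests in this class suggest; the dfs.tmp workspace name below is invented for illustration:

public class ComplexRoundTripSketch {
  public static void main(String[] args) {
    // Presumed write side: materialize the JSON source as a Parquet table.
    String create = "CREATE TABLE dfs.tmp.donuts_json AS SELECT * FROM cp.`donuts.json`";
    // Presumed read side: select it back and compare record-by-record.
    String verify = "SELECT * FROM dfs.tmp.donuts_json";
    System.out.println(create);
    System.out.println(verify);
  }
}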

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/exec/java-exec/src/test/resources/donuts.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/donuts.json b/exec/java-exec/src/test/resources/donuts.json
new file mode 100644
index 0000000..276bcbe
--- /dev/null
+++ b/exec/java-exec/src/test/resources/donuts.json
@@ -0,0 +1,132 @@
+{
+               "id": "0001",
+               "type": "donut",
+               "name": "Cake",
+               "ppu": 0.55,
+               "sales": 35,
+
+
+               "batters":
+                       {
+                               "batter":
+                                       [
+                                               { "id": "1001", "type": "Regular" },
+                                               { "id": "1002", "type": "Chocolate" },
+                                               { "id": "1003", "type": "Blueberry" },
+                                               { "id": "1004", "type": "Devil's Food" }
+                                       ]
+                       },
+               "topping":
+                       [
+                               { "id": "5001", "type": "None" },
+                               { "id": "5002", "type": "Glazed" },
+                               { "id": "5005", "type": "Sugar" },
+                               { "id": "5007", "type": "Powdered Sugar" },
+                               { "id": "5006", "type": "Chocolate with Sprinkles" },
+                               { "id": "5003", "type": "Chocolate" },
+                               { "id": "5004", "type": "Maple" }
+                       ]
+       }
+       {
+               "id": "0002",
+               "type": "donut",
+               "name": "Raised",
+               "ppu": 0.69,
+               "sales": 145,
+               "batters":
+                       {
+                               "batter":
+                                       [
+                                               { "id": "1001", "type": "Regular" }
+                                       ]
+                       },
+               "topping":
+                       [
+                               { "id": "5001", "type": "None" },
+                               { "id": "5002", "type": "Glazed" },
+                               { "id": "5005", "type": "Sugar" },
+                               { "id": "5003", "type": "Chocolate" },
+                               { "id": "5004", "type": "Maple" }
+                       ]
+       }
+       {
+               "id": "0003",
+               "type": "donut",
+               "name": "Old Fashioned",
+               "ppu": 0.55,
+               "sales": 300,
+
+
+               "batters":
+                       {
+                               "batter":
+                                       [
+                                               { "id": "1001", "type": "Regular" },
+                                               { "id": "1002", "type": "Chocolate" }
+                                       ]
+                       },
+               "topping":
+                       [
+                               { "id": "5001", "type": "None" },
+                               { "id": "5002", "type": "Glazed" },
+                               { "id": "5003", "type": "Chocolate" },
+                               { "id": "5004", "type": "Maple" }
+                       ]
+       }
+               {
+               "id": "0004",
+               "type": "donut",
+               "name": "Filled",
+               "ppu": 0.69,
+               "sales": 14,
+
+
+               "batters":
+                       {
+                               "batter":
+                                       [
+                                               { "id": "1001", "type": "Regular" },
+                                               { "id": "1002", "type": "Chocolate" },
+                                               { "id": "1003", "type": "Blueberry" },
+                                               { "id": "1004", "type": "Devil's Food" }
+                                       ]
+                       },
+               "topping":
+                       [
+                               { "id": "5001", "type": "None" },
+                               { "id": "5002", "type": "Glazed" },
+                               { "id": "5005", "type": "Sugar" },
+                               { "id": "5007", "type": "Powdered Sugar" },
+                               { "id": "5006", "type": "Chocolate with Sprinkles" },
+                               { "id": "5003", "type": "Chocolate" },
+                               { "id": "5004", "type": "Maple" }
+                       ],
+               "filling":
+                       [
+                               { "id": "6001", "type": "None" },
+                               { "id": "6002", "type": "Raspberry" },
+                               { "id": "6003", "type": "Lemon" },
+                               { "id": "6004", "type": "Chocolate" },
+                               { "id": "6005", "type": "Kreme" }
+                       ]
+       }
+               {
+               "id": "0005",
+               "type": "donut",
+               "name": "Apple Fritter",
+               "ppu": 1.00,
+               "sales": 700,
+
+
+               "batters":
+                       {
+                               "batter":
+                                       [
+                                               { "id": "1001", "type": "Regular" }
+                                       ]
+                       },
+               "topping":
+                       [
+                               { "id": "5002", "type": "Glazed" }
+                       ]
+       }
\ No newline at end of file
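
Note that donuts.json is a stream of concatenated root-level objects, not a JSON array: each object becomes one Drill record, with batters.batter and topping landing as repeated maps. Any streaming parser can consume the same framing; a sketch using Jackson, assuming the file sits on the test classpath:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DonutsStreamSketch {
  public static void main(String[] args) throws Exception {
    // readValues() pulls one root-level object at a time, the same framing
    // Drill's JSON reader handles when it turns each object into a record.
    ObjectMapper mapper = new ObjectMapper();
    MappingIterator<JsonNode> donuts = mapper.readerFor(JsonNode.class)
        .readValues(DonutsStreamSketch.class.getResourceAsStream("/donuts.json"));
    while (donuts.hasNextValue()) {
      JsonNode donut = donuts.nextValue();
      System.out.println(donut.get("name") + ": " + donut.path("topping").size() + " toppings");
    }
  }
}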

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d238ee/protocol/src/main/java/org/apache/drill/common/types/MinorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/common/types/MinorType.java b/protocol/src/main/java/org/apache/drill/common/types/MinorType.java
index 772e2ff..423ed53 100644
--- a/protocol/src/main/java/org/apache/drill/common/types/MinorType.java
+++ b/protocol/src/main/java/org/apache/drill/common/types/MinorType.java
@@ -57,7 +57,8 @@ public enum MinorType implements com.dyuproject.protostuff.EnumLite<MinorType>
     NULL(37),
     INTERVALYEAR(38),
     INTERVALDAY(39),
-    LIST(40);
+    LIST(40),
+    GENERIC_OBJECT(41);
     
     public final int number;
     
@@ -111,6 +112,7 @@ public enum MinorType implements com.dyuproject.protostuff.EnumLite<MinorType>
             case 38: return INTERVALYEAR;
             case 39: return INTERVALDAY;
             case 40: return LIST;
+            case 41: return GENERIC_OBJECT;
             default: return null;
         }
     }
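
The new GENERIC_OBJECT entry must round-trip through both its wire number and the valueOf(int) switch, or protostuff deserialization would silently map 41 to null. A trimmed stand-in for the enum showing the invariant the added case preserves:

public class MinorTypeRoundTripSketch {
  // Two constants are enough to show the number <-> constant contract.
  enum MiniMinorType {
    LIST(40), GENERIC_OBJECT(41);
    final int number;
    MiniMinorType(int number) { this.number = number; }
    static MiniMinorType valueOf(int number) {
      switch (number) {
        case 40: return LIST;
        case 41: return GENERIC_OBJECT; // the case this commit adds
        default: return null;
      }
    }
  }

  public static void main(String[] args) {
    // Every constant must map back to itself through its wire number.
    for (MiniMinorType t : MiniMinorType.values()) {
      System.out.println(t + " <-> " + t.number + " -> " + MiniMinorType.valueOf(t.number));
    }
  }
}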
