http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
index 15807cb..61c201b 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
@@ -19,11 +19,11 @@ package org.apache.drill.exec.vector.accessor.writer;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
-import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 
 /**
@@ -36,14 +36,8 @@ import 
org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 
 public abstract class AbstractObjectWriter implements ObjectWriter {
 
-  private ColumnMetadata schema;
-
-  public AbstractObjectWriter(ColumnMetadata schema) {
-    this.schema = schema;
-  }
-
   @Override
-  public ColumnMetadata schema() { return schema; }
+  public ColumnMetadata schema() { return baseWriter().schema(); }
 
   @Override
   public ScalarWriter scalar() {
@@ -62,11 +56,40 @@ public abstract class AbstractObjectWriter implements 
ObjectWriter {
 
   public abstract WriterEvents events();
 
+  public ColumnWriter baseWriter() {
+    return (ColumnWriter) events();
+  }
+
   @Override
-  public void bindListener(ColumnWriterListener listener) { }
+  public ObjectType type() { return baseWriter().type(); }
+
+  @Override
+  public boolean nullable() { return baseWriter().nullable(); }
+
+  @Override
+  public void setNull() {
+    baseWriter().setNull();
+  }
 
   @Override
-  public void bindListener(TupleWriterListener listener) { }
+  public void setObject(Object value) {
+    baseWriter().setObject(value);
+  }
 
   public abstract void dump(HierarchicalFormatter format);
+
+  @Override
+  public int rowStartIndex() {
+    return baseWriter().rowStartIndex();
+  }
+
+  @Override
+  public int lastWriteIndex() {
+    return baseWriter().lastWriteIndex();
+  }
+
+  @Override
+  public int writeIndex() {
+    return baseWriter().writeIndex();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
index b6a85d7..08e4ac3 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
@@ -21,8 +21,10 @@ import java.math.BigDecimal;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 import org.joda.time.Period;
 
@@ -39,29 +41,17 @@ public abstract class AbstractScalarWriter implements 
ScalarWriter, WriterEvents
 
     private AbstractScalarWriter scalarWriter;
 
-    public ScalarObjectWriter(ColumnMetadata schema, AbstractScalarWriter 
scalarWriter) {
-      super(schema);
+    public ScalarObjectWriter(AbstractScalarWriter scalarWriter) {
       this.scalarWriter = scalarWriter;
     }
 
     @Override
-    public ObjectType type() { return ObjectType.SCALAR; }
-
-    @Override
-    public void set(Object value) { scalarWriter.setObject(value); }
-
-    @Override
     public ScalarWriter scalar() { return scalarWriter; }
 
     @Override
     public WriterEvents events() { return scalarWriter; }
 
     @Override
-    public void bindListener(ColumnWriterListener listener) {
-      scalarWriter.bindListener(listener);
-    }
-
-    @Override
     public void dump(HierarchicalFormatter format) {
       format
         .startObject(this)
@@ -71,6 +61,42 @@ public abstract class AbstractScalarWriter implements 
ScalarWriter, WriterEvents
     }
   }
 
+  protected ColumnMetadata schema;
+
+  /**
+   * Indicates the position in the vector to write. Set via an object so that
+   * all writers (within the same subtree) can agree on the write position.
+   * For example, all top-level, simple columns see the same row index.
+   * All columns within a repeated map see the same (inner) index, etc.
+   */
+
+  protected ColumnWriterIndex vectorIndex;
+
+  @Override
+  public ObjectType type() { return ObjectType.SCALAR; }
+
+  public void bindSchema(ColumnMetadata schema) {
+    this.schema = schema;
+  }
+
+  @Override
+  public void bindIndex(ColumnWriterIndex vectorIndex) {
+    this.vectorIndex = vectorIndex;
+  }
+
+  @Override
+  public int rowStartIndex() {
+    return vectorIndex.rowStartIndex();
+  }
+
+  @Override
+  public int writeIndex() {
+    return vectorIndex.vectorIndex();
+  }
+
+  @Override
+  public ColumnMetadata schema() { return schema; }
+
   public abstract BaseDataValueVector vector();
 
   @Override
@@ -85,6 +111,10 @@ public abstract class AbstractScalarWriter implements 
ScalarWriter, WriterEvents
   @Override
   public void saveRow() { }
 
+  protected UnsupportedConversionError conversionError(String javaType) {
+    return UnsupportedConversionError.writeError(schema(), javaType);
+  }
+
   @Override
   public void setObject(Object value) {
     if (value == null) {
@@ -111,8 +141,7 @@ public abstract class AbstractScalarWriter implements 
ScalarWriter, WriterEvents
     } else if (value instanceof Float) {
       setDouble((Float) value);
     } else {
-      throw new IllegalArgumentException("Unsupported type " +
-                value.getClass().getSimpleName());
+      throw conversionError(value.getClass().getSimpleName());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index 938f867..640f5d3 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.ProjectionType;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
@@ -104,29 +105,17 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
 
     private AbstractTupleWriter tupleWriter;
 
-    public TupleObjectWriter(ColumnMetadata schema, AbstractTupleWriter 
tupleWriter) {
-      super(schema);
+    public TupleObjectWriter(AbstractTupleWriter tupleWriter) {
       this.tupleWriter = tupleWriter;
     }
 
     @Override
-    public ObjectType type() { return ObjectType.TUPLE; }
-
-    @Override
-    public void set(Object value) { tupleWriter.setObject(value); }
-
-    @Override
     public TupleWriter tuple() { return tupleWriter; }
 
     @Override
     public WriterEvents events() { return tupleWriter; }
 
     @Override
-    public void bindListener(TupleWriterListener listener) {
-      tupleWriter.bindListener(listener);
-    }
-
-    @Override
     public void dump(HierarchicalFormatter format) {
       format
         .startObject(this)
@@ -136,38 +125,7 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
     }
   }
 
-  /**
-   * Tracks the write state of the tuple to allow applying the correct
-   * operations to newly-added columns to synchronize them with the rest
-   * of the tuple.
-   */
-
-  public enum State {
-    /**
-     * No write is in progress. Nothing need be done to newly-added
-     * writers.
-     */
-    IDLE,
-
-    /**
-     * <tt>startWrite()</tt> has been called to start a write operation
-     * (start a batch), but <tt>startValue()</tt> has not yet been called
-     * to start a row (or value within an array). <tt>startWrite()</tt> must
-     * be called on newly added columns.
-     */
-
-    IN_WRITE,
-
-    /**
-     * Both <tt>startWrite()</tt> and <tt>startValue()</tt> has been called on
-     * the tuple to prepare for writing values, and both must be called on
-     * newly-added vectors.
-     */
-
-    IN_ROW
-  }
-
-  protected final TupleMetadata schema;
+  protected final TupleMetadata tupleSchema;
   protected final List<AbstractObjectWriter> writers;
   protected ColumnWriterIndex vectorIndex;
   protected ColumnWriterIndex childIndex;
@@ -175,7 +133,7 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
   protected State state = State.IDLE;
 
   protected AbstractTupleWriter(TupleMetadata schema, 
List<AbstractObjectWriter> writers) {
-    this.schema = schema;
+    this.tupleSchema = schema;
     this.writers = writers;
   }
 
@@ -183,6 +141,9 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
     this(schema, new ArrayList<AbstractObjectWriter>());
   }
 
+  @Override
+  public ObjectType type() { return ObjectType.TUPLE; }
+
   protected void bindIndex(ColumnWriterIndex index, ColumnWriterIndex 
childIndex) {
     vectorIndex = index;
     this.childIndex = childIndex;
@@ -198,7 +159,9 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
   }
 
   @Override
-  public ColumnWriterIndex writerIndex() { return vectorIndex; }
+  public int rowStartIndex() {
+    return vectorIndex.rowStartIndex();
+  }
 
   /**
    * Add a column writer to an existing tuple writer. Used for implementations
@@ -209,8 +172,8 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
    */
 
   public int addColumnWriter(AbstractObjectWriter colWriter) {
-    assert writers.size() == schema.size();
-    int colIndex = schema.addColumn(colWriter.schema());
+    assert writers.size() == tupleSchema.size();
+    int colIndex = tupleSchema.addColumn(colWriter.schema());
     writers.add(colWriter);
     colWriter.events().bindIndex(childIndex);
     if (state != State.IDLE) {
@@ -223,6 +186,12 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
   }
 
   @Override
+  public ProjectionType projectionType(String columnName) {
+    return listener == null ? ProjectionType.UNSPECIFIED
+        : listener.projectionType(columnName);
+  }
+
+  @Override
   public int addColumn(ColumnMetadata column) {
     if (listener == null) {
       throw new UnsupportedOperationException("addColumn");
@@ -241,10 +210,18 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
   }
 
   @Override
-  public TupleMetadata schema() { return schema; }
+  public TupleMetadata tupleSchema() { return tupleSchema; }
 
   @Override
-  public int size() { return schema().size(); }
+  public int size() { return tupleSchema().size(); }
+
+  @Override
+  public boolean nullable() { return false; }
+
+  @Override
+  public void setNull() {
+    throw new IllegalStateException("Not nullable");
+  }
 
   @Override
   public void startWrite() {
@@ -339,7 +316,7 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
 
   @Override
   public ObjectWriter column(String colName) {
-    int index = schema.index(colName);
+    int index = tupleSchema.index(colName);
     if (index == -1) {
       throw new UndefinedColumnException(colName);
     }
@@ -348,35 +325,18 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
 
   @Override
   public void set(int colIndex, Object value) {
-    ObjectWriter colWriter = column(colIndex);
-    switch (colWriter.type()) {
-    case ARRAY:
-      colWriter.array().setObject(value);
-      break;
-    case SCALAR:
-      colWriter.scalar().setObject(value);
-      break;
-    case TUPLE:
-      colWriter.tuple().setObject(value);
-      break;
-    default:
-      throw new IllegalStateException("Unexpected object type: " + 
colWriter.type());
-    }
-  }
-
-  @Override
-  public void setTuple(Object ...values) {
-    setObject(values);
+    column(colIndex).setObject(value);
   }
 
   @Override
   public void setObject(Object value) {
     Object values[] = (Object[]) value;
-    if (values.length != schema.size()) {
+    if (values.length != tupleSchema.size()) {
       throw new IllegalArgumentException(
-          "Map has " + schema.size() +
-          " columns, but value array has " +
-          values.length + " values.");
+          String.format("Map %s has %d columns, but value array has " +
+              " %d values.",
+              schema().name(),
+              tupleSchema.size(), values.length));
     }
     for (int i = 0; i < values.length; i++) {
       set(i, values[i]);
@@ -429,6 +389,11 @@ public abstract class AbstractTupleWriter implements 
TupleWriter, WriterEvents {
   }
 
   @Override
+  public int writeIndex() {
+    return vectorIndex.vectorIndex();
+  }
+
+  @Override
   public void bindListener(TupleWriterListener listener) {
     this.listener = listener;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
index 4793277..10a2cf3 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.vector.accessor.writer;
 
 import java.math.BigDecimal;
 
-import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 import org.joda.time.Period;
 
@@ -134,15 +134,6 @@ public abstract class BaseScalarWriter extends 
AbstractScalarWriter {
   public static final int MIN_BUFFER_SIZE = 256;
 
   /**
-   * Indicates the position in the vector to write. Set via an object so that
-   * all writers (within the same subtree) can agree on the write position.
-   * For example, all top-level, simple columns see the same row index.
-   * All columns within a repeated map see the same (inner) index, etc.
-   */
-
-  protected ColumnWriterIndex vectorIndex;
-
-  /**
    * Listener invoked if the vector overflows. If not provided, then the writer
    * does not support vector overflow.
    */
@@ -160,14 +151,6 @@ public abstract class BaseScalarWriter extends 
AbstractScalarWriter {
   protected int capacity;
 
   @Override
-  public void bindIndex(ColumnWriterIndex vectorIndex) {
-    this.vectorIndex = vectorIndex;
-  }
-
-  @Override
-  public ColumnWriterIndex writerIndex() { return vectorIndex; }
-
-  @Override
   public void bindListener(ColumnWriterListener listener) {
     this.listener = listener;
   }
@@ -176,7 +159,7 @@ public abstract class BaseScalarWriter extends 
AbstractScalarWriter {
    * All change of buffer comes through this function to allow capturing
    * the buffer address and capacity. Only two ways to set the buffer:
    * by binding to a vector in bindVector(), or by resizing the vector
-   * in writeIndex().
+   * in prepareWrite().
    */
 
   protected abstract void setBuffer();
@@ -220,43 +203,46 @@ public abstract class BaseScalarWriter extends 
AbstractScalarWriter {
   public abstract void skipNulls();
 
   @Override
+  public boolean nullable() { return false; }
+
+  @Override
   public void setNull() {
-    throw new UnsupportedOperationException("Vector is not nullable");
+    throw UnsupportedConversionError.nullError(schema());
   }
 
   @Override
   public void setInt(int value) {
-    throw new UnsupportedOperationException();
+    throw conversionError("int");
   }
 
   @Override
   public void setLong(long value) {
-    throw new UnsupportedOperationException();
+    throw conversionError("long");
   }
 
   @Override
   public void setDouble(double value) {
-    throw new UnsupportedOperationException();
+    throw conversionError("double");
   }
 
   @Override
   public void setString(String value) {
-    throw new UnsupportedOperationException();
+    throw conversionError("String");
   }
 
   @Override
   public void setBytes(byte[] value, int len) {
-    throw new UnsupportedOperationException();
+    throw conversionError("bytes");
   }
 
   @Override
   public void setDecimal(BigDecimal value) {
-    throw new UnsupportedOperationException();
+    throw conversionError("Decimal");
   }
 
   @Override
   public void setPeriod(Period value) {
-    throw new UnsupportedOperationException();
+    throw conversionError("Period");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
index e54625e..9db767d 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
@@ -36,10 +36,10 @@ import 
org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
  */
 
 public abstract class BaseVarWidthWriter extends BaseScalarWriter {
-  protected final OffsetVectorWriter offsetsWriter;
+  protected final OffsetVectorWriterImpl offsetsWriter;
 
   public BaseVarWidthWriter(UInt4Vector offsetVector) {
-    offsetsWriter = new OffsetVectorWriter(offsetVector);
+    offsetsWriter = new OffsetVectorWriterImpl(offsetVector);
   }
 
   @Override
@@ -57,7 +57,7 @@ public abstract class BaseVarWidthWriter extends 
BaseScalarWriter {
   @Override
   public void startRow() { offsetsWriter.startRow(); }
 
-  protected final int writeIndex(final int width) {
+  protected final int prepareWrite(final int width) {
 
     // This is performance critical code; every operation counts.
     // Please be thoughtful when changing the code.

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
index 4a668c4..3f0cae3 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
@@ -19,34 +19,28 @@ package org.apache.drill.exec.vector.accessor.writer;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.NullableVector;
-import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ColumnAccessorUtils;
 import 
org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
 import 
org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter.ScalarObjectWriter;
-import 
org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter.TupleObjectWriter;
-import org.apache.drill.exec.vector.accessor.writer.MapWriter.ArrayMapWriter;
-import 
org.apache.drill.exec.vector.accessor.writer.MapWriter.DummyArrayMapWriter;
-import org.apache.drill.exec.vector.accessor.writer.MapWriter.DummyMapWriter;
-import org.apache.drill.exec.vector.accessor.writer.MapWriter.SingleMapWriter;
 import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
 import org.apache.drill.exec.vector.accessor.writer.dummy.DummyScalarWriter;
-import org.apache.drill.exec.vector.complex.AbstractMapVector;
-import org.apache.drill.exec.vector.complex.MapVector;
-import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
 /**
  * Gather generated writer classes into a set of class tables to allow rapid
  * run-time creation of writers. Builds the writer and its object writer
  * wrapper which binds the vector to the writer.
+ * <p>
+ * Compared to the reader factory, the writer factor is a bit more complex
+ * as it must handle both the projected ("real" writer) and unprojected
+ * ("dummy" writer) cases. Because of the way the various classes interact,
+ * it is cleaner to put the factory methods here rather than in the various
+ * writers, as is done in the case of the readers.
  */
 
 @SuppressWarnings("unchecked")
@@ -79,21 +73,38 @@ public class ColumnWriterFactory {
     default:
       switch (schema.mode()) {
       case OPTIONAL:
-        NullableVector nullableVector = (NullableVector) vector;
-        return NullableScalarWriter.build(schema, nullableVector,
-                newWriter(nullableVector.getValuesVector()));
+        return nullableScalarWriter(schema, (NullableVector) vector);
       case REQUIRED:
-        return new ScalarObjectWriter(schema, newWriter(vector));
+        return requiredScalarWriter(schema, vector);
       case REPEATED:
-        RepeatedValueVector repeatedVector = (RepeatedValueVector) vector;
-        return ScalarArrayWriter.build(schema, repeatedVector,
-                newWriter(repeatedVector.getDataVector()));
+        return repeatedScalarWriter(schema, (RepeatedValueVector) vector);
       default:
         throw new UnsupportedOperationException(schema.mode().toString());
       }
     }
   }
 
+  private static ScalarObjectWriter requiredScalarWriter(
+      ColumnMetadata schema, ValueVector vector) {
+    BaseScalarWriter baseWriter = newWriter(vector);
+    baseWriter.bindSchema(schema);
+    return new ScalarObjectWriter(baseWriter);
+  }
+
+  private static ScalarObjectWriter nullableScalarWriter(
+      ColumnMetadata schema, NullableVector vector) {
+    BaseScalarWriter baseWriter = newWriter(vector.getValuesVector());
+    baseWriter.bindSchema(schema);
+    return NullableScalarWriter.build(schema, vector, baseWriter);
+  }
+
+  private static AbstractObjectWriter repeatedScalarWriter(
+      ColumnMetadata schema, RepeatedValueVector vector) {
+    BaseScalarWriter baseWriter = newWriter(vector.getDataVector());
+    baseWriter.bindSchema(schema);
+    return ScalarArrayWriter.build(schema, vector, baseWriter);
+  }
+
   /**
    * Build a writer for a non-projected column.
    * @param schema schema of the column
@@ -104,20 +115,19 @@ public class ColumnWriterFactory {
     switch (schema.type()) {
     case GENERIC_OBJECT:
     case LATE:
-    case NULL:
     case LIST:
     case MAP:
       throw new UnsupportedOperationException(schema.type().toString());
     default:
-      ScalarObjectWriter scalarWriter = new ScalarObjectWriter(schema,
-          new DummyScalarWriter());
+      ScalarObjectWriter scalarWriter = new ScalarObjectWriter(
+          new DummyScalarWriter(schema));
       switch (schema.mode()) {
       case OPTIONAL:
       case REQUIRED:
         return scalarWriter;
       case REPEATED:
-        return new ArrayObjectWriter(schema,
-            new DummyArrayWriter(
+        return new ArrayObjectWriter(
+            new DummyArrayWriter(schema,
               scalarWriter));
       default:
         throw new UnsupportedOperationException(schema.mode().toString());
@@ -125,59 +135,6 @@ public class ColumnWriterFactory {
     }
   }
 
-  public static TupleObjectWriter buildMap(ColumnMetadata schema, MapVector 
vector,
-                                        List<AbstractObjectWriter> writers) {
-    MapWriter mapWriter;
-    if (schema.isProjected()) {
-      mapWriter = new SingleMapWriter(schema, vector, writers);
-    } else {
-      mapWriter = new DummyMapWriter(schema, writers);
-    }
-    return new TupleObjectWriter(schema, mapWriter);
-  }
-
-  public static ArrayObjectWriter buildMapArray(ColumnMetadata schema,
-                                        UInt4Vector offsetVector,
-                                        List<AbstractObjectWriter> writers) {
-    MapWriter mapWriter;
-    if (schema.isProjected()) {
-      mapWriter = new ArrayMapWriter(schema, writers);
-    } else {
-      mapWriter = new DummyArrayMapWriter(schema, writers);
-    }
-    TupleObjectWriter mapArray = new TupleObjectWriter(schema, mapWriter);
-    AbstractArrayWriter arrayWriter;
-    if (schema.isProjected()) {
-      arrayWriter = new ObjectArrayWriter(
-          offsetVector,
-          mapArray);
-    } else  {
-      arrayWriter = new DummyArrayWriter(mapArray);
-    }
-    return new ArrayObjectWriter(schema, arrayWriter);
-  }
-
-  public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema,
-      AbstractMapVector vector,
-      List<AbstractObjectWriter> writers) {
-    assert (vector != null) == schema.isProjected();
-    if (! schema.isArray()) {
-      return buildMap(schema, (MapVector) vector, writers);
-    } else if (vector == null) {
-      return buildMapArray(schema,
-          null, writers);
-    } else {
-      return buildMapArray(schema,
-          ((RepeatedMapVector) vector).getOffsetVector(),
-          writers);
-    }
-  }
-
-  public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema, 
AbstractMapVector vector) {
-    assert schema.mapSchema().size() == 0;
-    return buildMapWriter(schema, vector, new 
ArrayList<AbstractObjectWriter>());
-  }
-
   public static BaseScalarWriter newWriter(ValueVector vector) {
     MajorType major = vector.getField().getType();
     MinorType type = major.getMinorType();

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
index af8daba..915c994 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
@@ -17,11 +17,18 @@
  */
 package org.apache.drill.exec.vector.accessor.writer;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import 
org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
 import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 
 /**
  * Writer for a Drill Map type. Maps are actually tuples, just like rows.
@@ -46,6 +53,7 @@ public abstract class MapWriter extends AbstractTupleWriter {
     @Override public int vectorIndex() { return baseIndex.vectorIndex(); }
     @Override public void nextElement() { }
     @Override public void rollover() { }
+
     @Override public ColumnWriterIndex outerIndex() {
       return baseIndex.outerIndex();
     }
@@ -80,6 +88,8 @@ public abstract class MapWriter extends AbstractTupleWriter {
     public void endWrite() {
       super.endWrite();
 
+      // A non repeated map has a field that holds the value count.
+      // Update it. (A repeated map uses the offset vector's value count.)
       // Special form of set value count: used only for
       // this class to avoid setting the value count of children.
       // Setting these counts was already done. Doing it again
@@ -87,13 +97,14 @@ public abstract class MapWriter extends AbstractTupleWriter 
{
       // set the "lastSet" field of nullable vector accessors,
       // and the initial value of -1 will cause all values to
       // be overwritten.
-      //
-      // Note that the map vector can be null if there is no actual
-      // map vector represented by this writer.
 
-      if (mapVector != null) {
-        mapVector.setMapValueCount(vectorIndex.vectorIndex());
-      }
+      mapVector.setMapValueCount(vectorIndex.vectorIndex());
+    }
+
+    @Override
+    public void preRollover() {
+      super.preRollover();
+      mapVector.setMapValueCount(vectorIndex.rowStartIndex());
     }
   }
 
@@ -122,12 +133,6 @@ public abstract class MapWriter extends 
AbstractTupleWriter {
 
       bindIndex(index, new MemberWriterIndex(index));
     }
-
-    // In endWrite(), do not call setValueCount on the map vector.
-    // Doing so will zero-fill the composite vectors because
-    // the internal map state does not track the writer state.
-    // Instead, the code in this structure has set the value
-    // count for each composite vector individually.
   }
 
   protected static class DummyMapWriter extends MapWriter {
@@ -136,6 +141,9 @@ public abstract class MapWriter extends AbstractTupleWriter 
{
         List<AbstractObjectWriter> writers) {
       super(schema, writers);
     }
+
+    @Override
+    public ProjectionType projectionType(String columnName) { return 
ProjectionType.UNPROJECTED; }
   }
 
   protected static class DummyArrayMapWriter extends MapWriter {
@@ -144,6 +152,9 @@ public abstract class MapWriter extends AbstractTupleWriter 
{
         List<AbstractObjectWriter> writers) {
       super(schema, writers);
     }
+
+    @Override
+    public ProjectionType projectionType(String columnName) { return 
ProjectionType.UNPROJECTED; }
   }
 
   protected final ColumnMetadata mapColumnSchema;
@@ -152,4 +163,61 @@ public abstract class MapWriter extends 
AbstractTupleWriter {
     super(schema.mapSchema(), writers);
     mapColumnSchema = schema;
   }
+
+  public static TupleObjectWriter buildMap(ColumnMetadata schema, MapVector 
vector,
+      List<AbstractObjectWriter> writers) {
+    MapWriter mapWriter;
+    if (schema.isProjected()) {
+
+      // Vector is not required for a map writer; the map's columns
+      // are written, but not the (non-array) map.
+
+      mapWriter = new SingleMapWriter(schema, vector, writers);
+    } else {
+      assert vector == null;
+      mapWriter = new DummyMapWriter(schema, writers);
+    }
+    return new TupleObjectWriter(mapWriter);
+  }
+
+  public static ArrayObjectWriter buildMapArray(ColumnMetadata schema,
+      UInt4Vector offsetVector,
+      List<AbstractObjectWriter> writers) {
+    MapWriter mapWriter;
+    if (schema.isProjected()) {
+      mapWriter = new ArrayMapWriter(schema, writers);
+    } else {
+      mapWriter = new DummyArrayMapWriter(schema, writers);
+    }
+    TupleObjectWriter mapArray = new TupleObjectWriter(mapWriter);
+    AbstractArrayWriter arrayWriter;
+    if (schema.isProjected()) {
+      arrayWriter = new ObjectArrayWriter(schema,
+          offsetVector,
+          mapArray);
+    } else  {
+      arrayWriter = new DummyArrayWriter(schema, mapArray);
+    }
+    return new ArrayObjectWriter(arrayWriter);
+  }
+
+  public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema,
+      AbstractMapVector vector,
+      List<AbstractObjectWriter> writers) {
+    if (schema.isArray()) {
+      return MapWriter.buildMapArray(schema,
+          vector == null ? null :
+          ((RepeatedMapVector) vector).getOffsetVector(), writers);
+    } else {
+      return MapWriter.buildMap(schema, (MapVector) vector, writers);
+    }
+  }
+
+  public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema, 
AbstractMapVector vector) {
+    assert schema.mapSchema().size() == 0;
+    return buildMapWriter(schema, vector, new 
ArrayList<AbstractObjectWriter>());
+  }
+
+  @Override
+  public ColumnMetadata schema() { return mapColumnSchema; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
index c5f7982..75ef57f 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
@@ -70,15 +70,16 @@ public class NullableScalarWriter extends 
AbstractScalarWriter {
   private final BaseScalarWriter baseWriter;
   private ColumnWriterIndex writerIndex;
 
-  public NullableScalarWriter(NullableVector nullableVector, BaseScalarWriter 
baseWriter) {
+  public NullableScalarWriter(ColumnMetadata schema, NullableVector 
nullableVector, BaseScalarWriter baseWriter) {
+    this.schema = schema;
     isSetWriter = new UInt1ColumnWriter(nullableVector.getBitsVector());
     this.baseWriter = baseWriter;
   }
 
   public static ScalarObjectWriter build(ColumnMetadata schema,
       NullableVector nullableVector, BaseScalarWriter baseWriter) {
-    return new ScalarObjectWriter(schema,
-        new NullableScalarWriter(nullableVector, baseWriter));
+    return new ScalarObjectWriter(
+        new NullableScalarWriter(schema, nullableVector, baseWriter));
   }
 
   public BaseScalarWriter bitsWriter() { return isSetWriter; }
@@ -98,7 +99,9 @@ public class NullableScalarWriter extends 
AbstractScalarWriter {
   }
 
   @Override
-  public ColumnWriterIndex writerIndex() { return baseWriter.writerIndex(); }
+  public int rowStartIndex() {
+    return baseWriter.rowStartIndex();
+  }
 
   @Override
   public ValueType valueType() {
@@ -112,6 +115,9 @@ public class NullableScalarWriter extends 
AbstractScalarWriter {
   }
 
   @Override
+  public boolean nullable() { return true; }
+
+  @Override
   public void setNull() {
     isSetWriter.setInt(0);
     baseWriter.skipNulls();
@@ -212,7 +218,7 @@ public class NullableScalarWriter extends 
AbstractScalarWriter {
   public void endArrayValue() {
     // Skip calls for performance: they do nothing for
     // scalar writers -- the only kind supported here.
-//    isSetWriter.saveValue();
+//    isSetWriter.endArrayValue();
     baseWriter.endArrayValue();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java
index 3554a3b..e2a1fc3 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java
@@ -17,8 +17,10 @@
  */
 package org.apache.drill.exec.vector.accessor.writer;
 
+import java.lang.reflect.Array;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.UInt4Vector;
-import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import 
org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArrayWriter;
 
 /**
@@ -39,7 +41,7 @@ import 
org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArra
  * with a map, then we have a single offset vector pointing into a group of
  * arrays. Consider the simple case of a map of three scalars. Here, we have
  * a hybrid of the states discussed for the {@link BaseScalarWriter} and those
- * discussed for {@link OffsetVectorWriter}. That is, the offset vector
+ * discussed for {@link OffsetVectorWriterImpl}. That is, the offset vector
  * points into one map element. The individual elements can we Behind,
  * Written or Unwritten, depending on the specific actions taken by the
  * client.
@@ -100,19 +102,15 @@ import 
org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArra
  * The key reason to understand this flow is to understand what happens
  * in vector overflow: unlike an array of scalars, in which the data
  * vector can never be in the Behind state, when we have an array of
- * maps then each vector can be in an of the scalar writer state.
+ * maps then each vector can be in any of the scalar writer states.
  */
 
 public class ObjectArrayWriter extends BaseArrayWriter {
 
-  protected ObjectArrayWriter(UInt4Vector offsetVector, AbstractObjectWriter 
elementWriter) {
-    super(offsetVector, elementWriter);
-  }
-
-  @Override
-  public void bindIndex(ColumnWriterIndex index) {
+  protected ObjectArrayWriter(ColumnMetadata schema,
+      UInt4Vector offsetVector, AbstractObjectWriter elementWriter) {
+    super(schema, offsetVector, elementWriter);
     elementIndex = new ArrayElementWriterIndex();
-    super.bindIndex(index);
   }
 
   @Override
@@ -122,22 +120,20 @@ public class ObjectArrayWriter extends BaseArrayWriter {
   }
 
   @Override
-  public void set(Object... values) {
-    setObject(values);
-  }
-
-  @Override
   public void setObject(Object array) {
-    Object values[] = (Object[]) array;
-    for (int i = 0; i < values.length; i++) {
-      elementObjWriter.set(values[i]);
+
+    // Null array = 0-length array
+
+    if (array == null) {
+      return;
+    }
+    int size = Array.getLength(array);
+    for (int i = 0; i < size; i++) {
+      Object value = Array.get(array, i);
+      if (value != null) {
+        elementObjWriter.setObject(value);
+      }
       save();
     }
   }
-
-  @Override
-  public int lastWriteIndex() {
-    // Undefined for arrays
-    return 0;
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
index 49d16a3..d733b4f 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
@@ -17,279 +17,16 @@
  */
 package org.apache.drill.exec.vector.accessor.writer;
 
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.UInt4Vector;
-import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 
 /**
- * Specialized column writer for the (hidden) offset vector used
- * with variable-length or repeated vectors. See comments in the
- * <tt>ColumnAccessors.java</tt> template file for more details.
- * <p>
- * Note that the <tt>lastWriteIndex</tt> tracked here corresponds
- * to the data values; it is one less than the actual offset vector
- * last write index due to the nature of offset vector layouts. The selection
- * of last write index basis makes roll-over processing easier as only this
- * writer need know about the +1 translation required for writing.
- * <p>
- * The states illustrated in the base class apply here as well,
- * remembering that the end offset for a row (or array position)
- * is written one ahead of the vector index.
- * <p>
- * The vector index does create an interesting dynamic for the child
- * writers. From the child writer's perspective, the states described in
- * the super class are the only states of interest. Here we want to
- * take the perspective of the parent.
- * <p>
- * The offset vector is an implementation of a repeat level. A repeat
- * level can occur for a single array, or for a collection of columns
- * within a repeated map. (A repeat level also occurs for variable-width
- * fields, but this is a bit harder to see, so let's ignore that for
- * now.)
- * <p>
- * The key point to realize is that each repeat level introduces an
- * isolation level in terms of indexing. That is, empty values in the
- * outer level have no affect on indexing in the inner level. In fact,
- * the nature of a repeated outer level means that there are no empties
- * in the inner level.
- * <p>
- * To illustrate:<pre><code>
- *       Offset Vector          Data Vector   Indexes
- *  lw, v > | 10 |   - - - - - >   | X |        10
- *          | 12 |   - - +         | X | < lw'  11
- *          |    |       + - - >   |   | < v'   12
- * </code></pre>
- * In the above, the client has just written an array of two elements
- * at the current write position. The data starts at offset 10 in
- * the data vector, and the next write will be at 12. The end offset
- * is written one ahead of the vector index.
- * <p>
- * From the data vector's perspective, its last-write (lw') reflects
- * the last element written. If this is an array of scalars, then the
- * write index is automatically incremented, as illustrated by v'.
- * (For map arrays, the index must be incremented by calling
- * <tt>save()</tt> on the map array writer.)
- * <p>
- * Suppose the client now skips some arrays:<pre><code>
- *       Offset Vector          Data Vector
- *     lw > | 10 |   - - - - - >   | X |        10
- *          | 12 |   - - +         | X | < lw'  11
- *          |    |       + - - >   |   | < v'   12
- *          |    |                 |   |        13
- *      v > |    |                 |   |        14
- * </code></pre>
- * The last write position does not move and there are gaps in the
- * offset vector. The vector index points to the current row. Note
- * that the data vector last write and vector indexes do not change,
- * this reflects the fact that the the data vector's vector index
- * (v') matches the tail offset
- * <p>
- * The
- * client now writes a three-element vector:<pre><code>
- *       Offset Vector          Data Vector
- *          | 10 |   - - - - - >   | X |        10
- *          | 12 |   - - +         | X |        11
- *          | 12 |   - - + - - >   | Y |        12
- *          | 12 |   - - +         | Y |        13
- *  lw, v > | 12 |   - - +         | Y | < lw'  14
- *          | 15 |   - - - - - >   |   | < v'   15
- * </code></pre>
- * Quite a bit just happened. The empty offset slots were back-filled
- * with the last write offset in the data vector. The client wrote
- * three values, which advanced the last write and vector indexes
- * in the data vector. And, the last write index in the offset
- * vector also moved to reflect the update of the offset vector.
- * Note that as a result, multiple positions in the offset vector
- * point to the same location in the data vector. This is fine; we
- * compute the number of entries as the difference between two successive
- * offset vector positions, so the empty positions have become 0-length
- * arrays.
- * <p>
- * Note that, for an array of scalars, when overflow occurs,
- * we need only worry about two
- * states in the data vector. Either data has been written for the
- * row (as in the third example above), and so must be moved to the
- * roll-over vector, or no data has been written and no move is
- * needed. We never have to worry about missing values because the
- * cannot occur in the data vector.
- * <p>
- * See {@link ObjectArrayWriter} for information about arrays of
- * maps (arrays of multiple columns.)
+ * Interface for specialized operations on an offset vector.
  */
 
-public class OffsetVectorWriter extends AbstractFixedWidthWriter {
-
-  private static final int VALUE_WIDTH = UInt4Vector.VALUE_WIDTH;
-
-  private UInt4Vector vector;
-
-  /**
-   * Offset of the first value for the current row. Used during
-   * overflow or if the row is restarted.
-   */
-
-  private int rowStartOffset;
-
-  /**
-   * Cached value of the end offset for the current value. Used
-   * primarily for variable-width columns to allow the column to be
-   * rewritten multiple times within the same row. The start offset
-   * value is updated with the end offset only when the value is
-   * committed in {@link @endValue()}.
-   */
-
-  private int nextOffset;
-
-  public OffsetVectorWriter(UInt4Vector vector) {
-    this.vector = vector;
-  }
-
-  @Override public BaseDataValueVector vector() { return vector; }
-  @Override public int width() { return VALUE_WIDTH; }
-
-  @Override
-  protected void realloc(int size) {
-    vector.reallocRaw(size);
-    setBuffer();
-  }
-
-  @Override
-  public ValueType valueType() { return ValueType.INTEGER; }
-
-  @Override
-  public void startWrite() {
-    super.startWrite();
-    nextOffset = 0;
-    rowStartOffset = 0;
-
-    // Special handling for first value. Alloc vector if needed.
-    // Offset vectors require a 0 at position 0. The (end) offset
-    // for row 0 starts at position 1, which is handled in
-    // writeOffset() below.
-
-    if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) {
-      realloc(MIN_BUFFER_SIZE);
-    }
-    vector.getBuffer().setInt(0, 0);
-  }
-
-  public int nextOffset() { return nextOffset; }
-  public int rowStartOffset() { return rowStartOffset; }
-
-  @Override
-  public void startRow() { rowStartOffset = nextOffset; }
-
-  /**
-   * Return the write offset, which is one greater than the index reported
-   * by the vector index.
-   *
-   * @return the offset in which to write the current offset of the end
-   * of the current data value
-   */
-
-  protected final int writeIndex() {
-
-    // "Fast path" for the normal case of no fills, no overflow.
-    // This is the only bounds check we want to do for the entire
-    // set operation.
-
-    // This is performance critical code; every operation counts.
-    // Please be thoughtful when changing the code.
-
-    final int valueIndex = vectorIndex.vectorIndex();
-    int writeIndex = valueIndex + 1;
-    if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) {
-      writeIndex = prepareWrite(writeIndex);
-    }
-
-    // Track the last write location for zero-fill use next time around.
-    // Recall, it is the value index, which is one less than the (end)
-    // offset index.
-
-    lastWriteIndex = valueIndex;
-    return writeIndex;
-  }
-
-  protected int prepareWrite(int writeIndex) {
-
-    // Either empties must be filled or the vector is full.
-
-    resize(writeIndex);
-
-    // Call to resize may cause rollover, so reset write index
-    // afterwards.
-
-    final int valueIndex = vectorIndex.vectorIndex();
-
-    // Fill empties to the write position.
-    // Fill empties works off the row index, not the write
-    // index. The write index is one past the row index.
-    // (Yes, this is complex...)
-
-    fillEmpties(valueIndex);
-    return valueIndex + 1;
-  }
-
-  @Override
-  protected final void fillEmpties(final int valueIndex) {
-    while (lastWriteIndex < valueIndex - 1) {
-      drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset);
-    }
-  }
-
-  public final void setNextOffset(final int newOffset) {
-    final int writeIndex = writeIndex();
-    drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
-    nextOffset = newOffset;
-  }
-
-  @Override
-  public void skipNulls() {
-
-    // Nothing to do. Fill empties logic will fill in missing
-    // offsets.
-  }
-
-  @Override
-  public void restartRow() {
-    nextOffset = rowStartOffset;
-    super.restartRow();
-  }
-
-  @Override
-  public void preRollover() {
-    setValueCount(vectorIndex.rowStartIndex() + 1);
-  }
-
-  @Override
-  public void postRollover() {
-    final int newNext = nextOffset - rowStartOffset;
-    super.postRollover();
-    nextOffset = newNext;
-  }
-
-  @Override
-  public void setValueCount(int valueCount) {
-
-    // Slightly different version of the fixed-width writer
-    // code. Offset counts are one greater than the value count.
-    // But for all other purposes, we track the value (row)
-    // position, not the offset position.
-
-    mandatoryResize(valueCount);
-    fillEmpties(valueCount);
-    vector().getBuffer().writerIndex((valueCount + 1) * width());
-    lastWriteIndex = Math.max(lastWriteIndex, valueCount - 1);
-  }
-
-  @Override
-  public void dump(HierarchicalFormatter format) {
-    format.extend();
-    super.dump(format);
-    format
-      .attribute("lastWriteIndex", lastWriteIndex)
-      .attribute("nextOffset", nextOffset)
-      .endObject();
-  }
+public interface OffsetVectorWriter extends ScalarWriter, WriterEvents {
+  int rowStartOffset();
+  int nextOffset();
+  void setNextOffset(int vectorIndex);
+  void dump(HierarchicalFormatter format);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
new file mode 100644
index 0000000..edcb9f5
--- /dev/null
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer;
+
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+
+/**
+ * Specialized column writer for the (hidden) offset vector used
+ * with variable-length or repeated vectors. See comments in the
+ * <tt>ColumnAccessors.java</tt> template file for more details.
+ * <p>
+ * Note that the <tt>lastWriteIndex</tt> tracked here corresponds
+ * to the data values; it is one less than the actual offset vector
+ * last write index due to the nature of offset vector layouts. The selection
+ * of last write index basis makes roll-over processing easier as only this
+ * writer need know about the +1 translation required for writing.
+ * <p>
+ * The states illustrated in the base class apply here as well,
+ * remembering that the end offset for a row (or array position)
+ * is written one ahead of the vector index.
+ * <p>
+ * The vector index does create an interesting dynamic for the child
+ * writers. From the child writer's perspective, the states described in
+ * the super class are the only states of interest. Here we want to
+ * take the perspective of the parent.
+ * <p>
+ * The offset vector is an implementation of a repeat level. A repeat
+ * level can occur for a single array, or for a collection of columns
+ * within a repeated map. (A repeat level also occurs for variable-width
+ * fields, but this is a bit harder to see, so let's ignore that for
+ * now.)
+ * <p>
+ * The key point to realize is that each repeat level introduces an
+ * isolation level in terms of indexing. That is, empty values in the
+ * outer level have no affect on indexing in the inner level. In fact,
+ * the nature of a repeated outer level means that there are no empties
+ * in the inner level.
+ * <p>
+ * To illustrate:<pre><code>
+ *       Offset Vector          Data Vector   Indexes
+ *  lw, v > | 10 |   - - - - - >   | X |        10
+ *          | 12 |   - - +         | X | < lw'  11
+ *          |    |       + - - >   |   | < v'   12
+ * </code></pre>
+ * In the above, the client has just written an array of two elements
+ * at the current write position. The data starts at offset 10 in
+ * the data vector, and the next write will be at 12. The end offset
+ * is written one ahead of the vector index.
+ * <p>
+ * From the data vector's perspective, its last-write (lw') reflects
+ * the last element written. If this is an array of scalars, then the
+ * write index is automatically incremented, as illustrated by v'.
+ * (For map arrays, the index must be incremented by calling
+ * <tt>save()</tt> on the map array writer.)
+ * <p>
+ * Suppose the client now skips some arrays:<pre><code>
+ *       Offset Vector          Data Vector
+ *     lw > | 10 |   - - - - - >   | X |        10
+ *          | 12 |   - - +         | X | < lw'  11
+ *          |    |       + - - >   |   | < v'   12
+ *          |    |                 |   |        13
+ *      v > |    |                 |   |        14
+ * </code></pre>
+ * The last write position does not move and there are gaps in the
+ * offset vector. The vector index points to the current row. Note
+ * that the data vector last write and vector indexes do not change,
+ * this reflects the fact that the the data vector's vector index
+ * (v') matches the tail offset
+ * <p>
+ * The
+ * client now writes a three-element vector:<pre><code>
+ *       Offset Vector          Data Vector
+ *          | 10 |   - - - - - >   | X |        10
+ *          | 12 |   - - +         | X |        11
+ *          | 12 |   - - + - - >   | Y |        12
+ *          | 12 |   - - +         | Y |        13
+ *  lw, v > | 12 |   - - +         | Y | < lw'  14
+ *          | 15 |   - - - - - >   |   | < v'   15
+ * </code></pre>
+ * Quite a bit just happened. The empty offset slots were back-filled
+ * with the last write offset in the data vector. The client wrote
+ * three values, which advanced the last write and vector indexes
+ * in the data vector. And, the last write index in the offset
+ * vector also moved to reflect the update of the offset vector.
+ * Note that as a result, multiple positions in the offset vector
+ * point to the same location in the data vector. This is fine; we
+ * compute the number of entries as the difference between two successive
+ * offset vector positions, so the empty positions have become 0-length
+ * arrays.
+ * <p>
+ * Note that, for an array of scalars, when overflow occurs,
+ * we need only worry about two
+ * states in the data vector. Either data has been written for the
+ * row (as in the third example above), and so must be moved to the
+ * roll-over vector, or no data has been written and no move is
+ * needed. We never have to worry about missing values because the
+ * cannot occur in the data vector.
+ * <p>
+ * See {@link ObjectArrayWriter} for information about arrays of
+ * maps (arrays of multiple columns.)
+ */
+
+public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter 
implements OffsetVectorWriter {
+
+  private static final int VALUE_WIDTH = UInt4Vector.VALUE_WIDTH;
+
+  private final UInt4Vector vector;
+
+  /**
+   * Offset of the first value for the current row. Used during
+   * overflow or if the row is restarted.
+   */
+
+  private int rowStartOffset;
+
+  /**
+   * Cached value of the end offset for the current value. Used
+   * primarily for variable-width columns to allow the column to be
+   * rewritten multiple times within the same row. The start offset
+   * value is updated with the end offset only when the value is
+   * committed in {@link @endValue()}.
+   */
+
+  private int nextOffset;
+
+  public OffsetVectorWriterImpl(UInt4Vector vector) {
+    this.vector = vector;
+  }
+
+  @Override public BaseDataValueVector vector() { return vector; }
+  @Override public int width() { return VALUE_WIDTH; }
+
+  @Override
+  protected void realloc(int size) {
+    vector.reallocRaw(size);
+    setBuffer();
+  }
+
+  @Override
+  public ValueType valueType() { return ValueType.INTEGER; }
+
+  @Override
+  public void startWrite() {
+    super.startWrite();
+    nextOffset = 0;
+    rowStartOffset = 0;
+
+    // Special handling for first value. Alloc vector if needed.
+    // Offset vectors require a 0 at position 0. The (end) offset
+    // for row 0 starts at position 1, which is handled in
+    // writeOffset() below.
+
+    if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) {
+      realloc(MIN_BUFFER_SIZE);
+    }
+    drillBuf.setInt(0, 0);
+  }
+
+  @Override
+  public int nextOffset() { return nextOffset; }
+
+  @Override
+  public int rowStartOffset() { return rowStartOffset; }
+
+  @Override
+  public void startRow() { rowStartOffset = nextOffset; }
+
+  /**
+   * Return the write offset, which is one greater than the index reported
+   * by the vector index.
+   *
+   * @return the offset in which to write the current offset of the end
+   * of the current data value
+   */
+
+  protected final int prepareWrite() {
+
+    // "Fast path" for the normal case of no fills, no overflow.
+    // This is the only bounds check we want to do for the entire
+    // set operation.
+
+    // This is performance critical code; every operation counts.
+    // Please be thoughtful when changing the code.
+
+    final int valueIndex = vectorIndex.vectorIndex();
+    int writeIndex = valueIndex + 1;
+    if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) {
+      writeIndex =  prepareWrite(writeIndex);
+    }
+
+    // Track the last write location for zero-fill use next time around.
+
+    lastWriteIndex = valueIndex;
+    return writeIndex;
+  }
+
+  protected int prepareWrite(int writeIndex) {
+
+    // Either empties must be filed or the vector is full.
+
+    resize(writeIndex);
+
+    // Call to resize may cause rollover, so reset write index
+    // afterwards.
+
+    final int valueIndex = vectorIndex.vectorIndex();
+
+    // Fill empties to the write position.
+    // Fill empties works of the row index, not the write
+    // index. (Yes, this is complex...)
+
+    fillEmpties(valueIndex);
+    return valueIndex + 1;
+  }
+
+  @Override
+  protected final void fillEmpties(final int valueIndex) {
+    while (lastWriteIndex < valueIndex - 1) {
+      drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset);
+    }
+  }
+
+  @Override
+  public final void setNextOffset(final int newOffset) {
+    final int writeIndex = prepareWrite();
+    drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
+    nextOffset = newOffset;
+  }
+
+  @Override
+  public void skipNulls() {
+
+    // Nothing to do. Fill empties logic will fill in missing
+    // offsets.
+  }
+
+  @Override
+  public void restartRow() {
+    nextOffset = rowStartOffset;
+    super.restartRow();
+  }
+
+  @Override
+  public void preRollover() {
+
+    // Rollover is occurring. This means the current row is not complete.
+    // We want to keep 0..(row index - 1) which gives us (row index)
+    // rows. But, this being an offset vector, we add one to account
+    // for the extra 0 value at the start.
+
+    setValueCount(vectorIndex.rowStartIndex() + 1);
+  }
+
+  @Override
+  public void postRollover() {
+    final int newNext = nextOffset - rowStartOffset;
+    super.postRollover();
+    nextOffset = newNext;
+  }
+
+  @Override
+  public void setValueCount(int valueCount) {
+    mandatoryResize(valueCount);
+
+    // Value count are in offset vector positions. Fill empties
+    // works in row positions.
+
+    fillEmpties(valueCount);
+    vector().getBuffer().writerIndex((valueCount + 1) * VALUE_WIDTH);
+  }
+
+  @Override
+  public void dump(HierarchicalFormatter format) {
+    format.extend();
+    super.dump(format);
+    format
+      .attribute("lastWriteIndex", lastWriteIndex)
+      .attribute("nextOffset", nextOffset)
+      .endObject();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
index 8382ad1..2ac7d45 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.exec.vector.accessor.writer;
 
+import java.lang.reflect.Array;
 import java.math.BigDecimal;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
 import 
org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArrayWriter;
 import 
org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter.ScalarObjectWriter;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -64,14 +64,14 @@ public class ScalarArrayWriter extends BaseArrayWriter {
 
   public ScalarArrayWriter(ColumnMetadata schema,
       RepeatedValueVector vector, BaseScalarWriter elementWriter) {
-    super(vector.getOffsetVector(),
-        new ScalarObjectWriter(schema, elementWriter));
+    super(schema, vector.getOffsetVector(),
+        new ScalarObjectWriter(elementWriter));
     this.elementWriter = elementWriter;
   }
 
   public static ArrayObjectWriter build(ColumnMetadata schema,
       RepeatedValueVector repeatedVector, BaseScalarWriter elementWriter) {
-    return new ArrayObjectWriter(schema,
+    return new ArrayObjectWriter(
         new ScalarArrayWriter(schema, repeatedVector, elementWriter));
   }
 
@@ -83,25 +83,28 @@ public class ScalarArrayWriter extends BaseArrayWriter {
   }
 
   @Override
-  public void bindListener(ColumnWriterListener listener) {
-    elementWriter.bindListener(listener);
-  }
-
-  @Override
   public void save() {
     // No-op: done when writing each scalar value
+    // May be called, however, by code that also supports
+    // lists as a list does require an explicit save.
   }
 
-  @Override
-  public void set(Object... values) {
-    for (Object value : values) {
-      entry().set(value);
-    }
-  }
+  /**
+   * Set a repeated vector based on a Java array of the proper
+   * type. This function involves parsing the array type and so is
+   * suitable only for test code. The array can be either a primitive
+   * (<tt>int [], say</tt>) or a typed array of boxed values
+   * (<tt>Integer[], say</tt>).
+   */
 
   @Override
   public void setObject(Object array) {
-    if (array == null) {
+
+    // Accept an empty array (of any type) to mean
+    // an empty array of the type of this writer.
+
+    if (array == null || Array.getLength(array) == 0) {
+
       // Assume null means a 0-element array since Drill does
       // not support null for the whole array.
 
@@ -109,7 +112,9 @@ public class ScalarArrayWriter extends BaseArrayWriter {
     }
     String objClass = array.getClass().getName();
     if (! objClass.startsWith("[")) {
-      throw new IllegalArgumentException("Argument must be an array");
+      throw new IllegalArgumentException(
+          String.format("Argument must be an array. Column `%s`, value = %s",
+              schema.name(), array.toString()));
     }
 
     // Figure out type
@@ -125,39 +130,50 @@ public class ScalarArrayWriter extends BaseArrayWriter {
         setBytesArray((byte[][]) array);
         break;
       default:
-        throw new IllegalArgumentException( "Unknown Java array type: " + 
objClass );
+        throw new IllegalArgumentException(
+            String.format("Unknown Java array type: %s, for column `%s`",
+                objClass, schema.name()));
       }
       break;
+    case  'B':
+      setByteArray((byte[]) array);
+      break;
     case  'S':
-      setShortArray((short[]) array );
+      setShortArray((short[]) array);
       break;
     case  'I':
-      setIntArray((int[]) array );
+      setIntArray((int[]) array);
       break;
     case  'J':
-      setLongArray((long[]) array );
+      setLongArray((long[]) array);
       break;
     case  'F':
-      setFloatArray((float[]) array );
+      setFloatArray((float[]) array);
       break;
     case  'D':
-      setDoubleArray((double[]) array );
+      setDoubleArray((double[]) array);
       break;
     case  'Z':
-      setBooleanArray((boolean[]) array );
+      setBooleanArray((boolean[]) array);
       break;
     case 'L':
       int posn = objClass.indexOf(';');
 
       // If the array is of type Object, then we have no type info.
 
-      String memberClassName = objClass.substring( 2, posn );
+      String memberClassName = objClass.substring(2, posn);
       if (memberClassName.equals(String.class.getName())) {
-        setStringArray((String[]) array );
+        setStringArray((String[]) array);
       } else if (memberClassName.equals(Period.class.getName())) {
-        setPeriodArray((Period[]) array );
+        setPeriodArray((Period[]) array);
       } else if (memberClassName.equals(BigDecimal.class.getName())) {
-        setBigDecimalArray((BigDecimal[]) array );
+        setBigDecimalArray((BigDecimal[]) array);
+      } else if (memberClassName.equals(Integer.class.getName())) {
+        setIntObjectArray((Integer[]) array);
+      } else if (memberClassName.equals(Long.class.getName())) {
+        setLongObjectArray((Long[]) array);
+      } else if (memberClassName.equals(Double.class.getName())) {
+        setDoubleObjectArray((Double[]) array);
       } else {
         throw new IllegalArgumentException( "Unknown Java array type: " + 
memberClassName );
       }
@@ -179,6 +195,12 @@ public class ScalarArrayWriter extends BaseArrayWriter {
     }
   }
 
+  public void setByteArray(byte[] value) {
+    for (int i = 0; i < value.length; i++) {
+      elementWriter.setInt(value[i]);
+    }
+  }
+
   public void setShortArray(short[] value) {
     for (int i = 0; i < value.length; i++) {
       elementWriter.setInt(value[i]);
@@ -191,12 +213,34 @@ public class ScalarArrayWriter extends BaseArrayWriter {
     }
   }
 
+  public void setIntObjectArray(Integer[] value) {
+    for (int i = 0; i < value.length; i++) {
+      Integer element = value[i];
+      if (element == null) {
+        elementWriter.setNull();
+      } else {
+        elementWriter.setInt(element);
+      }
+    }
+  }
+
   public void setLongArray(long[] value) {
     for (int i = 0; i < value.length; i++) {
       elementWriter.setLong(value[i]);
     }
   }
 
+  public void setLongObjectArray(Long[] value) {
+    for (int i = 0; i < value.length; i++) {
+      Long element = value[i];
+      if (element == null) {
+        elementWriter.setNull();
+      } else {
+        elementWriter.setLong(element);
+      }
+    }
+  }
+
   public void setFloatArray(float[] value) {
     for (int i = 0; i < value.length; i++) {
       elementWriter.setDouble(value[i]);
@@ -209,6 +253,17 @@ public class ScalarArrayWriter extends BaseArrayWriter {
     }
   }
 
+  public void setDoubleObjectArray(Double[] value) {
+    for (int i = 0; i < value.length; i++) {
+      Double element = value[i];
+      if (element == null) {
+        elementWriter.setNull();
+      } else {
+        elementWriter.setDouble(element);
+      }
+    }
+  }
+
   public void setStringArray(String[] value) {
     for (int i = 0; i < value.length; i++) {
       elementWriter.setString(value[i]);

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
index 7566f28..c8f9f48 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
@@ -40,6 +40,38 @@ import 
org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 public interface WriterEvents {
 
   /**
+   * Tracks the write state of a tuple or variant to allow applying the correct
+   * operations to newly-added columns to synchronize them with the rest
+   * of the writers.
+   */
+
+  enum State {
+
+    /**
+     * No write is in progress. Nothing need be done to newly-added
+     * writers.
+     */
+    IDLE,
+
+    /**
+     * <tt>startWrite()</tt> has been called to start a write operation
+     * (start a batch), but <tt>startValue()</tt> has not yet been called
+     * to start a row (or value within an array). <tt>startWrite()</tt> must
+     * be called on newly added columns.
+     */
+
+    IN_WRITE,
+
+    /**
+     * Both <tt>startWrite()</tt> and <tt>startValue()</tt> has been called on
+     * the tuple to prepare for writing values, and both must be called on
+     * newly-added vectors.
+     */
+
+    IN_ROW
+  }
+
+  /**
    * Bind the writer to a writer index.
    *
    * @param index the writer index (top level or nested for
@@ -48,8 +80,6 @@ public interface WriterEvents {
 
   void bindIndex(ColumnWriterIndex index);
 
-  ColumnWriterIndex writerIndex();
-
   /**
    * Start a write (batch) operation. Performs any vector initialization
    * required at the start of a batch (especially for offset vectors.)
@@ -113,15 +143,4 @@ public interface WriterEvents {
    */
 
   void postRollover();
-
-  /**
-   * Return the last write position in the vector. This may be the
-   * same as the writer index position (if the vector was written at
-   * that point), or an earlier point. In either case, this value
-   * points to the last valid value in the vector.
-   *
-   * @return index of the last valid value in the vector
-   */
-
-  int lastWriteIndex();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
index 7c9f8ba..3d64bb6 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
@@ -17,9 +17,8 @@
  */
 package org.apache.drill.exec.vector.accessor.writer.dummy;
 
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
-import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter;
@@ -35,30 +34,37 @@ import 
org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter;
  */
 public class DummyArrayWriter extends AbstractArrayWriter {
 
+  public static class DummyOffsetVectorWriter extends DummyScalarWriter 
implements OffsetVectorWriter {
+
+    public DummyOffsetVectorWriter() {
+      super(null);
+    }
+
+    @Override
+    public int rowStartOffset() { return 0; }
+
+    @Override
+    public int nextOffset() { return 0; }
+
+    @Override
+    public void setNextOffset(int vectorIndex) { }
+  }
+
+  public static final DummyOffsetVectorWriter offsetVectorWriter = new 
DummyOffsetVectorWriter();
+
   public DummyArrayWriter(
+      ColumnMetadata schema,
       AbstractObjectWriter elementWriter) {
-    super(elementWriter);
+    super(schema, elementWriter, offsetVectorWriter);
   }
 
   @Override
-  public int size() { return 0; }
-
-  @Override
   public void save() { }
 
   @Override
-  public void set(Object... values) { }
-
-  @Override
   public void setObject(Object array) { }
 
   @Override
-  public void bindIndex(ColumnWriterIndex index) { }
-
-  @Override
-  public ColumnWriterIndex writerIndex() { return null; }
-
-  @Override
   public void startWrite() { }
 
   @Override
@@ -86,11 +92,5 @@ public class DummyArrayWriter extends AbstractArrayWriter {
   public int lastWriteIndex() { return 0; }
 
   @Override
-  public void bindListener(ColumnWriterListener listener) { }
-
-  @Override
-  public void bindListener(TupleWriterListener listener) { }
-
-  @Override
-  public OffsetVectorWriter offsetWriter() { return null; }
+  public void bindIndex(ColumnWriterIndex index) { }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
index e8272d6..9cfe56e 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.vector.accessor.writer.dummy;
 
 import java.math.BigDecimal;
 
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ValueType;
@@ -33,11 +34,18 @@ import org.joda.time.Period;
 
 public class DummyScalarWriter extends AbstractScalarWriter {
 
+  public DummyScalarWriter(ColumnMetadata schema) {
+   this.schema = schema;
+  }
+
   @Override
   public void bindListener(ColumnWriterListener listener) { }
 
   @Override
-  public ValueType valueType() { return null; }
+  public ValueType valueType() { return ValueType.NULL; }
+
+  @Override
+  public boolean nullable() { return true; }
 
   @Override
   public void setNull() { }
@@ -67,9 +75,6 @@ public class DummyScalarWriter extends AbstractScalarWriter {
   public void bindIndex(ColumnWriterIndex index) { }
 
   @Override
-  public ColumnWriterIndex writerIndex() { return null; }
-
-  @Override
   public void restartRow() { }
 
   @Override
@@ -86,4 +91,7 @@ public class DummyScalarWriter extends AbstractScalarWriter {
 
   @Override
   public BaseDataValueVector vector() { return null; }
+
+  @Override
+  public int rowStartIndex() { return 0; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dbff1646/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java
index 5e827fa..9efc59c 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java
@@ -104,11 +104,18 @@
  * </ul>
  * <p>
  * This data model is similar to; but has important differences from, the 
prior,
- * generated, readers and writers.
+ * generated, readers and writers. This version is based on the concept of
+ * minimizing the number of writer classes, and leveraging Java primitives to
+ * keep the number of get/set methods to a reasonable size. This version also
+ * automates vector allocation, vector overflow and so on.
  * <p>
  * The object layer is new: it is the simplest way to model the three "object
  * types." An app using this code would use just the leaf scalar readers and
  * writers.
+ * <p>
+ * Similarly, the {@link ColumnWriter} interface provides a uniform way to
+ * access services common to all writer types, while allowing each JSON-like
+ * writer to provide type-specific ways to access data.
  *
  * <h4>Writer Performance</h4>
  *
@@ -145,6 +152,94 @@
  * vector and works the same for repeated scalars, repeated maps and
  * (eventually) lists and repeated lists.</li>
  * </ul>
+ *
+ * <h4>Possible Improvements</h4>
+ *
+ * The code here works and has extensive unit tests. But, many improvements
+ * are possible:
+ * <ul>
+ * <li>Drill has four "container" vector types (Map, Union, List, Repeated
+ * List), each with its own quirky semantics. The code could be far simpler
+ * if the container semantics were unified.</li>
+ * <li>Similarly, the corresponding writers implement some very awkward
+ * logic to handle union and list containers. But, these vectors are not
+ * fully supported in Drill. This means that the code implements (and tests)
+ * many odd cases which no one may ever use. Better to limit the data types
+ * that Drill supports, implement those well, and deprecate the obscure
+ * cases.</li>
+ * <li>The same schema-parsing logic appears over and over in different
+ * guises. Some parse vectors, some parse batch schemas, others parse the
+ * column metadata (which provides information not available in the
+ * materialized field) and so on. Would be great to find some common way
+ * to do this work, perhaps in the form of a visitor. An earlier version of
+ * this code tried using visitors. But, since each representation has its
+ * own quirks, that approach didn't work out. A first step would be to come
+ * up with a standard schema description which can be used in all cases,
+ * then build a visitor on that.</li>
+ * <li>Many tests exist. But, the careful reader will note that Drill's
+ * vectors define a potentially very complex recursive structure (most
+ * variations of which are never used.) Additional testing should cover
+ * all cases, such as repeated lists that contain unions, or unions that
+ * contain repeated lists of tuples.</li>
+ * <li>Experience with the code may show additional redundancies that can
+ * be removed. Each fresh set of eyes may see things that prior folks
+ * missed.</li>
+ * </ul>
+ *
+ * <h4>Caveats</h4>
+ *
+ * The column accessors are divided into two packages: <tt>vector</tt> and
+ * <tt>java-exec</tt>. It is easy to add functionality in the wrong place,
+ * breaking abstraction and encapsulation. Here are some general guidelines:
+ * <ul>
+ * <li>The core reader and writer logic is implemented in this <tt>vector</tt>
+ * package. This package provides low-level tools to build accessors, but
+ * not the construction logic itself. (That is found in the <tt>java-exec</tt>
+ * layer.)</li>
+ * <li>The vector layer is designed to support both the simple "row set" and
+ * the more complex "result set loader" implementations.</li>
+ * <li>The "row set" layer wraps the accessors in tools that work on one batch
+ * (row set) at a time, without regard for schema versions, schema changes
+ * and the like. The row set layer is primarily for testing: building an input
+ * batch for some operator, and verifying an output batch. It also serves as a
+ * simple test framework for the accessors, without the complexity of other
+ * layers.</li>
+ * <li>The "result set loader" layer handles the complexity of dynamic schema
+ * evolution, schema versioning, vector overflow and projection. It provides
+ * an "industrial strength" version of the accessor mechanism intended for
+ * use in the scan operator (but which can be generalized for other operators.)
+ * </li>
+ * <li>The "listener" pattern is used to allow higher levels to "plug in"
+ * functionality to the accessor layer. This is especially true for schema
+ * evolution: listeners notify the higher layer to add a column, delegating
+ * the actual work to that higher layer.</li>
+ * </ul>
+ *
+ * Given all this, plan carefully where to make any improvement. If your change
+ * violates the dependencies below, perhaps reconsider another way to do the
+ * change.
+ * <code><pre>
+ *                                                  +------------+
+ *                      +-------------------------- | Result Set |
+ *                      v                           |   Loader   |
+ *             +----------------+     +---------+   +------------+
+ *             |    Metadata    | <-- | Row Set |     |
+ *             | Implementation |     |  Tools  |     |
+ *             +----------------+     +---------+     |
+ * java-exec           |                     |        |
+ * ------------------- | ------------------- | ------ | ------------
+ * vector              v                     v        v
+ *               +------------+            +-----------+
+ *               | Metadata   | <--------- |  Column   |
+ *               | Interfaces |            | Accessors |
+ *               +------------+            +-----------+
+ *                                               |
+ *                                               v
+ *                                          +---------+
+ *                                          |  Value  |
+ *                                          | Vectors |
+ *                                          +---------+
+ * </pre></code>
  */
 
 package org.apache.drill.exec.vector.accessor.writer;

Reply via email to