DRILL-1283: JSON project pushdown.

Allows users to avoid reading columns of a JSON file, including those that 
contain JSON elements that Drill does not currently support. This can be 
used to query a subset of an existing file while avoiding elements like schema 
changes in some columns or nulls in lists that are currently not compatible 
with Drill.

Patch was revised based on Hanifi's review comments, and then rebased onto 
the merge branch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/929d765a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/929d765a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/929d765a

Branch: refs/heads/master
Commit: 929d765afd9da2fb0010a97e90b2ee19f245e37c
Parents: cc25504
Author: Jason Altekruse <altekruseja...@gmail.com>
Authored: Tue Aug 5 15:44:42 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Wed Aug 27 18:38:32 2014 -0700

----------------------------------------------------------------------
 .../drill/common/expression/PathSegment.java    |    42 +
 .../drill/common/expression/SchemaPath.java     |    16 +
 .../src/main/codegen/templates/BaseWriter.java  |     4 +
 .../main/codegen/templates/ComplexWriters.java  |    13 +-
 .../codegen/templates/FixedValueVectors.java    |     2 +-
 .../src/main/codegen/templates/ListWriters.java |    10 +-
 .../src/main/codegen/templates/MapWriters.java  |    12 +-
 .../exec/expr/fn/impl/conv/JsonConvertFrom.java |     4 +-
 .../exec/store/easy/json/JSONFormatPlugin.java  |     2 +-
 .../exec/store/easy/json/JSONRecordReader2.java |    42 +-
 .../drill/exec/vector/BaseValueVector.java      |     1 +
 .../org/apache/drill/exec/vector/BitVector.java |     2 +-
 .../exec/vector/complex/fn/JsonReader.java      |   130 +-
 .../vector/complex/fn/JsonReaderWithState.java  |    14 +-
 .../vector/complex/impl/ComplexWriterImpl.java  |     9 +-
 .../complex/impl/VectorContainerWriter.java     |     8 +
 .../java/org/apache/drill/BaseTestQuery.java    |     2 +-
 .../physical/impl/writer/TestParquetWriter.java |     3 +
 .../vector/complex/writer/TestJsonReader.java   |   160 +-
 .../project_pushdown_json_physical_plan.json    |    50 +
 .../store/json/schema_change_int_to_string.json |    30 +
 .../store/json/single_column_long_file.json     | 13512 +++++++++++++++++
 .../store/json/test_complex_read_with_star.json |    24 +
 23 files changed, 14049 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/common/src/main/java/org/apache/drill/common/expression/PathSegment.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/expression/PathSegment.java 
b/common/src/main/java/org/apache/drill/common/expression/PathSegment.java
index 0ecfcd0..c434dc7 100644
--- a/common/src/main/java/org/apache/drill/common/expression/PathSegment.java
+++ b/common/src/main/java/org/apache/drill/common/expression/PathSegment.java
@@ -239,4 +239,46 @@ public abstract class PathSegment{
     } else return child.equals(other.child);
   }
 
+  /**
+   * Check if another path is contained in this one. This is useful for 2 
cases. The first
+   * is checking if the other is lower down in the tree, below this path. The 
other is if
+   * a path is actually contained above the current one.
+   *
+   * Examples:
+   * [a] . contains( [a.b.c] ) returns true
+   * [a.b.c] . contains( [a] ) returns true
+   *
+   * This behavior is used for cases like scanning json in an event based 
fashion, when we arrive at
+   * a node in a complex type, we will know the complete path back to the 
root. This method can
+   * be used to determine if we need the data below. This is true in both the 
cases where the
+   * column requested from the user is below the current node (in which case 
we may ignore other nodes
+   * further down the tree, while keeping others). This is also the case if 
the requested path is further
+   * up the tree, if we know we are at position a.b.c and a.b was a requested 
column, we need to scan
+   * all of the data at and below the current a.b.c node.
+   *
+   * @param otherSeg - path segment to check if it is contained below this one.
+   * @return - is this a match
+   */
+  public boolean contains(PathSegment otherSeg) {
+    if (this == otherSeg)
+      return true;
+    if (otherSeg == null)
+      return false;
+    // TODO - fix this in the future to match array segments are part of the 
path
+    // the current behavior to always return true when we hit an array may be 
useful in some cases,
+    // but we can get better performance in the JSON reader if we avoid 
reading unwanted elements in arrays
+    if (otherSeg.isArray() || this.isArray())
+      return true;
+    if (getClass() != otherSeg.getClass())
+      return false;
+
+    if (!segmentEquals(otherSeg)) {
+      return false;
+    }
+    else if (child == null || otherSeg.child == null) {
+      return true;
+    } else return child.contains(otherSeg.child);
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java 
b/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
index 25ee8b4..9f444e4 100644
--- a/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
+++ b/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
@@ -199,6 +199,22 @@ public class SchemaPath extends LogicalExpressionBase {
     return rootSegment.equals(other.rootSegment);
   }
 
+  public boolean contains(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof SchemaPath))
+      return false;
+
+    SchemaPath other = (SchemaPath) obj;
+    if (rootSegment == null) {
+      return true;
+    }
+    return rootSegment.contains(other.rootSegment);
+
+  }
+
   @Override
   public Iterator<LogicalExpression> iterator() {
     return Iterators.emptyIterator();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/codegen/templates/BaseWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/BaseWriter.java 
b/exec/java-exec/src/main/codegen/templates/BaseWriter.java
index e281bc7..69ab6cd 100644
--- a/exec/java-exec/src/main/codegen/templates/BaseWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/BaseWriter.java
@@ -33,6 +33,10 @@ public interface BaseWriter extends Positionable{
   WriteState getState();
   
   public interface MapWriter extends BaseWriter{
+
+    MaterializedField getField();
+    void checkValueCapacity();
+
     <#list vv.types as type><#list type.minor as minor>
     <#assign lowerName = minor.class?uncap_first />
     <#if lowerName == "int" ><#assign lowerName = "integer" /></#if>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java 
b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
index c390770..2442434 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
@@ -40,6 +40,7 @@ package org.apache.drill.exec.vector.complex.impl;
 
 <#include "/@includes/vv_imports.ftl" />
 
+/* This class is generated using freemarker and the ComplexWriters.java 
template */
 @SuppressWarnings("unused")
 public class ${eName}WriterImpl extends AbstractFieldWriter {
   
@@ -51,7 +52,15 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
     this.mutator = vector.getMutator();
     this.vector = vector;
   }
-  
+
+  public MaterializedField getField(){
+    return vector.getField();
+  }
+
+  public void checkValueCapacity() {
+    inform(vector.getValueCapacity() > idx());
+  }
+
   public void allocate(){
     inform(vector.allocateNewSafe());
   }
@@ -111,7 +120,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter 
{
       vector.setCurrentValueCount(idx());
     }
   }
-  
+
   </#if>
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java 
b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index 8b4496e..58e6ccc 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -44,7 +44,7 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements F
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
 
-  private int allocationValueCount = 4096;
+  private int allocationValueCount = INITIAL_VALUE_ALLOCATION;
   private int allocationMonitor = 0;
   
   public ${minor.class}Vector(MaterializedField field, BufferAllocator 
allocator) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/codegen/templates/ListWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ListWriters.java 
b/exec/java-exec/src/main/codegen/templates/ListWriters.java
index 278fddc..c991fcc 100644
--- a/exec/java-exec/src/main/codegen/templates/ListWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ListWriters.java
@@ -36,7 +36,7 @@ package org.apache.drill.exec.vector.complex.impl;
 
 <#include "/@includes/vv_imports.ftl" />
 
-
+/* This class is generated using freemarker and the ListWriters.java template 
*/
 @SuppressWarnings("unused")
 public class ${mode}ListWriter extends AbstractFieldWriter{
   
@@ -136,6 +136,14 @@ public class ${mode}ListWriter extends AbstractFieldWriter{
   }
   </#list></#list>
 
+  public MaterializedField getField() {
+    return container.getField();
+  }
+
+  public void checkValueCapacity() {
+    inform(container.getValueCapacity() > idx());
+  }
+
   <#if mode == "Repeated">
   public void start(){
     if(ok()){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/codegen/templates/MapWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java 
b/exec/java-exec/src/main/codegen/templates/MapWriters.java
index 7aa5a2e..dd7e653 100644
--- a/exec/java-exec/src/main/codegen/templates/MapWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java
@@ -41,6 +41,7 @@ import 
org.apache.drill.exec.vector.complex.writer.FieldWriter;
 
 import com.google.common.collect.Maps;
 
+/* This class is generated using freemarker and the MapWriters.java template */
 @SuppressWarnings("unused")
 public class ${mode}MapWriter extends AbstractFieldWriter{
   
@@ -53,7 +54,14 @@ public class ${mode}MapWriter extends AbstractFieldWriter{
     this.container = container;
   }
 
-  
+  public MaterializedField getField() {
+      return container.getField();
+  }
+
+  public void checkValueCapacity(){
+    inform(container.getValueCapacity() > idx());
+  }
+
   public MapWriter map(String name){
     FieldWriter writer = fields.get(name);
     if(writer == null){
@@ -153,7 +161,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter{
     FieldWriter writer = fields.get(name);
     if(writer == null){
       ${vectName}Vector vector = container.addOrGet(name, ${upperName}_TYPE, 
${vectName}Vector.class);
-      AllocationHelper.allocate(vector, 1000, 100, 10);
+      vector.allocateNewSafe();
       writer = new ${vectName}WriterImpl(vector, this);
       writer.setPosition(${index});
       fields.put(name, writer);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
index 53b8f50..e66cabb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
@@ -67,7 +67,7 @@ public class JsonConvertFrom {
       String input = new String(buf, com.google.common.base.Charsets.UTF_8);
 
       try {
-        org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new 
org.apache.drill.exec.vector.complex.fn.JsonReader(buffer);
+        org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new 
org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, null);
 
         jsonReader.write(new java.io.StringReader(input), writer);
 
@@ -94,7 +94,7 @@ public class JsonConvertFrom {
       String input = new String(buf, com.google.common.base.Charsets.UTF_8);
 
       try {
-        org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new 
org.apache.drill.exec.vector.complex.fn.JsonReader(buffer);
+        org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new 
org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, null);
 
         jsonReader.write(new java.io.StringReader(input), writer);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 8b5577c..4adac20 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -117,7 +117,7 @@ public class JSONFormatPlugin extends 
EasyFormatPlugin<JSONFormatConfig> {
 
   @Override
   public boolean supportsPushDown() {
-    return false;
+    return true;
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
index 5d741e0..5bfc482 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -24,6 +24,7 @@ import java.util.List;
 import com.fasterxml.jackson.core.JsonParseException;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
@@ -31,11 +32,14 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState;
 import org.apache.drill.exec.vector.complex.fn.JsonRecordSplitter;
 import org.apache.drill.exec.vector.complex.fn.UTF8JsonRecordSplitter;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.FieldWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -53,13 +57,14 @@ public class JSONRecordReader2 implements RecordReader{
   private int recordCount;
   private FragmentContext fragmentContext;
   private OperatorContext operatorContext;
-
+  private List<SchemaPath> columns;
 
   public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, 
FileSystem fileSystem,
                           List<SchemaPath> columns) throws 
OutOfMemoryException {
     this.hadoopPath = new Path(inputPath);
     this.fileSystem = fileSystem;
     this.fragmentContext = fragmentContext;
+    this.columns = columns;
   }
 
   @Override
@@ -69,9 +74,9 @@ public class JSONRecordReader2 implements RecordReader{
       JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(stream);
       this.writer = new VectorContainerWriter(output);
       this.mutator = output;
-      jsonReader = new JsonReaderWithState(splitter, 
fragmentContext.getManagedBuffer());
+      jsonReader = new JsonReaderWithState(splitter, 
fragmentContext.getManagedBuffer(), columns);
     }catch(Exception e){
-      throw new ExecutionSetupException("Failure reading JSON file.", e);
+      handleAndRaise("Failure reading JSON file.", e);
     }
   }
 
@@ -102,7 +107,7 @@ public class JSONRecordReader2 implements RecordReader{
     recordCount = 0;
 
     try{
-      outside: while(true){
+      outside: while(true && recordCount < 
BaseValueVector.INITIAL_VALUE_ALLOCATION){
         writer.setPosition(recordCount);
 
         switch(jsonReader.write(writer)){
@@ -120,12 +125,31 @@ public class JSONRecordReader2 implements RecordReader{
           break outside;
         };
       }
-    } catch(Exception e) {
-      handleAndRaise("Failure while parsing JSON file.", e);
-    }
+      for (SchemaPath sp :jsonReader.getNullColumns() ) {
+        PathSegment root = sp.getRootSegment();
+        BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
+        if (root.getChild() != null && ! root.getChild().isArray()) {
+          fieldWriter = fieldWriter.map(root.getNameSegment().getPath());
+          while ( root.getChild().getChild() != null && ! 
root.getChild().isArray() ) {
+            fieldWriter = 
fieldWriter.map(root.getChild().getNameSegment().getPath());
+            root = root.getChild();
+          }
+          fieldWriter.integer(root.getChild().getNameSegment().getPath());
+        } else  {
+          fieldWriter.integer(root.getNameSegment().getPath());
+        }
+      }
 
-    writer.setValueCount(recordCount);
-    return recordCount;
+      writer.setValueCount(recordCount);
+      return recordCount;
+
+    } catch (JsonParseException e) {
+      handleAndRaise("Error parsing JSON.", e);
+    } catch (IOException e) {
+      handleAndRaise("Error reading JSON.", e);
+    }
+    // this is never reached
+    return 0;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index cf62d54..1b3705e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -34,6 +34,7 @@ public abstract class BaseValueVector implements ValueVector{
 
   protected final BufferAllocator allocator;
   protected final MaterializedField field;
+  public static final int INITIAL_VALUE_ALLOCATION = 4096;
 
   BaseValueVector(MaterializedField field, BufferAllocator allocator) {
     this.allocator = allocator;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 0076a1d..82cc4c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -41,7 +41,7 @@ public final class BitVector extends BaseDataValueVector 
implements FixedWidthVe
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
 
-  private int allocationValueCount = 4096;
+  private int allocationValueCount = INITIAL_VALUE_ALLOCATION;
   private int allocationMonitor = 0;
 
   private int valueCapacity;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 59035cb..8393dc6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -21,11 +21,17 @@ import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
 import java.io.Reader;
+import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.Float8Holder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
@@ -46,11 +52,65 @@ public class JsonReader {
   private final JsonFactory factory = new JsonFactory();
   private JsonParser parser;
   private DrillBuf workBuf;
+  private List<SchemaPath> columns;
+  // This is a parallel array for the field above to indicate if we have found 
any values in a
+  // given selected column. This allows for columns that are requested to 
receive a vector full of
+  // null values if no values were found in an entire read. The reason this 
needs to happen after
+  // all of the records have been read in a batch is to prevent a schema 
change when we actually find
+  // data in that column.
+  private boolean[] columnsFound;
+  // A flag set at setup time if the start column is in the requested column 
list, prevents
+  // doing a more computational intensive check if we are supposed to be 
reading a column
+  private boolean starRequested;
 
-  public JsonReader(DrillBuf managedBuf) throws JsonParseException, 
IOException {
+  public JsonReader() throws IOException {
+    this(null, null);
+  }
+
+  public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns) throws 
JsonParseException, IOException {
     factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
     factory.configure(Feature.ALLOW_COMMENTS, true);
     this.workBuf = managedBuf;
+    this.columns = columns;
+    // TODO - remove this check once the optimizer is updated to push down * 
instead of a null list
+    if (this.columns == null) {
+      this.columns = new ArrayList();
+      this.columns.add(new SchemaPath(new PathSegment.NameSegment("*")));
+    }
+    this.columnsFound = new boolean[this.columns.size()];
+    this.starRequested = containsStar();
+  }
+
+  private boolean containsStar() {
+    for (SchemaPath expr : this.columns){
+      if (expr.getRootSegment().getPath().equals("*"))
+        return true;
+    }
+    return false;
+  }
+
+  private boolean fieldSelected(SchemaPath field){
+    if (starRequested)
+      return true;
+    int i = 0;
+    for (SchemaPath expr : this.columns){
+      if ( expr.contains(field)){
+        columnsFound[i] = true;
+        return true;
+      }
+      i++;
+    }
+    return false;
+  }
+
+  public List<SchemaPath> getNullColumns() {
+    ArrayList<SchemaPath> nullColumns = new ArrayList<SchemaPath>();
+    for (int i = 0; i < columnsFound.length; i++ ) {
+      if ( ! columnsFound[i] && ! 
columns.get(i).getRootSegment().getPath().equals("*") ) {
+        nullColumns.add(columns.get(i));
+      }
+    }
+    return nullColumns;
   }
 
   public boolean write(Reader reader, ComplexWriter writer) throws 
JsonParseException, IOException {
@@ -81,6 +141,33 @@ public class JsonReader {
     return true;
   }
 
+  private void consumeEntireNextValue(JsonParser parser) throws IOException {
+    switch(parser.nextToken()){
+      case START_ARRAY:
+      case START_OBJECT:
+        int arrayAndObjectCounter = 1;
+        skipArrayLoop: while (true) {
+          switch(parser.nextToken()) {
+            case START_ARRAY:
+            case START_OBJECT:
+              arrayAndObjectCounter++;
+              break;
+            case END_ARRAY:
+            case END_OBJECT:
+              arrayAndObjectCounter--;
+              if (arrayAndObjectCounter == 0) {
+                break skipArrayLoop;
+              }
+              break;
+          }
+        }
+        break;
+      default:
+        // hit a single value, do nothing as the token was already read
+        // in the switch statement
+        break;
+    }
+  }
 
   private void writeData(MapWriter map) throws JsonParseException, IOException 
{
     //
@@ -91,7 +178,16 @@ public class JsonReader {
 
       assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME 
but got %s.", t.name());
       final String fieldName = parser.getText();
-
+      SchemaPath path;
+      if (map.getField().getPath().getRootSegment().getPath().equals("")) {
+        path = new SchemaPath(new PathSegment.NameSegment(fieldName));
+      } else {
+        path = map.getField().getPath().getChild(fieldName);
+      }
+      if ( ! fieldSelected(path) ) {
+        consumeEntireNextValue(parser);
+        continue outside;
+      }
 
       switch(parser.nextToken()){
       case START_ARRAY:
@@ -117,6 +213,7 @@ public class JsonReader {
         break;
       }
       case VALUE_NULL:
+        map.checkValueCapacity();
         // do nothing as we don't have a type.
         break;
       case VALUE_NUMBER_FLOAT:
@@ -131,14 +228,7 @@ public class JsonReader {
         break;
       case VALUE_STRING:
         VarCharHolder vh = new VarCharHolder();
-        String value = parser.getText();
-        byte[] b = value.getBytes(Charsets.UTF_8);
-        ensure(b.length);
-        workBuf.setBytes(0, b);
-        vh.buffer = workBuf;
-        vh.start = 0;
-        vh.end = b.length;
-        map.varChar(fieldName).write(vh);
+        map.varChar(fieldName).write(prepareVarCharHolder(vh, parser));
         break;
 
       default:
@@ -155,6 +245,17 @@ public class JsonReader {
     workBuf = workBuf.reallocIfNeeded(length);
   }
 
+  private VarCharHolder prepareVarCharHolder(VarCharHolder vh, JsonParser 
parser) throws IOException {
+    String value = parser.getText();
+    byte[] b = value.getBytes(Charsets.UTF_8);
+    ensure(b.length);
+    workBuf.setBytes(0, b);
+    vh.buffer = workBuf;
+    vh.start = 0;
+    vh.end = b.length;
+    return vh;
+  }
+
   private void writeData(ListWriter list) throws JsonParseException, 
IOException {
     list.start();
     outside: while(true){
@@ -198,14 +299,7 @@ public class JsonReader {
         break;
       case VALUE_STRING:
         VarCharHolder vh = new VarCharHolder();
-        String value = parser.getText();
-        byte[] b = value.getBytes(Charsets.UTF_8);
-        ensure(b.length);
-        workBuf.setBytes(0, b);
-        vh.buffer = workBuf;
-        vh.start = 0;
-        vh.end = b.length;
-        list.varChar().write(vh);
+        list.varChar().write(prepareVarCharHolder(vh, parser));
         break;
       default:
         throw new IllegalStateException("Unexpected token " + 
parser.getCurrentToken());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
index 54e063b..13b8215 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
@@ -22,7 +22,9 @@ import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
 import java.io.Reader;
+import java.util.List;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.fasterxml.jackson.core.JsonParseException;
@@ -37,10 +39,18 @@ public class JsonReaderWithState {
   private JsonRecordSplitter splitter;
   private JsonReader jsonReader;
 
-  public JsonReaderWithState(JsonRecordSplitter splitter, DrillBuf workspace) 
throws IOException{
+  public JsonReaderWithState(JsonRecordSplitter splitter, DrillBuf workspace, 
List<SchemaPath> columns) throws IOException{
     this.splitter = splitter;
     reader = splitter.getNextReader();
-    jsonReader = new JsonReader(workspace);
+    jsonReader = new JsonReader(workspace, columns);
+  }
+
+  public JsonReaderWithState(JsonRecordSplitter splitter) throws IOException{
+    this(splitter, null, null);
+  }
+
+  public List<SchemaPath> getNullColumns() {
+    return jsonReader.getNullColumns();
   }
 
   public WriteState write(ComplexWriter writer) throws JsonParseException, 
IOException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
index 2fa72f7..165c5b5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
@@ -48,6 +48,14 @@ public class ComplexWriterImpl extends AbstractFieldWriter 
implements ComplexWri
     this.container = container;
   }
 
+  public MaterializedField getField() {
+    return container.getField();
+  }
+
+  public void checkValueCapacity(){
+    inform(container.getValueCapacity() > idx());
+  }
+
   private void check(Mode... modes){
     StateTool.check(mode, modes);
   }
@@ -164,7 +172,6 @@ public class ComplexWriterImpl extends AbstractFieldWriter 
implements ComplexWri
 
     return listRoot;
   }
-  
 
   private static class VectorAccessibleFacade extends MapVector {
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index 4f669c0..57980f8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -40,6 +40,14 @@ public class VectorContainerWriter extends 
AbstractFieldWriter implements Comple
     this.mapRoot = new SingleMapWriter(mapVector, this);
   }
 
+  public MaterializedField getField() {
+    return mapVector.getField();
+  }
+
+  public void checkValueCapacity(){
+    inform(mapVector.getValueCapacity() > idx());
+  }
+
   public MapVector getMapVector() {
     return mapVector;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java 
b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 5e52e82..f504de4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -147,7 +147,7 @@ public class BaseTestQuery extends ExecTest{
     return client.runQuery(type, query);
   }
 
-  private int testRunAndPrint(QueryType type, String query) throws Exception{
+  protected int testRunAndPrint(QueryType type, String query) throws Exception{
     query = query.replace("[WORKING_PATH]", TestTools.getWorkingPath());
     PrintingResultsListener resultListener = new 
PrintingResultsListener(client.getConfig(), Format.TSV, 
VectorUtil.DEFAULT_COLUMN_WIDTH);
     client.runQuery(type, query, resultListener);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index b4fd09d..0f0743a 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -198,6 +199,8 @@ public class TestParquetWriter extends BaseTestQuery {
       //test(String.format("ALTER SESSION SET `%s` = %d", 
ExecConstants.PARQUET_BLOCK_SIZE, 1*1024*1024));
       String selection = "mi";
       String inputTable = "cp.`customer.json`";
+      int count = testRunAndPrint(UserBitShared.QueryType.SQL, "select mi from 
cp.`customer.json`");
+      System.out.println(count);
       runTestAndValidate(selection, selection, inputTable, 
"foodmart_customer_parquet");
     } finally {
       test(String.format("ALTER SESSION SET `%s` = %d", 
ExecConstants.PARQUET_BLOCK_SIZE, 512*1024*1024));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index f2f91f4..885e50a 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -17,14 +17,32 @@
  */
 package org.apache.drill.exec.vector.complex.writer;
 
+import static org.jgroups.util.Util.assertTrue;
 import static org.junit.Assert.assertEquals;
 import io.netty.buffer.DrillBuf;
+import static org.junit.Assert.assertNull;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.List;
 
+import com.google.common.io.Files;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.RepeatedBigIntVector;
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState;
@@ -34,6 +52,7 @@ import 
org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,10 +60,11 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 
-public class TestJsonReader {
+public class TestJsonReader extends BaseTestQuery {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestJsonReader.class);
 
   private static BufferAllocator allocator;
+  private static final boolean VERBOSE_DEBUG = false;
 
   @BeforeClass
   public static void setupAllocator(){
@@ -56,6 +76,142 @@ public class TestJsonReader {
     allocator.close();
   }
 
+  public void runTestsOnFile(String filename, UserBitShared.QueryType 
queryType, String[] queries, long[] rowCounts) throws Exception {
+    if (VERBOSE_DEBUG) {
+      System.out.println("===================");
+      System.out.println("source data in json");
+      System.out.println("===================");
+      System.out.println(Files.toString(FileUtils.getResourceAsFile(filename), 
Charsets.UTF_8));
+    }
+
+    int i = 0;
+    for (String query : queries) {
+      if (VERBOSE_DEBUG) {
+        System.out.println("=====");
+        System.out.println("query");
+        System.out.println("=====");
+        System.out.println(query);
+        System.out.println("======");
+        System.out.println("result");
+        System.out.println("======");
+      }
+      int rowCount = testRunAndPrint(queryType, query);
+      assertEquals( rowCount, rowCounts[i]);
+      System.out.println();
+      i++;
+    }
+  }
+
+  @Test
+  public void testSingleColumnRead_vector_fill_bug() throws Exception {
+    String[] queries = {"select * from 
cp.`/store/json/single_column_long_file.json`"};
+    long[] rowCounts = {13512};
+    String filename = "/store/json/single_column_long_file.json";
+    runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
+  }
+
+  @Test
+  public void testNonExistentColumnReadAlone() throws Exception {
+    String[] queries = {"select non_existent_column from 
cp.`/store/json/single_column_long_file.json`"};
+    long[] rowCounts = {13512};
+    String filename = "/store/json/single_column_long_file.json";
+    runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
+  }
+
+  @Test
+  public void readComplexWithStar() throws Exception {
+    List<QueryResultBatch> results = testSqlWithResults("select * from 
cp.`/store/json/test_complex_read_with_star.json`");
+    assertEquals(2, results.size());
+
+    RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
+    QueryResultBatch batch = results.get(0);
+
+    assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+    assertEquals(3, batchLoader.getSchema().getFieldCount());
+    testExistentColumns(batchLoader, batch);
+
+    batch.release();
+    batchLoader.clear();
+  }
+
+  // The project pushdown rule is correctly adding the projected columns to 
the scan; however, it is not removing
+  // the redundant project operator after the scan. This test runs a physical 
plan generated from one of the tests to
+  // ensure that the project is filtering out the correct data in the scan 
alone.
+  @Test
+  public void testProjectPushdown() throws Exception {
+    String[] queries = 
{Files.toString(FileUtils.getResourceAsFile("/store/json/project_pushdown_json_physical_plan.json"),
 Charsets.UTF_8)};
+    long[] rowCounts = {3};
+    String filename = "/store/json/schema_change_int_to_string.json";
+    runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, 
rowCounts);
+
+    List<QueryResultBatch> results = testPhysicalWithResults(queries[0]);
+    assertEquals(2, results.size());
+    // "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", 
"`field_4`.`inner_1`"
+
+    RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
+    QueryResultBatch batch = results.get(0);
+    assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+    assertEquals(5, batchLoader.getSchema().getFieldCount());
+    testExistentColumns(batchLoader, batch);
+
+    VectorWrapper vw = batchLoader.getValueAccessorById(
+        NullableIntVector.class, //
+        
batchLoader.getValueVectorId(SchemaPath.getCompoundPath("non_existent_at_root")).getFieldIds()
 //
+    );
+    assertNull(vw.getValueVector().getAccessor().getObject(0));
+    assertNull(vw.getValueVector().getAccessor().getObject(1));
+    assertNull(vw.getValueVector().getAccessor().getObject(2));
+
+    vw = batchLoader.getValueAccessorById(
+        NullableIntVector.class, //
+        
batchLoader.getValueVectorId(SchemaPath.getCompoundPath("non_existent", 
"nested","field")).getFieldIds() //
+    );
+    assertNull(vw.getValueVector().getAccessor().getObject(0));
+    assertNull(vw.getValueVector().getAccessor().getObject(1));
+    assertNull(vw.getValueVector().getAccessor().getObject(2));
+
+    vw.getValueVector().clear();
+    batch.release();
+    batchLoader.clear();
+  }
+
+  private void testExistentColumns(RecordBatchLoader batchLoader, 
QueryResultBatch batch) throws SchemaChangeException {
+
+    assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+
+    VectorWrapper<?> vw = batchLoader.getValueAccessorById(
+        RepeatedBigIntVector.class, //
+        
batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds()
 //
+    );
+    assertEquals("[1]", 
vw.getValueVector().getAccessor().getObject(0).toString());
+    assertEquals("[5]", 
vw.getValueVector().getAccessor().getObject(1).toString());
+    assertEquals("[5,10,15]", 
vw.getValueVector().getAccessor().getObject(2).toString());
+
+    vw = batchLoader.getValueAccessorById(
+        IntVector.class, //
+        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", 
"inner_1")).getFieldIds() //
+    );
+    assertNull(vw.getValueVector().getAccessor().getObject(0));
+    assertEquals(2l, vw.getValueVector().getAccessor().getObject(1));
+    assertEquals(5l, vw.getValueVector().getAccessor().getObject(2));
+
+    vw = batchLoader.getValueAccessorById(
+        IntVector.class, //
+        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", 
"inner_2")).getFieldIds() //
+    );
+    assertNull(vw.getValueVector().getAccessor().getObject(0));
+    assertNull(vw.getValueVector().getAccessor().getObject(1));
+    assertEquals(3l, vw.getValueVector().getAccessor().getObject(2));
+
+    vw = batchLoader.getValueAccessorById(
+        RepeatedBigIntVector.class, //
+        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_4", 
"inner_1")).getFieldIds() //
+    );
+    assertEquals("[]", 
vw.getValueVector().getAccessor().getObject(0).toString());
+    assertEquals("[1,2,3]", 
vw.getValueVector().getAccessor().getObject(1).toString());
+    assertEquals("[4,5,6]", 
vw.getValueVector().getAccessor().getObject(2).toString());
+  }
+
   @Test
   public void testReader() throws Exception{
     final int repeatSize = 10;
@@ -89,7 +245,7 @@ public class TestJsonReader {
     writer.allocate();
 
     DrillBuf buffer = allocator.buffer(255);
-    JsonReaderWithState jsonReader = new JsonReaderWithState(new 
ReaderJSONRecordSplitter(compound), buffer);
+    JsonReaderWithState jsonReader = new JsonReaderWithState(new 
ReaderJSONRecordSplitter(compound), buffer, null);
     int i =0;
     List<Integer> batchSizes = Lists.newArrayList();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json
 
b/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json
new file mode 100644
index 0000000..60cd2b0
--- /dev/null
+++ 
b/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json
@@ -0,0 +1,50 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "DefaultSqlHandler",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "options" : [ {
+      "name" : "planner.width.max_per_node",
+      "kind" : "LONG",
+      "type" : "SESSION",
+      "num_val" : 2
+    } ],
+    "queue" : 0,
+    "resultMode" : "EXEC"
+  },
+  "graph" : [ {
+    "pop" : "fs-scan",
+    "@id" : 1,
+    "files" : [ "/store/json/schema_change_int_to_string.json" ],
+    "storage" : {
+      "type" : "file",
+      "enabled" : true,
+      "connection" : "classpath:///",
+      "workspaces" : null,
+      "formats" : {
+        "json" : {
+          "type" : "json"
+        },
+        "parquet" : {
+          "type" : "parquet"
+        }
+      }
+    },
+    "format" : {
+      "type" : "json"
+    },
+    "columns" : [ "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", 
"`field_4`.`inner_1`", "`non_existent_at_root`", 
"`non_existent`.`nested`.`field`"],
+    "selectionRoot" : "/store/json/schema_change_int_to_string.json",
+    "cost" : 0.0
+  }, {
+    "pop" : "screen",
+    "@id" : 0,
+    "child" : 1,
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000,
+    "cost" : 1.0
+  } ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/929d765a/exec/java-exec/src/test/resources/store/json/schema_change_int_to_string.json
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/resources/store/json/schema_change_int_to_string.json 
b/exec/java-exec/src/test/resources/store/json/schema_change_int_to_string.json
new file mode 100644
index 0000000..f2fca86
--- /dev/null
+++ 
b/exec/java-exec/src/test/resources/store/json/schema_change_int_to_string.json
@@ -0,0 +1,30 @@
+{
+    "field_1": [1]
+}
+{
+    "field_1": [5],
+    "field_2": 2,
+    "field_3": {
+        "inner_1" : 2
+    },
+    "field_4" : {
+        "inner_1" : [1,2,3],
+        "inner_2" : 3,
+        "inner_3" :  { "inner_object_field_1" : 2}
+    },
+    "field_5" : [ { "inner_list" : [1,6] }, { "inner_list":[3,8]}, { 
"inner_list":[12,4,5]} ]
+}
+{
+    "field_1": [5,10,15],
+    "field_2": "A wild string appears!",
+    "field_3": {
+        "inner_1" : 5,
+        "inner_2" : 3,
+        "inner_3" : [ { "inner_object_field_1" : 2}, {"inner_object_field_1" : 
10} ]
+    },
+    "field_4" : {
+        "inner_1" : [4,5,6],
+        "inner_2" : 3
+    },
+    "field_5" : [ { "inner_list" : [5,6.0, "1234"] }, { "inner_list":[7,8.0, 
"12341324"], "inner_list_2" : [1,2,2323.443e10, "hello there"]}, { 
"inner_list":[3,4,5], "inner_list_2" : [10, 11, 12]} ]
+}

Reply via email to