This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 93a39cd9b0790129bb6888cbfefc100efd8a6bc8
Author: Paul Rogers <[email protected]>
AuthorDate: Fri Nov 29 18:58:59 2019 -0800

    DRILL-7324: Final set of "batch count" fixes
    
    Final set of fixes for batch count/record count issues. Enables
    vector checking for all operators.
    
    closes #1912
---
 .../apache/drill/exec/physical/impl/ScanBatch.java |   1 -
 .../drill/exec/physical/impl/TopN/TopNBatch.java   |   6 +-
 .../exec/physical/impl/aggregate/HashAggBatch.java |   2 +-
 .../physical/impl/aggregate/StreamingAggBatch.java |   4 +-
 .../exec/physical/impl/filter/FilterTemplate2.java |   7 +-
 .../exec/physical/impl/join/HashJoinBatch.java     |   5 +-
 .../physical/impl/join/NestedLoopJoinBatch.java    |   8 +-
 .../impl/metadata/MetadataControllerBatch.java     |  33 ++---
 .../RangePartitionRecordBatch.java                 |   1 -
 .../impl/statistics/StatisticsMergeBatch.java      |  52 +++----
 .../impl/svremover/RemovingRecordBatch.java        |   7 +-
 .../physical/impl/union/UnionAllRecordBatch.java   |   7 +-
 .../physical/impl/unnest/UnnestRecordBatch.java    |   1 +
 .../impl/unpivot/UnpivotMapsRecordBatch.java       |   4 +-
 .../physical/impl/validate/BatchValidator.java     | 114 +--------------
 .../impl/window/WindowFrameRecordBatch.java        |  14 +-
 .../apache/drill/exec/record/VectorAccessible.java |   2 -
 .../apache/drill/exec/record/VectorContainer.java  |   6 +
 .../exec/store/easy/json/JSONRecordReader.java     |  22 +--
 .../exec/vector/complex/writer/TestJsonReader.java | 159 +++++++++++++--------
 20 files changed, 177 insertions(+), 278 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 3e658cb..f464b27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -575,7 +575,6 @@ public class ScanBatch implements CloseableRecordBatch {
     }
   }
 
-
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
     return container.iterator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 5ae6e76..baef314 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -559,12 +559,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       // Transfers count number of records from hyperBatch to simple container
       final int copiedRecords = copier.copyRecords(0, count);
       assert copiedRecords == count;
-      for (VectorWrapper<?> v : newContainer) {
-        ValueVector.Mutator m = v.getValueVector().getMutator();
-        m.setValueCount(count);
-      }
       newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-      newContainer.setRecordCount(count);
+      newContainer.setValueCount(count);
       // Store all the batches containing limit number of records
       batchBuilder.add(newBatch);
     } while (queueSv4.next());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 45c670b..38fb14e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -269,7 +269,7 @@ public class HashAggBatch extends 
AbstractRecordBatch<HashAggregate> {
     for (VectorWrapper<?> w : container) {
       AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), 0, 0, 
0);
     }
-    container.setValueCount(0);
+    container.setEmpty();
     if (incoming.getRecordCount() > 0) {
       hashAggMemoryManager.update();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index c3b504a..586fa32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -186,9 +186,7 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
     if (!createAggregator()) {
       state = BatchState.DONE;
     }
-    for (VectorWrapper<?> w : container) {
-      w.getValueVector().allocateNew();
-    }
+    container.allocateNew();
 
     if (complexWriters != null) {
       container.buildSchema(SelectionVectorMode.NONE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index c189367..e3b6070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -66,12 +66,7 @@ public abstract class FilterTemplate2 implements Filterer {
     if (recordCount == 0) {
       outgoingSelectionVector.setRecordCount(0);
       outgoingSelectionVector.setBatchActualRecordCount(0);
-
-      // Must allocate vectors, then set count to zero. Allocation
-      // is needed since offset vectors must contain at least one
-      // item (the required value of 0 in index location 0.)
-      outgoing.getContainer().allocateNew();
-      outgoing.getContainer().setValueCount(0);
+      outgoing.getContainer().setEmpty();
       return;
     }
     if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index eab38ec..cded844 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -77,6 +77,7 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.util.record.RecordBatchStats;
@@ -703,9 +704,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> implem
   private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, 
boolean isLeft) {
       batch.kill(true);
       while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == 
IterOutcome.OK) {
-        for (VectorWrapper<?> wrapper : batch) {
-          wrapper.getValueVector().clear();
-        }
+        VectorAccessibleUtilities.clear(batch);
         upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : 
HashJoinHelper.RIGHT_INPUT, batch);
       }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 6b7edd2..c84f954 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -87,10 +87,10 @@ public class NestedLoopJoinBatch extends 
AbstractBinaryRecordBatch<NestedLoopJoi
   private int outputRecords;
 
   // We accumulate all the batches on the right side in a hyper container.
-  private ExpandableHyperContainer rightContainer = new 
ExpandableHyperContainer();
+  private final ExpandableHyperContainer rightContainer = new 
ExpandableHyperContainer();
 
   // Record count of the individual batches in the right hyper container
-  private LinkedList<Integer> rightCounts = new LinkedList<>();
+  private final LinkedList<Integer> rightCounts = new LinkedList<>();
 
 
   // Generator mapping for the right side
@@ -372,9 +372,7 @@ public class NestedLoopJoinBatch extends 
AbstractBinaryRecordBatch<NestedLoopJoi
 
       if (leftUpstream != IterOutcome.NONE) {
         leftSchema = left.getSchema();
-        for (final VectorWrapper<?> vw : left) {
-          container.addOrGet(vw.getField());
-        }
+        container.copySchemaFrom(left);
       }
 
       if (rightUpstream != IterOutcome.NONE) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 9ccae49..ab82769 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -17,6 +17,17 @@
  */
 package org.apache.drill.exec.physical.impl.metadata;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -24,6 +35,7 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils;
+import org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils;
 import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MetadataControllerPOP;
@@ -32,7 +44,6 @@ import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.WriterPrel;
-import org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
@@ -80,17 +91,6 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 /**
  * Terminal operator for producing ANALYZE statement. This operator is 
responsible for converting
  * obtained metadata, fetching absent metadata from the Metastore and storing 
resulting metadata into the Metastore.
@@ -109,9 +109,9 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
 
   private boolean firstLeft = true;
   private boolean firstRight = true;
-  private boolean finished = false;
-  private boolean finishedRight = false;
-  private int recordCount = 0;
+  private boolean finished;
+  private boolean finishedRight;
+  private int recordCount;
 
   protected MetadataControllerBatch(MetadataControllerPOP popConfig,
       FragmentContext context, RecordBatch left, RecordBatch right) throws 
OutOfMemoryException {
@@ -129,13 +129,10 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
 
   protected boolean setupNewSchema() {
     container.clear();
-
     container.addOrGet(MetastoreAnalyzeConstants.OK_FIELD_NAME, 
Types.required(TypeProtos.MinorType.BIT), null);
     container.addOrGet(MetastoreAnalyzeConstants.SUMMARY_FIELD_NAME, 
Types.required(TypeProtos.MinorType.VARCHAR), null);
-
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     container.setEmpty();
-
     return true;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
index 11d307b..7a61489 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
@@ -184,5 +184,4 @@ public class RangePartitionRecordBatch extends 
AbstractSingleRecordBatch<RangePa
     logger.error("RangePartitionRecordBatch[container={}, numPartitions={}, 
recordCount={}, partitionIdVector={}]",
         container, numPartitions, recordCount, partitionIdVector);
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
index 15962ad..921c92b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
+
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.ValueExpressions;
@@ -46,11 +47,12 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.metastore.statistics.Statistic;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- *
  * Example input and output:
- * Schema of incoming batch:
+ * Schema of incoming batch:<pre>
  *    "columns"       : MAP - Column names
  *       "region_id"  : VARCHAR
  *       "sales_city" : VARCHAR
@@ -65,7 +67,7 @@ import 
org.apache.drill.shaded.guava.com.google.common.collect.Lists;
  *       "sales_city" : BIGINT - nonnullstatcount(sales_city)
  *       "cnt"        : BIGINT - nonnullstatcount(cnt)
  *   .... another map for next stats function ....
- * Schema of outgoing batch:
+ * </pre>Schema of outgoing batch:<pre>
  *    "schema" : BIGINT - Schema number. For each schema change this number is 
incremented.
  *    "computed" : DATE - What time is it computed?
  *    "columns"       : MAP - Column names
@@ -82,17 +84,19 @@ import 
org.apache.drill.shaded.guava.com.google.common.collect.Lists;
  *       "sales_city" : BIGINT - nonnullstatcount(sales_city)
  *       "cnt"        : BIGINT - nonnullstatcount(cnt)
  *   .... another map for next stats function ....
+ * </pre>
  */
+
 public class StatisticsMergeBatch extends 
AbstractSingleRecordBatch<StatisticsMerge> {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StatisticsMergeBatch.class);
-  private Map<String, String> functions;
+  private static final Logger logger = 
LoggerFactory.getLogger(StatisticsMergeBatch.class);
+
+  private final Map<String, String> functions;
   private boolean first = true;
-  private boolean finished = false;
-  private int schema = 0;
-  private int recordCount = 0;
-  private List<String> columnsList = null;
+  private boolean finished;
+  private int schema;
+  private List<String> columnsList;
   private double samplePercent = 100.0;
-  private List<MergedStatistic> mergedStatisticList = null;
+  private final List<MergedStatistic> mergedStatisticList;
 
   public StatisticsMergeBatch(StatisticsMerge popConfig, RecordBatch incoming,
       FragmentContext context) throws OutOfMemoryException {
@@ -115,20 +119,6 @@ public class StatisticsMergeBatch extends 
AbstractSingleRecordBatch<StatisticsMe
   }
 
   /*
-   * Adds the `name` column value vector in the `parent` map vector. These 
`name` columns are
-   * table columns for which statistics will be computed.
-   */
-  private ValueVector addMapVector(String name, MapVector parent, 
LogicalExpression expr)
-      throws SchemaChangeException {
-    LogicalExpression mle = PhysicalOperatorUtil.materializeExpression(expr, 
incoming, context);
-    Class<? extends ValueVector> vvc =
-        TypeHelper.getValueVectorClass(mle.getMajorType().getMinorType(),
-            mle.getMajorType().getMode());
-    ValueVector vector = parent.addOrGet(name, mle.getMajorType(), vvc);
-    return vector;
-  }
-
-  /*
    * Identify the list of fields within a map which are generated by 
StatisticsMerge. Perform
    * basic sanity check i.e. all maps have the same number of columns and 
those columns are
    * the same in each map
@@ -229,8 +219,7 @@ public class StatisticsMergeBatch extends 
AbstractSingleRecordBatch<StatisticsMe
         }
       }
     }
-    container.setRecordCount(0);
-    recordCount = 0;
+    container.setEmpty();
     container.buildSchema(incoming.getSchema().getSelectionVectorMode());
   }
 
@@ -238,7 +227,7 @@ public class StatisticsMergeBatch extends 
AbstractSingleRecordBatch<StatisticsMe
    * Determines the MajorType based on the incoming value vector. Please look 
at the
    * comments above the class definition which describes the incoming/outgoing 
batch schema
    */
-  private void addVectorToOutgoingContainer(String outStatName, VectorWrapper 
vw)
+  private void addVectorToOutgoingContainer(String outStatName, 
VectorWrapper<?> vw)
       throws SchemaChangeException {
     // Input map vector
     MapVector inputVector = (MapVector) vw.getValueVector();
@@ -306,9 +295,8 @@ public class StatisticsMergeBatch extends 
AbstractSingleRecordBatch<StatisticsMe
         }
       }
     }
-    ++recordCount;
     // Populate the number of records (1) inside the outgoing batch.
-    container.setRecordCount(1);
+    container.setValueCount(1);
     return IterOutcome.OK;
   }
 
@@ -343,9 +331,7 @@ public class StatisticsMergeBatch extends 
AbstractSingleRecordBatch<StatisticsMe
   }
 
   @Override
-  public void dump() {
-
-  }
+  public void dump() { }
 
   @Override
   public IterOutcome innerNext() {
@@ -404,6 +390,6 @@ public class StatisticsMergeBatch extends 
AbstractSingleRecordBatch<StatisticsMe
 
   @Override
   public int getRecordCount() {
-    return recordCount;
+    return container.getRecordCount();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 4471248..a9584bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -25,13 +25,16 @@ import 
org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.WritableBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVectorRemover>{
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(RemovingRecordBatch.class);
 
   private Copier copier;
 
-  public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext 
context, RecordBatch incoming) throws OutOfMemoryException {
+  public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext 
context,
+      RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, incoming);
     logger.debug("Created.");
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 25dae80..ed2b66e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.union;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -68,8 +69,8 @@ public class UnionAllRecordBatch extends 
AbstractBinaryRecordBatch<UnionAll> {
 
   private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private UnionAller unionall;
-  private final List<TransferPair> transfers = Lists.newArrayList();
-  private final List<ValueVector> allocationVectors = Lists.newArrayList();
+  private final List<TransferPair> transfers = new ArrayList<>();
+  private final List<ValueVector> allocationVectors = new ArrayList<>();
   private int recordCount;
   private UnionInputIterator unionInputIterator;
 
@@ -341,7 +342,7 @@ public class UnionAllRecordBatch extends 
AbstractBinaryRecordBatch<UnionAll> {
   }
 
   private class UnionInputIterator implements Iterator<Pair<IterOutcome, 
BatchStatusWrappper>> {
-    private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
+    private final Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
 
     UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome 
rightOutCome, RecordBatch right) {
       if (rightOutCome == IterOutcome.OK_NEW_SCHEMA) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 1715c99..85eceea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -295,6 +295,7 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
       remainderIndex = 0;
       logger.debug("IterOutcome: EMIT.");
     }
+    rowIdVector.getMutator().setValueCount(outputRecords);
     container.setValueCount(outputRecords);
 
     memoryManager.updateOutgoingStats(outputRecords);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
index 99bd6d1..72a337a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -64,8 +64,8 @@ import org.slf4j.LoggerFactory;
  *       "sales_city" : BIGINT - nonnullstatcount(sales_city)
  *       "cnt"        : BIGINT - nonnullstatcount(cnt)
  *   .... another map for next stats function ....
- *
- * Schema of output:
+ * </pre>
+ * Schema of output: <pre>
  *  "schema"           : BIGINT - Schema number. For each schema change this 
number is incremented.
  *  "computed"         : BIGINT - What time is this computed?
  *  "column"           : column name
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 8793a65..e1ffd7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -17,41 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.validate;
 
-import java.util.IdentityHashMap;
-import java.util.Map;
-
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.physical.impl.WriterRecordBatch;
-import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
-import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
-import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
-import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
-import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch;
-import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
-import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
-import org.apache.drill.exec.physical.impl.join.MergeJoinBatch;
-import org.apache.drill.exec.physical.impl.join.NestedLoopJoinBatch;
-import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch;
-import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
-import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
-import 
org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataHashAggBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataStreamAggBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataControllerBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataHandlerBatch;
-import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
-import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
-import 
org.apache.drill.exec.physical.impl.rangepartitioner.RangePartitionRecordBatch;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
-import org.apache.drill.exec.physical.impl.trace.TraceRecordBatch;
-import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch;
-import org.apache.drill.exec.physical.impl.unnest.UnnestRecordBatch;
-import 
org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
-import org.apache.drill.exec.physical.impl.unpivot.UnpivotMapsRecordBatch;
-import org.apache.drill.exec.physical.impl.window.WindowFrameRecordBatch;
-import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
-import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SimpleVectorWrapper;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -200,89 +166,13 @@ public class BatchValidator {
     }
   }
 
-  private enum CheckMode {
-    /** No checking. */
-    NONE,
-    /** Check only batch, container counts. */
-    COUNTS,
-    /** Check vector value counts. */
-    VECTORS
-    };
-
-  private static final Map<Class<? extends CloseableRecordBatch>, CheckMode> 
checkRules = buildRules();
-
   private final ErrorReporter errorReporter;
 
   public BatchValidator(ErrorReporter errorReporter) {
     this.errorReporter = errorReporter;
   }
 
-  /**
-   * At present, most operators will not pass the checks here. The following
-   * table identifies those that should be checked, and the degree of check.
-   * Over time, this table should include all operators, and thus become
-   * unnecessary.
-   */
-  private static Map<Class<? extends CloseableRecordBatch>, CheckMode> 
buildRules() {
-    Map<Class<? extends CloseableRecordBatch>, CheckMode> rules = new 
IdentityHashMap<>();
-    rules.put(OperatorRecordBatch.class, CheckMode.VECTORS);
-    rules.put(ScanBatch.class, CheckMode.VECTORS);
-    rules.put(ProjectRecordBatch.class, CheckMode.VECTORS);
-    rules.put(FilterRecordBatch.class, CheckMode.VECTORS);
-    rules.put(PartitionLimitRecordBatch.class, CheckMode.VECTORS);
-    rules.put(UnnestRecordBatch.class, CheckMode.VECTORS);
-    rules.put(HashAggBatch.class, CheckMode.VECTORS);
-    rules.put(RemovingRecordBatch.class, CheckMode.VECTORS);
-    rules.put(StreamingAggBatch.class, CheckMode.VECTORS);
-    rules.put(RuntimeFilterRecordBatch.class, CheckMode.VECTORS);
-    rules.put(FlattenRecordBatch.class, CheckMode.VECTORS);
-    rules.put(MergeJoinBatch.class, CheckMode.VECTORS);
-    rules.put(NestedLoopJoinBatch.class, CheckMode.VECTORS);
-    rules.put(LimitRecordBatch.class, CheckMode.VECTORS);
-    rules.put(MergingRecordBatch.class, CheckMode.VECTORS);
-    rules.put(OrderedPartitionRecordBatch.class, CheckMode.VECTORS);
-    rules.put(RangePartitionRecordBatch.class, CheckMode.VECTORS);
-    rules.put(TraceRecordBatch.class, CheckMode.VECTORS);
-    rules.put(UnionAllRecordBatch.class, CheckMode.VECTORS);
-    rules.put(UnorderedReceiverBatch.class, CheckMode.VECTORS);
-    rules.put(UnpivotMapsRecordBatch.class, CheckMode.VECTORS);
-    rules.put(WindowFrameRecordBatch.class, CheckMode.VECTORS);
-    rules.put(TopNBatch.class, CheckMode.VECTORS);
-    rules.put(HashJoinBatch.class, CheckMode.VECTORS);
-    rules.put(ExternalSortBatch.class, CheckMode.VECTORS);
-    rules.put(WriterRecordBatch.class, CheckMode.VECTORS);
-    rules.put(MetadataStreamAggBatch.class, CheckMode.VECTORS);
-    rules.put(MetadataHashAggBatch.class, CheckMode.VECTORS);
-    rules.put(MetadataHandlerBatch.class, CheckMode.VECTORS);
-    rules.put(MetadataControllerBatch.class, CheckMode.VECTORS);
-    return rules;
-  }
-
-  private static CheckMode lookup(Object subject) {
-    CheckMode checkMode = checkRules.get(subject.getClass());
-    return checkMode == null ? CheckMode.NONE : checkMode;
-  }
-
   public static boolean validate(RecordBatch batch) {
-    // This is a handy place to trace batches as they flow up
-    // the DAG. Works best for single-threaded runs with few records.
-    // System.out.println(batch.getClass().getSimpleName());
-    // RowSetFormatter.print(batch);
-
-    CheckMode checkMode = lookup(batch);
-
-    // If no rule, don't check this batch.
-
-    if (checkMode == CheckMode.NONE) {
-
-      // As work proceeds, might want to log those batches not checked.
-      // For now, there are too many.
-
-      return true;
-    }
-
-    // All batches that do any checks will at least check counts.
-
     ErrorReporter reporter = errorReporter(batch);
     int rowCount = batch.getRecordCount();
     int valueCount = rowCount;
@@ -340,9 +230,7 @@ public class BatchValidator {
         break;
       }
     }
-    if (checkMode == CheckMode.VECTORS) {
-      new BatchValidator(reporter).validateBatch(batch, valueCount);
-    }
+    new BatchValidator(reporter).validateBatch(batch, valueCount);
     return reporter.errorCount() == 0;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 6ed004f..07a5c76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.window;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -47,7 +48,6 @@ import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,7 +64,7 @@ public class WindowFrameRecordBatch extends 
AbstractRecordBatch<WindowPOP> {
   private List<WindowDataBatch> batches;
 
   private WindowFramer[] framers;
-  private final List<WindowFunction> functions = Lists.newArrayList();
+  private final List<WindowFunction> functions = new ArrayList<>();
 
   private boolean noMoreBatches; // true when downstream returns NONE
   private BatchSchema schema;
@@ -75,7 +75,7 @@ public class WindowFrameRecordBatch extends 
AbstractRecordBatch<WindowPOP> {
       RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
-    batches = Lists.newArrayList();
+    batches = new ArrayList<>();
   }
 
   /**
@@ -260,17 +260,15 @@ public class WindowFrameRecordBatch extends 
AbstractRecordBatch<WindowPOP> {
 
     logger.trace("creating framer(s)");
 
-    List<LogicalExpression> keyExprs = Lists.newArrayList();
-    List<LogicalExpression> orderExprs = Lists.newArrayList();
+    List<LogicalExpression> keyExprs = new ArrayList<>();
+    List<LogicalExpression> orderExprs = new ArrayList<>();
     boolean requireFullPartition = false;
 
     boolean useDefaultFrame = false; // at least one window function uses the 
DefaultFrameTemplate
     boolean useCustomFrame = false; // at least one window function uses the 
CustomFrameTemplate
 
     // all existing vectors will be transferred to the outgoing container in 
framer.doWork()
-    for (VectorWrapper<?> wrapper : batch) {
-      container.addOrGet(wrapper.getField());
-    }
+    container.copySchemaFrom(batch);
 
     // add aggregation vectors to the container, and materialize corresponding 
expressions
     for (NamedExpression ne : popConfig.getAggregations()) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
index f51f521..03e8ffa 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -21,10 +21,8 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
-// TODO javadoc
 public interface VectorAccessible extends Iterable<VectorWrapper<?>> {
   // TODO are these <?> related in any way? Should they be the same one?
-  // TODO javadoc
   VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds);
 
   /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 1cfc61d..3796e5a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -553,4 +553,10 @@ public class VectorContainer implements VectorAccessible {
     // in the offset vectors that need it.
     setValueCount(0);
   }
+
+  public void copySchemaFrom(VectorAccessible other) {
+    for (VectorWrapper<?> wrapper : other) {
+      addOrGet(wrapper.getField());
+    }
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index da42b27..0ab4181 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -79,8 +79,8 @@ public class JSONRecordReader extends AbstractRecordReader {
    * @param columns  pathnames of columns/subfields to read
    * @throws OutOfMemoryException
    */
-  public JSONRecordReader(final FragmentContext fragmentContext, final Path 
inputPath, final DrillFileSystem fileSystem,
-      final List<SchemaPath> columns) throws OutOfMemoryException {
+  public JSONRecordReader(FragmentContext fragmentContext, Path inputPath, 
DrillFileSystem fileSystem,
+      List<SchemaPath> columns) throws OutOfMemoryException {
     this(fragmentContext, inputPath, null, fileSystem, columns);
   }
 
@@ -137,15 +137,15 @@ public class JSONRecordReader extends 
AbstractRecordReader {
   }
 
   @Override
-  public void setup(final OperatorContext context, final OutputMutator output) 
throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
     try{
       if (hadoopPath != null) {
-        this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
+        stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
       }
 
-      this.writer = new VectorContainerWriter(output, unionEnabled);
+      writer = new VectorContainerWriter(output, unionEnabled);
       if (isSkipQuery()) {
-        this.jsonReader = new 
CountingJsonReader(fragmentContext.getManagedBuffer(), enableNanInf, 
enableEscapeAnyChar);
+        jsonReader = new 
CountingJsonReader(fragmentContext.getManagedBuffer(), enableNanInf, 
enableEscapeAnyChar);
       } else {
         this.jsonReader = new 
JsonReader.Builder(fragmentContext.getManagedBuffer())
             .schemaPathColumns(ImmutableList.copyOf(getColumns()))
@@ -157,7 +157,7 @@ public class JSONRecordReader extends AbstractRecordReader {
             .build();
       }
       setupParser();
-    } catch (final Exception e){
+    } catch (Exception e){
       handleAndRaise("Failure reading JSON file", e);
     }
   }
@@ -182,7 +182,7 @@ public class JSONRecordReader extends AbstractRecordReader {
     int columnNr = -1;
 
     if (e instanceof JsonParseException) {
-      final JsonParseException ex = (JsonParseException) e;
+      JsonParseException ex = (JsonParseException) e;
       message = ex.getOriginalMessage();
       columnNr = ex.getLocation().getColumnNr();
     }
@@ -226,7 +226,8 @@ public class JSONRecordReader extends AbstractRecordReader {
           }
           ++parseErrorCount;
           if (printSkippedMalformedJSONRecordLineNumber) {
-            logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : 
line nos :" + (recordCount + parseErrorCount));
+            logger.debug("Error parsing JSON in {}: line: {}",
+                hadoopPath.getName(), recordCount + parseErrorCount);
           }
           if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
             break;
@@ -254,8 +255,9 @@ public class JSONRecordReader extends AbstractRecordReader {
 
   @Override
   public void close() throws Exception {
-    if(stream != null) {
+    if (stream != null) {
       stream.close();
+      stream = null;
     }
   }
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 79aa1d3..04bc67d 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -81,6 +81,9 @@ public class TestJsonReader extends BaseTestQuery {
 
   @Test
   public void schemaChange() throws Exception {
+    // Verifies that the schema change does not cause a
+    // crash. A pretty minimal test.
+    // TODO: Verify actual results.
     test("select b from dfs.`vector/complex/writer/schemaChange/`");
   }
 
@@ -267,12 +270,15 @@ public class TestJsonReader extends BaseTestQuery {
 
   @Test
   public void testAllTextMode() throws Exception {
-    test("alter system set `store.json.all_text_mode` = true");
-    String[] queries = {"select * from 
cp.`store/json/schema_change_int_to_string.json`"};
-    long[] rowCounts = {3};
-    String filename = "/store/json/schema_change_int_to_string.json";
-    runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
-    test("alter system set `store.json.all_text_mode` = false");
+    try {
+      alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
+      String[] queries = {"select * from 
cp.`store/json/schema_change_int_to_string.json`"};
+      long[] rowCounts = {3};
+      String filename = "/store/json/schema_change_int_to_string.json";
+      runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, 
rowCounts);
+    } finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
   @Test
@@ -293,58 +299,87 @@ public class TestJsonReader extends BaseTestQuery {
 
   @Test
   public void testNullWhereListExpected() throws Exception {
-    test("alter system set `store.json.all_text_mode` = true");
-    String[] queries = {"select * from 
cp.`store/json/null_where_list_expected.json`"};
-    long[] rowCounts = {3};
-    String filename = "/store/json/null_where_list_expected.json";
-    runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
-    test("alter system set `store.json.all_text_mode` = false");
+    try {
+      alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
+      String[] queries = {"select * from 
cp.`store/json/null_where_list_expected.json`"};
+      long[] rowCounts = {3};
+      String filename = "/store/json/null_where_list_expected.json";
+      runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, 
rowCounts);
+    }
+    finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
   @Test
   public void testNullWhereMapExpected() throws Exception {
-    test("alter system set `store.json.all_text_mode` = true");
-    String[] queries = {"select * from 
cp.`store/json/null_where_map_expected.json`"};
-    long[] rowCounts = {3};
-    String filename = "/store/json/null_where_map_expected.json";
-    runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
-    test("alter system set `store.json.all_text_mode` = false");
+    try {
+      alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
+      String[] queries = {"select * from 
cp.`store/json/null_where_map_expected.json`"};
+      long[] rowCounts = {3};
+      String filename = "/store/json/null_where_map_expected.json";
+      runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, 
rowCounts);
+    }
+    finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
   @Test
   public void ensureProjectionPushdown() throws Exception {
-    // Tests to make sure that we are correctly eliminating schema changing 
columns.  If completes, means that the projection pushdown was successful.
-    test("alter system set `store.json.all_text_mode` = false; "
-        + "select  t.field_1, t.field_3.inner_1, t.field_3.inner_2, 
t.field_4.inner_1 "
-        + "from cp.`store/json/schema_change_int_to_string.json` t");
+    try {
+      // Tests to make sure that we are correctly eliminating schema changing
+      // columns. If the query completes, the projection pushdown was
+      // successful.
+      test("alter system set `store.json.all_text_mode` = false; "
+          + "select  t.field_1, t.field_3.inner_1, t.field_3.inner_2, 
t.field_4.inner_1 "
+          + "from cp.`store/json/schema_change_int_to_string.json` t");
+    } finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
-  // The project pushdown rule is correctly adding the projected columns to 
the scan, however it is not removing
-  // the redundant project operator after the scan, this tests runs a physical 
plan generated from one of the tests to
-  // ensure that the project is filtering out the correct data in the scan 
alone
+  // The project pushdown rule is correctly adding the projected columns to the
+  // scan, however it is not removing the redundant project operator after the
+  // scan, this test runs a physical plan generated from one of the tests to
+  // ensure that the project is filtering out the correct data in the scan 
alone.
   @Test
   public void testProjectPushdown() throws Exception {
-    String[] queries = 
{Files.asCharSource(DrillFileUtils.getResourceAsFile("/store/json/project_pushdown_json_physical_plan.json"),
 Charsets.UTF_8).read()};
-    long[] rowCounts = {3};
-    String filename = "/store/json/schema_change_int_to_string.json";
-    test("alter system set `store.json.all_text_mode` = false");
-    runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, 
rowCounts);
-
-    List<QueryDataBatch> results = testPhysicalWithResults(queries[0]);
-    assertEquals(1, results.size());
-    // "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", 
"`field_4`.`inner_1`"
-
-    RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
-    QueryDataBatch batch = results.get(0);
-    assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
-
-    // this used to be five.  It is now three.  This is because the plan 
doesn't have a project.
-    // Scanners are not responsible for projecting non-existent columns (as 
long as they project one column)
-    assertEquals(3, batchLoader.getSchema().getFieldCount());
-    testExistentColumns(batchLoader);
-
-    batch.release();
-    batchLoader.clear();
+    try {
+      String[] queries = {Files.asCharSource(DrillFileUtils.getResourceAsFile(
+          "/store/json/project_pushdown_json_physical_plan.json"), 
Charsets.UTF_8).read()};
+      String filename = "/store/json/schema_change_int_to_string.json";
+      alterSession(ExecConstants.JSON_ALL_TEXT_MODE, false);
+      long[] rowCounts = {3};
+      runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, 
rowCounts);
+
+      List<QueryDataBatch> results = testPhysicalWithResults(queries[0]);
+      assertEquals(1, results.size());
+      // "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", 
"`field_4`.`inner_1`"
+
+      RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
+      QueryDataBatch batch = results.get(0);
+      assertTrue(batchLoader.load(batch.getHeader().getDef(), 
batch.getData()));
+
+      // this used to be five. It is now three. This is because the plan doesn't
+      // have a project. Scanners are not responsible for projecting 
non-existent
+      // columns (as long as they project one column)
+      //
+      // That said, the JSON format plugin does claim it can do project
+      // push-down, which means it will ensure columns for any column
+      // mentioned in the project list, in a form consistent with the schema
+      // path. In this case, `non_existent`.`nested`.`field` appears in
+      // the query. But, even more oddly, the missing field is inserted only
+      // if all text mode is true, omitted if all text mode is false.
+      // Seems overly complex.
+      assertEquals(3, batchLoader.getSchema().getFieldCount());
+      testExistentColumns(batchLoader);
+
+      batch.release();
+      batchLoader.clear();
+    } finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
   @Test
@@ -360,32 +395,32 @@ public class TestJsonReader extends BaseTestQuery {
 
   private void testExistentColumns(RecordBatchLoader batchLoader) throws 
SchemaChangeException {
     VectorWrapper<?> vw = batchLoader.getValueAccessorById(
-        RepeatedBigIntVector.class, //
-        
batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds()
 //
+        RepeatedBigIntVector.class,
+        
batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds()
     );
     assertEquals("[1]", 
vw.getValueVector().getAccessor().getObject(0).toString());
     assertEquals("[5]", 
vw.getValueVector().getAccessor().getObject(1).toString());
     assertEquals("[5,10,15]", 
vw.getValueVector().getAccessor().getObject(2).toString());
 
     vw = batchLoader.getValueAccessorById(
-        IntVector.class, //
-        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", 
"inner_1")).getFieldIds() //
+        IntVector.class,
+        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", 
"inner_1")).getFieldIds()
     );
     assertNull(vw.getValueVector().getAccessor().getObject(0));
     assertEquals(2l, vw.getValueVector().getAccessor().getObject(1));
     assertEquals(5l, vw.getValueVector().getAccessor().getObject(2));
 
     vw = batchLoader.getValueAccessorById(
-        IntVector.class, //
-        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", 
"inner_2")).getFieldIds() //
+        IntVector.class,
+        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", 
"inner_2")).getFieldIds()
     );
     assertNull(vw.getValueVector().getAccessor().getObject(0));
     assertNull(vw.getValueVector().getAccessor().getObject(1));
     assertEquals(3l, vw.getValueVector().getAccessor().getObject(2));
 
     vw = batchLoader.getValueAccessorById(
-        RepeatedBigIntVector.class, //
-        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_4", 
"inner_1")).getFieldIds() //
+        RepeatedBigIntVector.class,
+        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_4", 
"inner_1")).getFieldIds()
     );
     assertEquals("[]", 
vw.getValueVector().getAccessor().getObject(0).toString());
     assertEquals("[1,2,3]", 
vw.getValueVector().getAccessor().getObject(1).toString());
@@ -440,7 +475,7 @@ public class TestJsonReader extends BaseTestQuery {
                       )
               ).go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -457,7 +492,7 @@ public class TestJsonReader extends BaseTestQuery {
               .baselineValues(13L, "BIGINT")
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -477,7 +512,7 @@ public class TestJsonReader extends BaseTestQuery {
               .baselineValues(3L)
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -495,7 +530,7 @@ public class TestJsonReader extends BaseTestQuery {
               .baselineValues(9L)
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -512,7 +547,7 @@ public class TestJsonReader extends BaseTestQuery {
               .baselineValues(11.0)
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -536,7 +571,7 @@ public class TestJsonReader extends BaseTestQuery {
               .baselineValues(20000L)
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -565,7 +600,7 @@ public class TestJsonReader extends BaseTestQuery {
               .baselineValues(20000L)
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -628,7 +663,7 @@ public class TestJsonReader extends BaseTestQuery {
         .go();
 
     } finally {
-      testNoResult("alter session set `store.json.all_text_mode` = false");
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
     }
   }
 

Reply via email to