DRILL-5993: Used generic copiers in the selection vector remover, and 
implemented testing improvements for RowSets and codegen.

closes #1057


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/55c2bea0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/55c2bea0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/55c2bea0

Branch: refs/heads/master
Commit: 55c2bea04c28d22e58ef055ee35947c1b6cec21c
Parents: c8fdfd6
Author: Timothy Farkas <[email protected]>
Authored: Thu Nov 30 09:55:32 2017 -0800
Committer: Ben-Zvi <[email protected]>
Committed: Fri Feb 2 17:47:11 2018 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/TopN/TopNBatch.java      |   8 +-
 .../physical/impl/svremover/AbstractCopier.java |  95 ++++++++++++
 .../impl/svremover/AbstractSV2Copier.java       |  53 +++++++
 .../impl/svremover/AbstractSV4Copier.java       |  53 +++++++
 .../exec/physical/impl/svremover/Copier.java    |  12 +-
 .../impl/svremover/CopierTemplate2.java         |  71 ---------
 .../impl/svremover/CopierTemplate4.java         |  72 ---------
 .../physical/impl/svremover/GenericCopier.java  |  96 ++++++++++++
 .../impl/svremover/GenericSV2Copier.java        |  31 +---
 .../impl/svremover/GenericSV4Copier.java        |  42 ++----
 .../impl/svremover/RemovingRecordBatch.java     | 148 ++++---------------
 .../exec/record/ExpandableHyperContainer.java   |   4 +
 .../apache/drill/exec/record/RecordBatch.java   |  31 ++--
 .../drill/exec/record/VectorAccessible.java     |  14 +-
 .../svremover/AbstractGenericCopierTest.java    | 136 +++++++++++++++++
 .../impl/svremover/GenericCopierTest.java       |  41 +++++
 .../impl/svremover/GenericSV2CopierTest.java    |  41 +++++
 .../impl/svremover/GenericSV4CopierTest.java    |  65 ++++++++
 .../drill/exec/record/TestVectorContainer.java  |   3 +-
 .../apache/drill/test/BaseDirTestWatcher.java   |  10 ++
 .../org/apache/drill/test/BaseTestQuery.java    |   5 +
 .../org/apache/drill/test/ClusterFixture.java   |   2 +
 .../apache/drill/test/rowSet/DirectRowSet.java  |  11 +-
 .../drill/test/rowSet/IndirectRowSet.java       |  32 +++-
 .../org/apache/drill/test/rowSet/RowSet.java    |   3 +
 .../apache/drill/test/rowSet/RowSetBatch.java   | 108 ++++++++++++++
 .../apache/drill/test/rowSet/RowSetBuilder.java |  22 ++-
 27 files changed, 839 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 1683286..3929714 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -47,7 +47,7 @@ import org.apache.drill.exec.physical.config.TopN;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
 import org.apache.drill.exec.physical.impl.svremover.Copier;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import org.apache.drill.exec.physical.impl.svremover.GenericSV4Copier;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -295,7 +295,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, 
context);
     SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, 
null, context);
     if (copier == null) {
-      copier = RemovingRecordBatch.getGenerated4Copier(batch, context, 
newContainer, newBatch, null);
+      copier = GenericSV4Copier.createCopier(batch, newContainer, null);
     } else {
       for (VectorWrapper<?> i : batch) {
 
@@ -303,7 +303,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         ValueVector v = TypeHelper.getNewVector(i.getField(), 
oContext.getAllocator());
         newContainer.add(v);
       }
-      copier.setupRemover(context, batch, newBatch);
+      copier.setup(batch, newContainer);
     }
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new 
SortRecordBatchBuilder(oContext.getAllocator());
@@ -415,7 +415,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     final SelectionVector4 selectionVector4 = priorityQueue.getSv4();
     final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, 
selectionVector4, context);
     final SimpleSV4RecordBatch newBatch = new 
SimpleSV4RecordBatch(newContainer, null, context);
-    copier = RemovingRecordBatch.getGenerated4Copier(batch, context,  
newContainer, newBatch, null);
+    copier = GenericSV4Copier.createCopier(batch, newContainer, null);
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new 
SortRecordBatchBuilder(oContext.getAllocator());
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
new file mode 100644
index 0000000..ddea468
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class AbstractCopier implements Copier {
+  protected ValueVector[] vvOut;
+  protected VectorContainer outgoing;
+
+  @Override
+  public void setup(RecordBatch incoming, VectorContainer outgoing) throws 
SchemaChangeException {
+    this.outgoing = outgoing;
+
+    final int count = outgoing.getNumberOfColumns();
+    vvOut = new ValueVector[count];
+
+    for (int index = 0; index < count; index++) {
+      vvOut[index] = outgoing.getValueVector(index).getValueVector();
+    }
+  }
+
+  @Override
+  public int copyRecords(int index, int recordCount) throws 
SchemaChangeException {
+    for(VectorWrapper<?> out : outgoing){
+      TypeProtos.MajorType type = out.getField().getType();
+      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
+        out.getValueVector().allocateNew();
+      } else {
+        AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
+      }
+    }
+
+    return insertRecords(0, index, recordCount);
+  }
+
+  @Override
+  public int appendRecord(int index) throws SchemaChangeException {
+    int outgoingPosition = outgoing.getRecordCount();
+    copyEntryIndirect(index, outgoingPosition);
+    outgoingPosition++;
+    updateCounts(outgoingPosition);
+    return outgoingPosition;
+  }
+
+  @Override
+  public int appendRecords(int index, int recordCount) throws 
SchemaChangeException {
+    return insertRecords(outgoing.getRecordCount(), index, recordCount);
+  }
+
+  private int insertRecords(int outgoingPosition, int index, int recordCount) 
throws SchemaChangeException {
+    final int endIndex = index + recordCount;
+
+    for(int svIndex = index; svIndex < endIndex; svIndex++, 
outgoingPosition++){
+      copyEntryIndirect(svIndex, outgoingPosition);
+    }
+
+    updateCounts(outgoingPosition);
+    return outgoingPosition;
+  }
+
+  private void updateCounts(int numRecords) {
+    outgoing.setRecordCount(numRecords);
+
+    for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) {
+      vvOut[vectorIndex].getMutator().setValueCount(numRecords);
+    }
+  }
+
+  public abstract void copyEntryIndirect(int inIndex, int outIndex) throws 
SchemaChangeException;
+
+  public abstract void copyEntry(int inIndex, int outIndex) throws 
SchemaChangeException;
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
new file mode 100644
index 0000000..d9f1c8e
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class AbstractSV2Copier extends AbstractCopier {
+  protected ValueVector[] vvIn;
+  private SelectionVector2 sv2;
+
+  @Override
+  public void setup(RecordBatch incoming, VectorContainer outgoing) throws 
SchemaChangeException {
+    super.setup(incoming, outgoing);
+    this.sv2 = incoming.getSelectionVector2();
+
+    final int count = outgoing.getNumberOfColumns();
+
+    vvIn = new ValueVector[count];
+
+    {
+      int index = 0;
+
+      for (VectorWrapper vectorWrapper: incoming) {
+        vvIn[index] = vectorWrapper.getValueVector();
+        index++;
+      }
+    }
+  }
+
+  public void copyEntryIndirect(int inIndex, int outIndex) throws 
SchemaChangeException {
+    copyEntry(sv2.getIndex(inIndex), outIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
new file mode 100644
index 0000000..4f3afc3
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class AbstractSV4Copier extends AbstractCopier {
+  protected ValueVector[][] vvIn;
+  private SelectionVector4 sv4;
+
+  @Override
+  public void setup(RecordBatch incoming, VectorContainer outgoing) throws 
SchemaChangeException{
+    super.setup(incoming, outgoing);
+    this.sv4 = incoming.getSelectionVector4();
+
+    final int count = outgoing.getNumberOfColumns();
+
+    vvIn = new ValueVector[count][];
+
+    {
+      int index = 0;
+
+      for (VectorWrapper vectorWrapper: incoming) {
+        vvIn[index] = vectorWrapper.getValueVectors();
+        index++;
+      }
+    }
+  }
+
+  public void copyEntryIndirect(int inIndex, int outIndex) throws 
SchemaChangeException {
+    copyEntry(sv4.get(inIndex), outIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 9e265d7..bc31252 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -17,15 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
 
 public interface Copier {
-  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new 
TemplateClassDefinition<Copier>(Copier.class, CopierTemplate2.class);
-  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new 
TemplateClassDefinition<Copier>(Copier.class, CopierTemplate4.class);
-
-  public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing) throws SchemaChangeException;
-  public abstract int copyRecords(int index, int recordCount) throws 
SchemaChangeException;
+  void setup(RecordBatch incoming, VectorContainer outgoing) throws 
SchemaChangeException;
+  int copyRecords(int index, int recordCount) throws SchemaChangeException;
+  int appendRecord(int index) throws SchemaChangeException;
+  int appendRecords(int index, int recordCount) throws SchemaChangeException;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
deleted file mode 100644
index 96daf7f..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.svremover;
-
-import javax.inject.Named;
-
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.AllocationHelper;
-
-
-public abstract class CopierTemplate2 implements Copier{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class);
-
-  private SelectionVector2 sv2;
-  private RecordBatch outgoing;
-
-  @Override
-  public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing) throws SchemaChangeException{
-    this.sv2 = incoming.getSelectionVector2();
-    this.outgoing = outgoing;
-    doSetup(context, incoming, outgoing);
-  }
-
-  @Override
-  public int copyRecords(int index, int recordCount) throws 
SchemaChangeException {
-    for(VectorWrapper<?> out : outgoing){
-      MajorType type = out.getField().getType();
-      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
-        out.getValueVector().allocateNew();
-      } else {
-        AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
-      }
-    }
-
-    int outgoingPosition = 0;
-
-    for(int svIndex = index; svIndex < index + recordCount; svIndex++, 
outgoingPosition++){
-      doEval(sv2.getIndex(svIndex), outgoingPosition);
-    }
-    return outgoingPosition;
-  }
-
-  public abstract void doSetup(@Named("context") FragmentContext context,
-                               @Named("incoming") RecordBatch incoming,
-                               @Named("outgoing") RecordBatch outgoing)
-                       throws SchemaChangeException;
-  public abstract void doEval(@Named("inIndex") int inIndex,
-                              @Named("outIndex") int outIndex)
-                       throws SchemaChangeException;
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
deleted file mode 100644
index 1ae7df9..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.svremover;
-
-import javax.inject.Named;
-
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.AllocationHelper;
-
-public abstract class CopierTemplate4 implements Copier{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
-
-  private SelectionVector4 sv4;
-  private RecordBatch outgoing;
-
-
-  @Override
-  public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing) throws SchemaChangeException{
-    this.outgoing = outgoing;
-    this.sv4 = incoming.getSelectionVector4();
-    doSetup(context, incoming, outgoing);
-  }
-
-
-  @Override
-  public int copyRecords(int index, int recordCount) throws 
SchemaChangeException {
-    for(VectorWrapper<?> out : outgoing){
-      MajorType type = out.getField().getType();
-      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
-        out.getValueVector().allocateNew();
-      } else {
-        AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
-      }
-    }
-
-    int outgoingPosition = 0;
-    for(int svIndex = index; svIndex < index + recordCount; svIndex++, 
outgoingPosition++){
-      int deRefIndex = sv4.get(svIndex);
-      doEval(deRefIndex, outgoingPosition);
-    }
-    return outgoingPosition;
-  }
-
-  public abstract void doSetup(@Named("context") FragmentContext context,
-                               @Named("incoming") RecordBatch incoming,
-                               @Named("outgoing") RecordBatch outgoing)
-                       throws SchemaChangeException;
-  public abstract void doEval(@Named("inIndex") int inIndex,
-                              @Named("outIndex") int outIndex)
-                       throws SchemaChangeException;
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
new file mode 100644
index 0000000..de048dc
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+public class GenericCopier implements Copier {
+  private ValueVector[] vvOut;
+  private ValueVector[] vvIn;
+
+  private VectorContainer outgoing;
+
+  @Override
+  public void setup(RecordBatch incoming, VectorContainer outgoing) throws 
SchemaChangeException {
+    this.outgoing = outgoing;
+
+    final int count = outgoing.getNumberOfColumns();
+
+    vvIn = new ValueVector[count];
+    vvOut = new ValueVector[count];
+
+    {
+      int index = 0;
+
+      for (VectorWrapper vectorWrapper: incoming) {
+        vvIn[index] = vectorWrapper.getValueVector();
+        index++;
+      }
+    }
+
+    for (int index = 0; index < count; index++) {
+      vvOut[index] = outgoing.getValueVector(index).getValueVector();
+    }
+  }
+
+  @Override
+  public int copyRecords(int index, int recordCount) throws 
SchemaChangeException {
+    return insertRecords(0, index, recordCount);
+  }
+
+  @Override
+  public int appendRecord(int index) throws SchemaChangeException {
+    int outgoingPosition = outgoing.getRecordCount();
+    for (int vectorIndex = 0; vectorIndex < vvIn.length; vectorIndex++) {
+      vvOut[vectorIndex].copyEntry(outgoingPosition, vvIn[vectorIndex], index);
+    }
+    outgoingPosition++;
+    updateCounts(outgoingPosition);
+    return outgoingPosition;
+  }
+
+  @Override
+  public int appendRecords(int index, int recordCount) throws 
SchemaChangeException {
+    return insertRecords(outgoing.getRecordCount(), index, recordCount);
+  }
+
+  private int insertRecords(int outgoingPosition, int startIndex, int 
recordCount) throws SchemaChangeException {
+    final int endIndex = startIndex + recordCount;
+
+    for (int index = startIndex; index < endIndex; index++, 
outgoingPosition++) {
+      for (int vectorIndex = 0; vectorIndex < vvIn.length; vectorIndex++) {
+        vvOut[vectorIndex].copyEntry(outgoingPosition, vvIn[vectorIndex], 
index);
+      }
+    }
+
+    updateCounts(outgoingPosition);
+    return outgoingPosition;
+  }
+
+  private void updateCounts(int numRecords) {
+    outgoing.setRecordCount(numRecords);
+
+    for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) {
+      vvOut[vectorIndex].getMutator().setValueCount(numRecords);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
index 2fc17a3..a375f45 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
@@ -18,37 +18,10 @@
 package org.apache.drill.exec.physical.impl.svremover;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class GenericSV2Copier extends CopierTemplate2 {
-
-  private ValueVector[] vvOut;
-  private ValueVector[] vvIn;
-
-  @SuppressWarnings("unused")
-  @Override
-  public void doSetup(FragmentContext context, RecordBatch incoming,
-                      RecordBatch outgoing) throws SchemaChangeException {
-
-    int count = 0;
-    for(VectorWrapper<?> vv : incoming) {
-      count++;
-    }
-    vvIn = new ValueVector[count];
-    vvOut = new ValueVector[count];
-    int i = 0;
-    for(VectorWrapper<?> vv : incoming) {
-      vvIn[i] = incoming.getValueAccessorById(ValueVector.class, 
i).getValueVector();
-      vvOut[i] = outgoing.getValueAccessorById(ValueVector.class, 
i).getValueVector();
-      i++;
-    }
-  }
 
+public class GenericSV2Copier extends AbstractSV2Copier {
   @Override
-  public void doEval(int inIndex, int outIndex) throws SchemaChangeException {
+  public void copyEntry(int inIndex, int outIndex) throws 
SchemaChangeException {
     for ( int i = 0;  i < vvIn.length;  i++ ) {
       vvOut[i].copyEntry(outIndex, vvIn[i], inIndex);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
index 0950791..f9b153d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
@@ -18,41 +18,31 @@
 package org.apache.drill.exec.physical.impl.svremover;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
-public class GenericSV4Copier extends CopierTemplate4 {
-
-  private ValueVector[] vvOut;
-  private ValueVector[][] vvIn;
-
-  @SuppressWarnings("unused")
-  @Override
-  public void doSetup(FragmentContext context, RecordBatch incoming,
-                      RecordBatch outgoing) throws SchemaChangeException {
-
-    int count = 0;
-    for(VectorWrapper<?> vv : incoming) {
-      count++;
-    }
-    vvIn = new ValueVector[count][];
-    vvOut = new ValueVector[count];
-    int i = 0;
-    for(VectorWrapper<?> vv : incoming) {
-      vvIn[i] = incoming.getValueAccessorById(ValueVector.class, 
i).getValueVectors();
-      vvOut[i] = outgoing.getValueAccessorById(ValueVector.class, 
i).getValueVector();
-      i++;
-    }
-  }
-
+public class GenericSV4Copier extends AbstractSV4Copier {
   @Override
-  public void doEval(int inIndex, int outIndex) throws SchemaChangeException {
+  public void copyEntry(int inIndex, int outIndex) throws 
SchemaChangeException {
     int inOffset = inIndex & 0xFFFF;
     int inVector = inIndex >>> 16;
     for ( int i = 0;  i < vvIn.length;  i++ ) {
       vvOut[i].copyEntry(outIndex, vvIn[i][inVector], inOffset);
     }
   }
+
+  public static Copier createCopier(RecordBatch batch, VectorContainer 
container, SchemaChangeCallBack callBack) throws SchemaChangeException {
+    for(VectorWrapper<?> vv : batch){
+      @SuppressWarnings("resource")
+      ValueVector v = vv.getValueVectors()[0];
+      v.makeTransferPair(container.addOrGet(v.getField(), callBack));
+    }
+
+    Copier copier = new GenericSV4Copier();
+    copier.setup(batch, container);
+    return copier;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 66fe261..08ca029 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -17,10 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import java.io.IOException;
 import java.util.List;
 
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
@@ -33,9 +31,6 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.vector.CopyUtil;
-import org.apache.drill.exec.vector.SchemaChangeCallBack;
-import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -44,9 +39,6 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
 
   private Copier copier;
-  private int recordCount;
-  private boolean hasRemainder;
-  private int remainderIndex;
 
   public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext 
context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, incoming);
@@ -55,7 +47,7 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
 
   @Override
   public int getRecordCount() {
-    return recordCount;
+    return container.getRecordCount();
   }
 
   @Override
@@ -66,10 +58,10 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
       this.copier = getStraightCopier();
       break;
     case TWO_BYTE:
-      this.copier = getGenerated2Copier();
+      this.copier = create2Copier();
       break;
     case FOUR_BYTE:
-      this.copier = getGenerated4Copier();
+      this.copier = create4Copier();
       break;
     default:
       throw new UnsupportedOperationException();
@@ -85,37 +77,16 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
 
   @Override
   public IterOutcome innerNext() {
-    if (hasRemainder) {
-      handleRemainder();
-      return IterOutcome.OK;
-    }
     return super.innerNext();
   }
 
   @Override
   protected IterOutcome doWork() {
-    int incomingRecordCount = incoming.getRecordCount();
-    int copiedRecords;
     try {
-      copiedRecords = copier.copyRecords(0, incomingRecordCount);
+      copier.copyRecords(0, incoming.getRecordCount());
     } catch (SchemaChangeException e) {
       throw new IllegalStateException(e);
-    }
-
-    if (copiedRecords < incomingRecordCount) {
-      for(VectorWrapper<?> v : container){
-        ValueVector.Mutator m = v.getValueVector().getMutator();
-        m.setValueCount(copiedRecords);
-      }
-      hasRemainder = true;
-      remainderIndex = copiedRecords;
-      this.recordCount = remainderIndex;
-    } else {
-      recordCount = copiedRecords;
-      for(VectorWrapper<?> v : container){
-        ValueVector.Mutator m = v.getValueVector().getMutator();
-        m.setValueCount(recordCount);
-      }
+    } finally {
       if (incoming.getSchema().getSelectionVectorMode() != 
SelectionVectorMode.FOUR_BYTE) {
         for(VectorWrapper<?> v: incoming) {
           v.clear();
@@ -126,56 +97,11 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
       }
     }
 
-    assert recordCount >= copiedRecords;
-    logger.debug("doWork(): {} records copied out of {}, remaining: {}, 
incoming schema {} ",
-        copiedRecords,
-        incomingRecordCount,
-        incomingRecordCount - remainderIndex,
-        incoming.getSchema());
+    logger.debug("doWork(): {} records copied out of {}, incoming schema {} ",
+      container.getRecordCount(), container.getRecordCount(), 
incoming.getSchema());
     return IterOutcome.OK;
   }
 
-  private void handleRemainder() {
-    int recordCount = incoming.getRecordCount();
-    int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
-    int copiedRecords;
-    try {
-      while((copiedRecords = copier.copyRecords(remainderIndex, 
remainingRecordCount)) == 0) {
-        logger.debug("Copied zero records. Retrying");
-        container.zeroVectors();
-      }
-    } catch (SchemaChangeException e) {
-      throw new IllegalStateException(e);
-    }
-
-    if (copiedRecords < remainingRecordCount) {
-      for(VectorWrapper<?> v : container){
-        ValueVector.Mutator m = v.getValueVector().getMutator();
-        m.setValueCount(copiedRecords);
-      }
-      remainderIndex += copiedRecords;
-      this.recordCount = copiedRecords;
-    } else {
-      for(VectorWrapper<?> v : container){
-        ValueVector.Mutator m = v.getValueVector().getMutator();
-        m.setValueCount(remainingRecordCount);
-        this.recordCount = remainingRecordCount;
-      }
-      if (incoming.getSchema().getSelectionVectorMode() != 
SelectionVectorMode.FOUR_BYTE) {
-        for(VectorWrapper<?> v: incoming) {
-          v.clear();
-        }
-      }
-      remainderIndex = 0;
-      hasRemainder = false;
-    }
-    logger.debug(String.format("handleRemainder(): %s records copied out of 
%s, remaining: %s, incoming schema %s ",
-        copiedRecords,
-        recordCount,
-        recordCount - remainderIndex,
-        incoming.getSchema()));
-  }
-
   @Override
   public void close() {
     super.close();
@@ -186,7 +112,7 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
     private List<TransferPair> pairs = Lists.newArrayList();
 
     @Override
-    public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing){
+    public void setup(RecordBatch incoming, VectorContainer outgoing){
       for(VectorWrapper<?> vv : incoming){
         TransferPair tp = 
vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), 
callBack));
         pairs.add(tp);
@@ -199,65 +125,43 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
       for(TransferPair tp : pairs){
         tp.transfer();
       }
+
+      container.setRecordCount(incoming.getRecordCount());
       return recordCount;
     }
 
+    @Override
+    public int appendRecord(int index) throws SchemaChangeException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int appendRecords(int index, int recordCount) throws 
SchemaChangeException {
+      throw new UnsupportedOperationException();
+    }
   }
 
   private Copier getStraightCopier(){
     StraightCopier copier = new StraightCopier();
-    copier.setupRemover(context, incoming, this);
+    copier.setup(incoming, container);
     return copier;
   }
 
-  private Copier getGenerated2Copier() throws SchemaChangeException{
+  private Copier create2Copier() throws SchemaChangeException {
     Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() 
== SelectionVectorMode.TWO_BYTE);
 
     for(VectorWrapper<?> vv : incoming){
       vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), 
callBack));
     }
 
-    try {
-      final CodeGenerator<Copier> cg = 
CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getOptions());
-      CopyUtil.generateCopies(cg.getRoot(), incoming, false);
-      Copier copier = context.getImplementationClass(cg);
-      copier.setupRemover(context, incoming, this);
-      cg.plainJavaCapable(true);
-      // Uncomment out this line to debug the generated code.
-//      cg.saveCodeForDebugging(true);
-
-      return copier;
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException("Failure while attempting to load 
generated class", e);
-    }
+    Copier copier = new GenericSV2Copier();
+    copier.setup(incoming, container);
+    return copier;
   }
 
-  private Copier getGenerated4Copier() throws SchemaChangeException {
+  private Copier create4Copier() throws SchemaChangeException {
     Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() 
== SelectionVectorMode.FOUR_BYTE);
-    return getGenerated4Copier(incoming, context, container, this, callBack);
-  }
-
-  public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext 
context, VectorContainer container, RecordBatch outgoing,
-                                           SchemaChangeCallBack callBack) 
throws SchemaChangeException{
-
-    for(VectorWrapper<?> vv : batch){
-      @SuppressWarnings("resource")
-      ValueVector v = vv.getValueVectors()[0];
-      v.makeTransferPair(container.addOrGet(v.getField(), callBack));
-    }
-
-    try {
-      final CodeGenerator<Copier> cg = 
CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getOptions());
-      CopyUtil.generateCopies(cg.getRoot(), batch, true);
-      cg.plainJavaCapable(true);
-      // Uncomment out this line to debug the generated code.
-//      cg.saveCodeForDebugging(true);
-      Copier copier = context.getImplementationClass(cg);
-      copier.setupRemover(context, batch, outgoing);
-      return copier;
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException("Failure while attempting to load 
generated class", e);
-    }
+    return GenericSV4Copier.createCopier(incoming, container, callBack);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java
index 9037340..13656a5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java
@@ -42,6 +42,8 @@ public class ExpandableHyperContainer extends VectorContainer 
{
         this.add(hyperVector, true);
       }
     }
+
+    buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
   }
 
   public void addBatch(VectorAccessible batch) {
@@ -62,5 +64,7 @@ public class ExpandableHyperContainer extends VectorContainer 
{
         hyperVectorWrapper.addVector(w.getValueVector());
       }
     }
+
+    buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index acb7a9b..7fc086d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -39,7 +39,7 @@ import org.apache.drill.exec.vector.ValueVector;
 public interface RecordBatch extends VectorAccessible {
 
   /** max batch size, limited by 2-byte length in SV2: 65536 = 2^16 */
-  public static final int MAX_BATCH_SIZE = ValueVector.MAX_ROW_COUNT;
+  int MAX_BATCH_SIZE = ValueVector.MAX_ROW_COUNT;
 
   /**
    * Describes the outcome of incrementing RecordBatch forward by a call to
@@ -102,7 +102,7 @@ public interface RecordBatch extends VectorAccessible {
    *   )
    * </p>
    */
-  public static enum IterOutcome {
+  enum IterOutcome {
     /**
      * Normal completion of batch.
      * <p>
@@ -205,35 +205,29 @@ public interface RecordBatch extends VectorAccessible {
    * Gets the FragmentContext of the current query fragment.  Useful for
    * reporting failure information or other query-level information.
    */
-  public FragmentContext getContext();
+  FragmentContext getContext();
 
   /**
    * Gets the current schema of this record batch.
    * <p>
    *   May be called only when the most recent call to {@link #next}, if any,
-   *   returned {@link #OK_NEW_SCHEMA} or {@link #OK}.
+   *   returned {@link IterOutcome#OK_NEW_SCHEMA} or {@link IterOutcome#OK}.
    * </p>
    * <p>
    *   The schema changes when and only when {@link #next} returns
-   *   {@link #OK_NEW_SCHEMA}.
+   *   {@link IterOutcome#OK_NEW_SCHEMA}.
    * </p>
    */
   @Override
-  public BatchSchema getSchema();
-
-  /**
-   * Gets the number of records that are within this record.
-   */
-  @Override
-  public int getRecordCount();
+  BatchSchema getSchema();
 
   /**
    * Informs child nodes that this query should be terminated.  Child nodes
    * should use the QueryContext to determine what has happened.
    */
-  public void kill(boolean sendUpstream);
+  void kill(boolean sendUpstream);
 
-  public VectorContainer getOutgoingContainer();
+  VectorContainer getOutgoingContainer();
 
   /**
    * Gets the value vector type and ID for the given schema path.  The
@@ -247,10 +241,10 @@ public interface RecordBatch extends VectorAccessible {
    *         TypedFieldId
    */
   @Override
-  public abstract TypedFieldId getValueVectorId(SchemaPath path);
+  TypedFieldId getValueVectorId(SchemaPath path);
 
   @Override
-  public abstract VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids);
+  VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids);
 
   /**
    * Updates the data in each Field reading interface for the next range of
@@ -269,12 +263,11 @@ public interface RecordBatch extends VectorAccessible {
    *
    * @return An IterOutcome describing the result of the iteration.
    */
-  public IterOutcome next();
+  IterOutcome next();
 
   /**
    * Gets a writable version of this batch.  Takes over ownership of existing
    * buffers.
    */
-  public WritableBatch getWritableBatch();
-
+  WritableBatch getWritableBatch();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
index f1a250c..acf94e1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -25,7 +25,7 @@ import 
org.apache.drill.exec.record.selection.SelectionVector4;
 public interface VectorAccessible extends Iterable<VectorWrapper<?>> {
   // TODO are these <?> releated in any way? Should they be the same one?
   // TODO javadoc
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
fieldIds);
+  VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds);
 
   /**
    * Get the value vector type and id for the given schema path. The 
TypedFieldId
@@ -36,7 +36,7 @@ public interface VectorAccessible extends 
Iterable<VectorWrapper<?>> {
    * @return the local field id associated with this vector. If no field 
matches this
    *   path, this will return a null TypedFieldId
    */
-  public TypedFieldId getValueVectorId(SchemaPath path);
+  TypedFieldId getValueVectorId(SchemaPath path);
 
   /**
    * Get the schema of the current RecordBatch. This changes if and only if a 
*_NEW_SCHEMA
@@ -44,18 +44,16 @@ public interface VectorAccessible extends 
Iterable<VectorWrapper<?>> {
    *
    * @return schema of the current batch
    */
-  public BatchSchema getSchema();
+  BatchSchema getSchema();
 
   /**
    * Get the number of records.
    *
    * @return number of records
    */
-  public int getRecordCount();
-
-  public abstract SelectionVector2 getSelectionVector2();
-
-  public abstract SelectionVector4 getSelectionVector4();
+  int getRecordCount();
 
+  SelectionVector2 getSelectionVector2();
 
+  SelectionVector4 getSelectionVector4();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
new file mode 100644
index 0000000..01263b1
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBatch;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.Test;
+
+import java.util.List;
+
+public abstract class AbstractGenericCopierTest {
+  @Test
+  public void testCopyRecords() throws SchemaChangeException {
+    try (RootAllocator allocator = new RootAllocator(10_000_000)) {
+      final BatchSchema batchSchema = 
createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+      final RowSet srcRowSet = createSrcRowSet(allocator);
+      final RowSet destRowSet = new RowSetBuilder(allocator, 
batchSchema).build();
+      final VectorContainer destContainer = destRowSet.container();
+      final Copier copier = createCopier();
+      final RowSet expectedRowSet = createExpectedRowset(allocator);
+
+      copier.setup(new RowSetBatch(srcRowSet), destContainer);
+      copier.copyRecords(0, 3);
+
+      try {
+        new RowSetComparison(expectedRowSet).verify(destRowSet);
+      } finally {
+        srcRowSet.clear();
+
+        if (srcRowSet instanceof RowSet.HyperRowSet) {
+          ((RowSet.HyperRowSet)srcRowSet).getSv4().clear();
+        }
+
+        destRowSet.clear();
+        expectedRowSet.clear();
+      }
+    }
+  }
+
+  @Test
+  public void testAppendRecords() throws SchemaChangeException {
+    try (RootAllocator allocator = new RootAllocator(10_000_000)) {
+      final BatchSchema batchSchema = 
createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+      final RowSet srcRowSet = createSrcRowSet(allocator);
+      final RowSet destRowSet = new RowSetBuilder(allocator, 
batchSchema).build();
+      final VectorContainer destContainer = destRowSet.container();
+      final Copier copier = createCopier();
+      final RowSet expectedRowSet = createExpectedRowset(allocator);
+
+      copier.setup(new RowSetBatch(srcRowSet), destContainer);
+      copier.appendRecord(0);
+      copier.appendRecords(1, 2);
+
+      try {
+        new RowSetComparison(expectedRowSet).verify(destRowSet);
+      } finally {
+        srcRowSet.clear();
+
+        if (srcRowSet instanceof RowSet.HyperRowSet) {
+          ((RowSet.HyperRowSet)srcRowSet).getSv4().clear();
+        }
+
+        destRowSet.clear();
+        expectedRowSet.clear();
+      }
+    }
+  }
+
+  public abstract RowSet createSrcRowSet(RootAllocator allocator) throws 
SchemaChangeException;
+
+  public abstract Copier createCopier();
+
+  public static Object[] row1() {
+    return new Object[]{110, "green", new float[]{5.5f, 2.3f}, new 
String[]{"1a", "1b"}};
+  }
+
+  public static Object[] row2() {
+    return new Object[]{109, "blue", new float[]{1.5f}, new String[]{"2a"}};
+  }
+
+  public static Object[] row3() {
+    return new Object[]{108, "red", new float[]{-11.1f, 0.0f, .5f}, new 
String[]{"3a", "3b", "3c"}};
+  }
+
+  public static Object[] row4() {
+    return new Object[]{107, "yellow", new float[]{4.25f, 1.25f}, new 
String[]{}};
+  }
+
+  public static Object[] row5() {
+    return new Object[]{106, "black", new float[]{.75f}, new String[]{"4a"}};
+  }
+
+  public static RowSet createExpectedRowset(RootAllocator allocator) {
+    return new RowSetBuilder(allocator, 
createTestSchema(BatchSchema.SelectionVectorMode.NONE))
+      .addRow(row1())
+      .addRow(row2())
+      .addRow(row3())
+      .build();
+  }
+
+  public static BatchSchema createTestSchema(BatchSchema.SelectionVectorMode 
mode) {
+    MaterializedField colA = MaterializedField.create("colA", 
Types.required(TypeProtos.MinorType.INT));
+    MaterializedField colB = MaterializedField.create("colB", 
Types.required(TypeProtos.MinorType.VARCHAR));
+    MaterializedField colC = MaterializedField.create("colC", 
Types.repeated(TypeProtos.MinorType.FLOAT4));
+    MaterializedField colD = MaterializedField.create("colD", 
Types.repeated(TypeProtos.MinorType.VARCHAR));
+
+    List<MaterializedField> cols = Lists.newArrayList(colA, colB, colC, colD);
+    BatchSchema batchSchema = new BatchSchema(mode, cols);
+    return batchSchema;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
new file mode 100644
index 0000000..f946f81
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+
+public class GenericCopierTest extends AbstractGenericCopierTest {
+  @Override
+  public RowSet createSrcRowSet(RootAllocator allocator) {
+    return new RowSetBuilder(allocator, 
createTestSchema(BatchSchema.SelectionVectorMode.NONE))
+      .addRow(row1())
+      .addRow(row2())
+      .addRow(row3())
+      .addRow(row4())
+      .addRow(row5())
+      .build();
+  }
+
+  @Override
+  public Copier createCopier() {
+    return new GenericCopier();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
new file mode 100644
index 0000000..428124d
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+public class GenericSV2CopierTest extends AbstractGenericCopierTest {
+  @Override
+  public RowSet createSrcRowSet(RootAllocator allocator) {
+    return new RowSetBuilder(allocator, 
createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
+      .addRow(row1())
+      .addSelection(false, row4())
+      .addRow(row2())
+      .addSelection(false, row5())
+      .addRow(row3())
+      .withSv2()
+      .build();
+  }
+
+  @Override
+  public Copier createCopier() {
+    return new GenericSV2Copier();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
new file mode 100644
index 0000000..447ad3a
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.test.rowSet.HyperRowSetImpl;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+
+public class GenericSV4CopierTest extends AbstractGenericCopierTest {
+  @Override
+  public RowSet createSrcRowSet(RootAllocator allocator) throws 
SchemaChangeException {
+    final BatchSchema batchSchema = 
createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+    final DrillBuf drillBuf = allocator.buffer(4 * 3);
+    final SelectionVector4 sv4 = new SelectionVector4(drillBuf, 3, 
Character.MAX_VALUE);
+
+    final VectorContainer batch1 = new RowSetBuilder(allocator, batchSchema)
+      .addRow(row1())
+      .addRow(row4())
+      .build()
+      .container();
+
+    final VectorContainer batch2 = new RowSetBuilder(allocator, batchSchema)
+      .addRow(row2())
+      .addRow(row5())
+      .addRow(row3())
+      .build()
+      .container();
+
+    final ExpandableHyperContainer hyperContainer = new 
ExpandableHyperContainer(batch1);
+    hyperContainer.addBatch(batch2);
+
+    sv4.set(0, 0, 0);
+    sv4.set(1, 1, 0);
+    sv4.set(2, 1, 2);
+
+    return new HyperRowSetImpl(hyperContainer, sv4);
+  }
+
+  @Override
+  public Copier createCopier() {
+    return new GenericSV4Copier();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
index ef79780..c6c18d5 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
@@ -38,8 +38,7 @@ public class TestVectorContainer extends DrillTest {
 
   // TODO: Replace the following with an extension of SubOperatorTest class
   // once that is available.
-
-  protected volatile static OperatorFixture fixture;
+  protected static OperatorFixture fixture;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
index b595869..d36423b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
@@ -53,6 +53,7 @@ public class BaseDirTestWatcher extends DirTestWatcher {
     TEST_TMP // Corresponds to the directory that should be mapped to dfs.tmp
   }
 
+  private File codegenDir;
   private File tmpDir;
   private File storeDir;
   private File dfsTestTmpParentDir;
@@ -78,6 +79,7 @@ public class BaseDirTestWatcher extends DirTestWatcher {
   protected void starting(Description description) {
     super.starting(description);
 
+    codegenDir = makeSubDir(Paths.get("codegen"));
     rootDir = makeSubDir(Paths.get("root"));
     tmpDir = makeSubDir(Paths.get("tmp"));
     storeDir = makeSubDir(Paths.get("store"));
@@ -133,6 +135,14 @@ public class BaseDirTestWatcher extends DirTestWatcher {
   }
 
   /**
+   * Gets the temp directory that should be used to save generated code files.
+   * @return The temp directory that should be used to save generated code files.
+   */
+  public File getCodegenDir() {
+    return codegenDir;
+  }
+
+  /**
    * This methods creates a new directory which can be mapped to <b>dfs.tmp</b>.
    */
   public void newDfsTestTmpDir() {

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
index 387caa7..c3ecaf1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
@@ -35,6 +35,8 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.exec.compile.ClassBuilder;
+import org.apache.drill.exec.compile.CodeCompiler;
 import org.apache.drill.test.DrillTestWrapper.TestServices;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.DrillProperties;
@@ -72,6 +74,7 @@ import com.google.common.io.Resources;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.test.ClusterFixture;
+import org.junit.ClassRule;
 
 public class BaseTestQuery extends ExecTest {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
@@ -152,10 +155,12 @@ public class BaseTestQuery extends ExecTest {
 
   protected static Properties cloneDefaultTestConfigProperties() {
     final Properties props = new Properties();
+
     for(String propName : TEST_CONFIGURATIONS.stringPropertyNames()) {
       props.put(propName, TEST_CONFIGURATIONS.getProperty(propName));
     }
 
+    props.setProperty(ClassBuilder.CODE_DIR_OPTION, dirTestWatcher.getCodegenDir().getAbsolutePath());
     props.setProperty(ExecConstants.DRILL_TMP_DIR, dirTestWatcher.getTmpDir().getAbsolutePath());
     props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath());
 

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 9ddcdb7..6dbdacd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.drill.exec.compile.ClassBuilder;
 import org.apache.drill.test.DrillTestWrapper.TestServices;
 import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -515,6 +516,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
          .sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, MAX_WIDTH_PER_NODE);
     Properties props = new Properties();
     props.putAll(ClusterFixture.TEST_CONFIGURATIONS);
+    props.setProperty(ClassBuilder.CODE_DIR_OPTION, dirTestWatcher.getCodegenDir().getAbsolutePath());
     props.setProperty(ExecConstants.DRILL_TMP_DIR, dirTestWatcher.getTmpDir().getAbsolutePath());
     props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath());
 

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
index 5972f05..b5b1f1f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.test.rowSet;
 
+import com.google.common.collect.Sets;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
 import org.apache.drill.exec.physical.rowSet.model.SchemaInference;
@@ -34,6 +35,8 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
 import org.apache.drill.test.rowSet.RowSetWriterImpl.WriterIndexImpl;
 
+import java.util.Set;
+
 /**
  * Implementation of a single row set with no indirection (selection)
  * vector.
@@ -140,9 +143,15 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
 
   @Override
   public SingleRowSet toIndirect() {
-    return new IndirectRowSet(this);
+    return new IndirectRowSet(this, Sets.<Integer>newHashSet());
   }
 
   @Override
+  public SingleRowSet toIndirect(Set<Integer> skipIndices) {
+    return new IndirectRowSet(this, skipIndices);
+  }
+
+
+  @Override
   public SelectionVector2 getSv2() { return null; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index e729bba..cc9895d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.test.rowSet;
 
+import com.google.common.collect.Sets;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
@@ -26,6 +27,8 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
+import java.util.Set;
+
 /**
  * Single row set coupled with an indirection (selection) vector,
  * specifically an SV2.
@@ -65,31 +68,41 @@ public class IndirectRowSet extends AbstractSingleRowSet {
     this.sv2 = sv2;
   }
 
+  public IndirectRowSet(VectorContainer container) {
+    this(container, makeSv2(container.getAllocator(), container, Sets.<Integer>newHashSet()));
+  }
+
   public static IndirectRowSet fromContainer(VectorContainer container) {
-    return new IndirectRowSet(container, makeSv2(container.getAllocator(), container));
+    return new IndirectRowSet(container, makeSv2(container.getAllocator(), container, Sets.<Integer>newHashSet()));
   }
 
   public static IndirectRowSet fromSv2(VectorContainer container, SelectionVector2 sv2) {
     return new IndirectRowSet(container, sv2);
   }
 
-  private static SelectionVector2 makeSv2(BufferAllocator allocator, VectorContainer container) {
-    int rowCount = container.getRecordCount();
+  private static SelectionVector2 makeSv2(BufferAllocator allocator, VectorContainer container,
+                                          Set<Integer> skipIndices) {
+    int rowCount = container.getRecordCount() - skipIndices.size();
     SelectionVector2 sv2 = new SelectionVector2(allocator);
     if (!sv2.allocateNewSafe(rowCount)) {
       throw new OutOfMemoryException("Unable to allocate sv2 buffer");
     }
-    for (int i = 0; i < rowCount; i++) {
-      sv2.setIndex(i, (char) i);
+    for (int srcIndex = 0, destIndex = 0; srcIndex < container.getRecordCount(); srcIndex++) {
+      if (skipIndices.contains(srcIndex)) {
+        continue;
+      }
+
+      sv2.setIndex(destIndex, (char)srcIndex);
+      destIndex++;
     }
     sv2.setRecordCount(rowCount);
     container.buildSchema(SelectionVectorMode.TWO_BYTE);
     return sv2;
   }
 
-  public IndirectRowSet(DirectRowSet directRowSet) {
+  public IndirectRowSet(DirectRowSet directRowSet, Set<Integer> skipIndices) {
     super(directRowSet);
-    sv2 = makeSv2(allocator(), container());
+    sv2 = makeSv2(allocator(), container(), skipIndices);
   }
 
   @Override
@@ -119,6 +132,11 @@ public class IndirectRowSet extends AbstractSingleRowSet {
   public SingleRowSet toIndirect() { return this; }
 
   @Override
+  public SingleRowSet toIndirect(Set<Integer> skipIndices) {
+    return new IndirectRowSet(DirectRowSet.fromContainer(container()), skipIndices);
+  }
+
+  @Override
   public long size() {
     RecordBatchSizer sizer = new RecordBatchSizer(container(), sv2);
     return sizer.actualSize();

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
index f2435de..ec0925e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
@@ -28,6 +28,8 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.parquet.column.ColumnWriter;
 
+import java.util.Set;
+
 /**
  * A row set is a collection of rows stored as value vectors. Elsewhere in
  * Drill we call this a "record batch", but that term has been overloaded to
@@ -117,6 +119,7 @@ public interface RowSet {
 
   public interface SingleRowSet extends RowSet {
     SingleRowSet toIndirect();
+    SingleRowSet toIndirect(Set<Integer> skipIndices);
     SelectionVector2 getSv2();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
new file mode 100644
index 0000000..02156f6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import java.util.Iterator;
+
+public class RowSetBatch implements RecordBatch {
+  private final RowSet rowSet;
+
+  public RowSetBatch(final RowSet rowSet) {
+    this.rowSet = Preconditions.checkNotNull(rowSet);
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return rowSet.batchSchema();
+  }
+
+  @Override
+  public int getRecordCount() {
+    return rowSet.container().getRecordCount();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    if (rowSet instanceof IndirectRowSet) {
+      return ((IndirectRowSet)rowSet).getSv2();
+    }
+
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    if (rowSet instanceof RowSet.HyperRowSet) {
+      return ((RowSet.HyperRowSet)rowSet).getSv4();
+    }
+
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void kill(boolean sendUpstream) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() {
+    return rowSet.container();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return rowSet.container().getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return rowSet.container().getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public IterOutcome next() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return rowSet.container().iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
index 7b1554c..d6bbaf8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
@@ -17,14 +17,16 @@
  */
 package org.apache.drill.test.rowSet;
 
+import com.google.common.collect.Sets;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.TupleMetadata;
 import org.apache.drill.exec.record.TupleSchema;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
-import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 
+import java.util.Set;
+
 /**
  * Fluent builder to quickly build up an row set (record batch)
  * programmatically. Starting with an {@link org.apache.drill.test.OperatorFixture}:
@@ -41,6 +43,7 @@ public final class RowSetBuilder {
   private DirectRowSet rowSet;
   private RowSetWriter writer;
   private boolean withSv2;
+  private Set<Integer> skipIndices = Sets.newHashSet();
 
   public RowSetBuilder(BufferAllocator allocator, BatchSchema schema) {
     this(allocator, TupleSchema.fromFields(schema), 10);
@@ -75,6 +78,17 @@ public final class RowSetBuilder {
     return this;
   }
 
+  public RowSetBuilder addSelection(boolean selected, Object...values) {
+    final int index = writer.rowIndex();
+    writer.setRow(values);
+
+    if (!selected) {
+      skipIndices.add(index);
+    }
+
+    return this;
+  }
+
   /**
    * The {@link #addRow(Object...)} method uses Java variable-length arguments to
    * pass a row of values. But, when the row consists of a single array, Java
@@ -106,6 +120,10 @@ public final class RowSetBuilder {
     return addRow(new Object[] { value });
   }
 
+  public RowSetBuilder addSingleCol(boolean selected, Object value) {
+    return addSelection(selected, new Object[] { value });
+  }
+
   /**
    * Build the row set with a selection vector 2. The SV2 is
    * initialized to have a 1:1 index to the rows: SV2 0 points
@@ -122,7 +140,7 @@ public final class RowSetBuilder {
   public SingleRowSet build() {
     SingleRowSet result = writer.done();
     if (withSv2) {
-      return result.toIndirect();
+      return rowSet.toIndirect(skipIndices);
     }
     return result;
   }

Reply via email to