implement SV4 support for filter

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4481dadc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4481dadc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4481dadc

Branch: refs/heads/master
Commit: 4481dadcec3d96638274469e695053c4d7c95805
Parents: a73512d
Author: Ben Becker <[email protected]>
Authored: Fri Aug 30 09:13:14 2013 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed Oct 30 17:21:53 2013 -0700

----------------------------------------------------------------------
 .../physical/impl/filter/FilterRecordBatch.java | 118 +++++++++++++++----
 .../physical/impl/filter/FilterTemplate.java    | 103 ----------------
 .../physical/impl/filter/FilterTemplate2.java   |  88 ++++++++++++++
 .../physical/impl/filter/FilterTemplate4.java   |  52 ++++++++
 .../exec/physical/impl/filter/Filterer.java     |   3 +-
 .../exec/record/selection/SelectionVector4.java |  11 +-
 .../exec/physical/impl/SimpleRootExec.java      |   5 +
 .../physical/impl/filter/TestSimpleFilter.java  |  30 ++++-
 .../src/test/resources/filter/test_sv4.json     |  42 +++++++
 9 files changed, 323 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4481dadc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 5f9a06a..e67e531 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.filter;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -27,27 +28,29 @@ import 
org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.AbstractSingleRecordBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
 
-  private final SelectionVector2 sv;
+  private SelectionVector2 sv2;
+  private SelectionVector4 sv4;
+  private BufferAllocator.PreAllocator svAllocator;
   private Filterer filter;
-  
-  public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext 
context){
+
+  public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext 
context) {
     super(pop, context, incoming);
-    sv = new SelectionVector2(context.getAllocator());
   }
   
   @Override
@@ -57,18 +60,22 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter>{
 
   @Override
   public int getRecordCount() {
-    return sv.getCount();
+    return sv2 != null ? sv2.getCount() : sv4.getCount();
   }
 
   @Override
   public SelectionVector2 getSelectionVector2() {
-    return sv;
+    return sv2;
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    return sv4;
   }
 
   @Override
   protected void doWork() {
     int recordCount = incoming.getRecordCount();
-    sv.allocateNew(recordCount);
     filter.filterBatch(recordCount);
     for(VectorWrapper<?> v : container){
       ValueVector.Mutator m = v.getValueVector().getMutator();
@@ -79,33 +86,102 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter>{
   @Override
   protected void setupNewSchema() throws SchemaChangeException {
     container.clear();
-    LogicalExpression filterExpression = popConfig.getExpr();
+
+    switch(incoming.getSchema().getSelectionVectorMode()){
+      case NONE:
+        sv2 = new SelectionVector2(context.getAllocator());
+        this.filter = generateSV2Filterer();
+        break;
+      case TWO_BYTE:
+        sv2 = new SelectionVector2(context.getAllocator());
+        this.filter = generateSV2Filterer();
+        break;
+      case FOUR_BYTE:
+        // set up the multi-batch selection vector
+        this.svAllocator = context.getAllocator().getPreAllocator();
+        if (!svAllocator.preAllocate(incoming.getRecordCount()*4))
+          throw new SchemaChangeException("Attempted to filter an SV4 which exceeds allowed memory (" +
+                                          incoming.getRecordCount() * 4 + " bytes)");
+        sv4 = new SelectionVector4(svAllocator.getAllocation(), incoming.getRecordCount(), Character.MAX_VALUE);
+        this.filter = generateSV4Filterer();
+        break;
+      default:
+        throw new UnsupportedOperationException();
+    }
+
+  }
+
+  protected Filterer generateSV4Filterer() throws SchemaChangeException {
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
-    final CodeGenerator<Filterer> cg = new 
CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
-    
-    final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(filterExpression, incoming, collector);
+    final List<VectorAllocator> allocators = Lists.newArrayList();
+    final CodeGenerator<Filterer> cg = new 
CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION4, 
context.getFunctionRegistry());
+
+    final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, 
collector);
     if(collector.hasErrors()){
       throw new SchemaChangeException(String.format("Failure while trying to 
materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
     }
-    
+
     cg.addExpr(new ReturnValueExpression(expr));
-    
+
+//    for(VectorWrapper<?> i : incoming){
+//      ValueVector v = TypeHelper.getNewVector(i.getField(), 
context.getAllocator());
+//      container.add(v);
+//      allocators.add(getAllocator4(v));
+//    }
+
+    for (VectorWrapper<?> vw : incoming) {
+      for (ValueVector vv : vw.getValueVectors()) {
+        TransferPair pair = vv.getTransferPair();
+        container.add(pair.getTo());
+        transfers.add(pair);
+      }
+    }
+
+    // allocate outgoing sv4
+    container.buildSchema(SelectionVectorMode.FOUR_BYTE);
+
+    try {
+      TransferPair[] tx = transfers.toArray(new 
TransferPair[transfers.size()]);
+      Filterer filter = context.getImplementationClass(cg);
+      filter.setup(context, incoming, this, tx);
+      return filter;
+    } catch (ClassTransformationException | IOException e) {
+      throw new SchemaChangeException("Failure while attempting to load 
generated class", e);
+    }
+
+  }
+
+  protected Filterer generateSV2Filterer() throws SchemaChangeException {
+    final ErrorCollector collector = new ErrorCollectorImpl();
+    final List<TransferPair> transfers = Lists.newArrayList();
+    final CodeGenerator<Filterer> cg = new 
CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION2, 
context.getFunctionRegistry());
+
+    final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, 
collector);
+    if(collector.hasErrors()){
+      throw new SchemaChangeException(String.format("Failure while trying to 
materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
+    }
+
+    cg.addExpr(new ReturnValueExpression(expr));
+
     for(VectorWrapper<?> v : incoming){
       TransferPair pair = v.getValueVector().getTransferPair();
       container.add(pair.getTo());
       transfers.add(pair);
     }
-    
+
     container.buildSchema(SelectionVectorMode.TWO_BYTE);
-    
+
     try {
       TransferPair[] tx = transfers.toArray(new 
TransferPair[transfers.size()]);
-      this.filter = context.getImplementationClass(cg);
+      Filterer filter = context.getImplementationClass(cg);
       filter.setup(context, incoming, this, tx);
+      return filter;
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load 
generated class", e);
     }
+
   }
-  
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4481dadc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
deleted file mode 100644
index a03d48f..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.filter;
-
-import javax.inject.Named;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-
-public abstract class FilterTemplate implements Filterer{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FilterTemplate.class);
-  
-  private SelectionVector2 outgoingSelectionVector;
-  private SelectionVector2 incomingSelectionVector;
-  private SelectionVectorMode svMode;
-  private TransferPair[] transfers;
-  
-  @Override
-  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch 
outgoing, TransferPair[] transfers) throws SchemaChangeException{
-    this.transfers = transfers;
-    this.outgoingSelectionVector = outgoing.getSelectionVector2();
-    this.svMode = incoming.getSchema().getSelectionVectorMode();
-    
-    switch(svMode){
-    case NONE:
-      break;
-    case TWO_BYTE:
-      this.incomingSelectionVector = incoming.getSelectionVector2();
-      break;
-    default:
-      throw new UnsupportedOperationException();
-    }
-    doSetup(context, incoming, outgoing);
-  }
-
-  private void doTransfers(){
-    for(TransferPair t : transfers){
-      t.transfer();
-    }
-  }
-  
-  public void filterBatch(int recordCount){
-    switch(svMode){
-    case NONE:
-      filterBatchNoSV(recordCount);
-      break;
-    case TWO_BYTE:
-      filterBatchSV2(recordCount);
-      break;
-    default:
-      throw new UnsupportedOperationException();
-    }
-    doTransfers();
-  }
-  
-  private void filterBatchSV2(int recordCount){
-    int svIndex = 0;
-    final int count = recordCount;
-    for(int i = 0; i < count; i++){
-      char index = incomingSelectionVector.getIndex(i);
-      if(doEval(i, 0)){
-        outgoingSelectionVector.setIndex(svIndex, index);
-        svIndex++;
-      }
-    }
-    outgoingSelectionVector.setRecordCount(svIndex);
-  }
-  
-  private void filterBatchNoSV(int recordCount){
-    int svIndex = 0;
-    for(char i =0; i < recordCount; i++){
-      
-      if(doEval(i, 0)){
-        outgoingSelectionVector.setIndex(svIndex, i);
-        svIndex++;
-      }
-    }
-    outgoingSelectionVector.setRecordCount(svIndex);
-  }
-  
-  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch 
outgoing);
-  public abstract boolean doEval(@Named("inIndex") int inIndex, 
@Named("outIndex") int outIndex);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4481dadc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
new file mode 100644
index 0000000..587440c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -0,0 +1,88 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+public abstract class FilterTemplate2 implements Filterer{
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FilterTemplate2.class);
+  
+  private SelectionVector2 outgoingSelectionVector;
+  private SelectionVector2 incomingSelectionVector;
+  private SelectionVectorMode svMode;
+  private TransferPair[] transfers;
+
+  @Override
+  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch 
outgoing, TransferPair[] transfers) throws SchemaChangeException{
+    this.transfers = transfers;
+    this.outgoingSelectionVector = outgoing.getSelectionVector2();
+    this.svMode = incoming.getSchema().getSelectionVectorMode();
+    
+    switch(svMode){
+    case NONE:
+      break;
+    case TWO_BYTE:
+      this.incomingSelectionVector = incoming.getSelectionVector2();
+      break;
+    default:
+      // SV4 is handled in FilterTemplate4
+      throw new UnsupportedOperationException();
+    }
+    doSetup(context, incoming, outgoing);
+  }
+
+  private void doTransfers(){
+    for(TransferPair t : transfers){
+      t.transfer();
+    }
+  }
+  
+  public void filterBatch(int recordCount){
+    outgoingSelectionVector.allocateNew(recordCount);
+    switch(svMode){
+    case NONE:
+      filterBatchNoSV(recordCount);
+      break;
+    case TWO_BYTE:
+      filterBatchSV2(recordCount);
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
+    doTransfers();
+  }
+  
+  private void filterBatchSV2(int recordCount){
+    int svIndex = 0;
+    final int count = recordCount;
+    for(int i = 0; i < count; i++){
+      char index = incomingSelectionVector.getIndex(i);
+      if(doEval(i, 0)){
+        outgoingSelectionVector.setIndex(svIndex, index);
+        svIndex++;
+      }
+    }
+    outgoingSelectionVector.setRecordCount(svIndex);
+  }
+
+  private void filterBatchNoSV(int recordCount){
+    int svIndex = 0;
+    for(char i =0; i < recordCount; i++){
+      
+      if(doEval(i, 0)){
+        outgoingSelectionVector.setIndex(svIndex, i);
+        svIndex++;
+      }
+    }
+    outgoingSelectionVector.setRecordCount(svIndex);
+  }
+  
+  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch 
outgoing);
+  public abstract boolean doEval(@Named("inIndex") int inIndex, 
@Named("outIndex") int outIndex);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4481dadc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
new file mode 100644
index 0000000..b394387
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
@@ -0,0 +1,52 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+
+import javax.inject.Named;
+
+public abstract class FilterTemplate4 implements Filterer {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FilterTemplate4.class);
+
+  private SelectionVector4 outgoingSelectionVector;
+  private SelectionVector4 incomingSelectionVector;
+  private TransferPair[] transfers;
+
+  @Override
+  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch 
outgoing, TransferPair[] transfers)
+      throws SchemaChangeException {
+    this.transfers = transfers;
+    this.outgoingSelectionVector = outgoing.getSelectionVector4();
+    this.incomingSelectionVector = incoming.getSelectionVector4();
+    doSetup(context, incoming, outgoing);
+  }
+
+  @Override
+  public void filterBatch(int recordCount){
+    int outPos = 0;
+    for (int i = 0; i < incomingSelectionVector.getCount(); i++) {
+      int index = incomingSelectionVector.get(i);
+      if (doEval(index, 0)) {
+        System.out.println(" (match): " + index + " (i: " + i + ") ");
+        outgoingSelectionVector.set(outPos++, index);
+      }
+    }
+    outgoingSelectionVector.setCount(outPos);
+    doTransfers();
+  }
+
+  private void doTransfers(){
+    for(TransferPair t : transfers){
+      t.transfer();
+    }
+  }
+
+  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch 
outgoing);
+  public abstract boolean doEval(@Named("inIndex") int inIndex, 
@Named("outIndex") int outIndex);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4481dadc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
index b821720..8e8cb2e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
@@ -29,6 +29,7 @@ public interface Filterer {
   public void setup(FragmentContext context, RecordBatch incoming, RecordBatch 
outgoing, TransferPair[] transfers) throws SchemaChangeException;
   public void filterBatch(int recordCount);
   
-  public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate.class);
+  public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new 
TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class);
+  public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new 
TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4481dadc/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 606103b..4533cc2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -25,7 +25,7 @@ public class SelectionVector4 {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
 
   private final ByteBuf vector;
-  private final int recordCount;
+  private int recordCount;
   private int start;
   private int length;
   
@@ -44,7 +44,12 @@ public class SelectionVector4 {
   public int getCount(){
     return length;
   }
-  
+
+  public void setCount(int length) {
+    this.length = length;
+    this.recordCount = length;
+  }
+
   public void set(int index, int compound){
     vector.setInt(index*4, compound);
   }
@@ -55,7 +60,7 @@ public class SelectionVector4 {
   public int get(int index){
     return vector.getInt( (start+index)*4);
   }
-  
+
   /**
    * Caution: This method shares the underlying buffer between this vector and 
the newly created one.
    * @return Newly created single batch SelectionVector4.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4481dadc/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index c4533b1..0312863 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.beust.jcommander.internal.Lists;
@@ -54,6 +55,10 @@ public class SimpleRootExec implements RootExec, 
Iterable<ValueVector>{
     return incoming.getSelectionVector2();
   }
 
+  public SelectionVector4 getSelectionVector4(){
+    return incoming.getSelectionVector4();
+  }
+
   @SuppressWarnings("unchecked")
   public <T extends ValueVector> T getValueVectorById(SchemaPath path, 
Class<?> vvClass){
     TypedFieldId tfid = incoming.getValueVectorId(path);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4481dadc/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index 6841662..e81774a 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -74,7 +74,35 @@ public class TestSimpleFilter {
     assertTrue(!context.isFailed());
 
   }
-  
+
+  @Test
+  public void testSV4Filter(@Injectable final DrillbitContext bitContext, 
@Injectable UserClientConnection connection) throws Throwable{
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), 
CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test_sv4.json"), Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new 
FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, 
FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, 
(FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+    int recordCount = 0;
+    while(exec.next()) {
+      for (int i = 0; i < exec.getSelectionVector4().getCount(); i++) {
+        System.out.println("Got: " + exec.getSelectionVector4().get(i));
+      }
+      recordCount += exec.getSelectionVector4().getCount();
+    }
+    assertEquals(50, recordCount);
+
+    if(context.getFailureCause() != null){
+      throw context.getFailureCause();
+    }
+    assertTrue(!context.isFailed());
+
+  }
+
   @AfterClass
   public static void tearDown() throws Exception{
     // pause to get logger to catch up.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4481dadc/exec/java-exec/src/test/resources/filter/test_sv4.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/filter/test_sv4.json b/exec/java-exec/src/test/resources/filter/test_sv4.json
new file mode 100644
index 0000000..685e315
--- /dev/null
+++ b/exec/java-exec/src/test/resources/filter/test_sv4.json
@@ -0,0 +1,42 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+       graph:[
+        {
+            @id:1,
+            pop:"mock-sub-scan",
+            url: "http://apache.org",
+            entries:[
+               {records: 100, types: [
+                 {name: "blue", type: "INT", mode: "REQUIRED"},
+                 {name: "red", type: "BIGINT", mode: "REQUIRED"},
+                 {name: "green", type: "INT", mode: "REQUIRED"}
+               ]}
+            ]
+        },
+        {
+            @id:2,
+            child: 1,
+            pop:"sort",
+            orderings: [
+              {expr: "blue"}
+            ]
+        },
+        {
+            @id:3,
+            child: 2,
+            pop:"filter",
+            expr: "alternate()"
+        },
+        {
+            @id: 4,
+            child: 3,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

Reply via email to