Updated Branches:
  refs/heads/master c50135256 -> ac8590d2b

DRILL-156 Union (all) pop implementation
Fixes to TestSimpleFilter


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/acf47818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/acf47818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/acf47818

Branch: refs/heads/master
Commit: acf47818eb545d6f4109aec1189017754552adcf
Parents: c501352
Author: immars <[email protected]>
Authored: Thu Aug 22 15:21:22 2013 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Thu Aug 22 16:08:28 2013 -0700

----------------------------------------------------------------------
 .../exec/physical/base/AbstractMultiple.java    |  36 +++++
 .../physical/base/AbstractPhysicalVisitor.java  |  16 +--
 .../exec/physical/base/PhysicalVisitor.java     |  12 +-
 .../drill/exec/physical/config/Union.java       |  42 ++++++
 .../drill/exec/physical/impl/ImplCreator.java   |   8 ++
 .../drill/exec/physical/impl/ScreenCreator.java |   1 +
 .../physical/impl/union/UnionBatchCreator.java  |  23 ++++
 .../physical/impl/union/UnionRecordBatch.java   | 130 +++++++++++++++++++
 .../physical/impl/TestSimpleFragmentRun.java    |   4 +-
 .../physical/impl/filter/TestSimpleFilter.java  |   2 +-
 .../physical/impl/union/TestSimpleUnion.java    |  68 ++++++++++
 .../src/test/resources/filter/test1.json        |   8 +-
 .../src/test/resources/union/test1.json         |  57 ++++++++
 13 files changed, 384 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java
new file mode 100644
index 0000000..247595b
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java
@@ -0,0 +1,36 @@
+package org.apache.drill.exec.physical.base;
+
+import com.google.common.collect.Iterators;
+
+import java.util.Iterator;
+
+/**
+ * Describes an operator that expects more than one children operators as its input.
+ */
+public abstract class AbstractMultiple extends AbstractBase{
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractMultiple.class);  
+  
+  protected final PhysicalOperator[] children;
+
+  protected AbstractMultiple(PhysicalOperator[] children) {
+    this.children = children;
+  }
+
+  public PhysicalOperator[] getChildren() {
+    return children;
+  }
+  
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.forArray(children);
+  }
+  
+  public Size getSize() {
+    Size size = new Size(0,0);
+    for(PhysicalOperator child:children){
+      size.add(child.getSize());
+    }
+    return size;
+  }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 3b58803..3cd0d7c 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -17,16 +17,7 @@
  
******************************************************************************/
 package org.apache.drill.exec.physical.base;
 
-import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.config.HashToRandomExchange;
-import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RandomReceiver;
-import org.apache.drill.exec.physical.config.RangeSender;
-import org.apache.drill.exec.physical.config.Screen;
-import org.apache.drill.exec.physical.config.SingleSender;
-import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.config.UnionExchange;
+import org.apache.drill.exec.physical.config.*;
 
 public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> 
implements PhysicalVisitor<T, X, E> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class);
@@ -37,6 +28,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   }
 
   @Override
+  public T visitUnion(Union union, X value) throws E {
+    return visitOp(union, value);
+  }
+
+  @Override
   public T visitFilter(Filter filter, X value) throws E{
     return visitOp(filter, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 8e09e3a..39fc105 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -17,16 +17,7 @@
  
******************************************************************************/
 package org.apache.drill.exec.physical.base;
 
-import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.config.HashToRandomExchange;
-import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RandomReceiver;
-import org.apache.drill.exec.physical.config.RangeSender;
-import org.apache.drill.exec.physical.config.Screen;
-import org.apache.drill.exec.physical.config.SingleSender;
-import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.config.UnionExchange;
+import org.apache.drill.exec.physical.config.*;
 
 /**
  * Visitor class designed to traversal of a operator tree.  Basis for a number of operator manipulations including fragmentation and materialization.
@@ -44,6 +35,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
 
   public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
+  public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
   public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
   public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
new file mode 100644
index 0000000..73ac9b0
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
@@ -0,0 +1,42 @@
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.*;
+
+import java.util.List;
+
+@JsonTypeName("union")
+
+public class Union extends AbstractMultiple {
+
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Filter.class);
+
+  @JsonCreator
+  public Union(@JsonProperty("children") PhysicalOperator[] children) {
+    super(children);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+    return physicalVisitor.visitUnion(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    return new Union(children.toArray(new PhysicalOperator[children.size()]));
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    for (int i = 0; i < children.length; i++) {
+      PhysicalOperator child = children[i];
+      cost.add(child.getCost());
+    }
+    return cost;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 2ce5c28..e69aeae 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -34,11 +34,13 @@ import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.*;
 import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
 import 
org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
 import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
 import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
+import org.apache.drill.exec.physical.impl.union.UnionBatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.json.JSONScanBatchCreator;
 import org.apache.drill.exec.store.json.JSONSubScan;
@@ -65,6 +67,7 @@ public class ImplCreator extends 
AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private SingleSenderCreator ssc = new SingleSenderCreator();
   private ProjectBatchCreator pbc = new ProjectBatchCreator();
   private FilterBatchCreator fbc = new FilterBatchCreator();
+  private UnionBatchCreator unionbc = new UnionBatchCreator();
   private SVRemoverCreator svc = new SVRemoverCreator();
   private SortBatchCreator sbc = new SortBatchCreator();
   private RootExec root = null;
@@ -130,6 +133,11 @@ public class ImplCreator extends 
AbstractPhysicalVisitor<RecordBatch, FragmentCo
   }
 
   @Override
+  public RecordBatch visitUnion(Union union, FragmentContext context) throws 
ExecutionSetupException {
+    return unionbc.getBatch(context, union, getChildren(union, context));
+  }
+
+  @Override
   public RecordBatch visitSingleSender(SingleSender op, FragmentContext 
context) throws ExecutionSetupException {
     root = ssc.getRoot(context, op, getChildren(op, context));
     return null;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 9b31407..b66921d 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -108,6 +108,7 @@ public class ScreenCreator implements RootCreator<Screen>{
           QueryWritableBatch batch = materializer.convertNext(true);
           connection.sendResult(listener, batch);
         }
+
         return false;
       }
       case OK_NEW_SCHEMA:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java
new file mode 100644
index 0000000..1945139
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java
@@ -0,0 +1,23 @@
+package org.apache.drill.exec.physical.impl.union;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Union;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class UnionBatchCreator implements BatchCreator<Union>{
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnionBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, Union config, 
List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() >= 1);
+    return new UnionRecordBatch(config, children, context);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
new file mode 100644
index 0000000..b77d4f4
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.physical.impl.union;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Union;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class UnionRecordBatch extends AbstractRecordBatch<Union> {
+
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnionRecordBatch.class);
+
+  private final List<RecordBatch> incoming;
+  private SelectionVector2 sv;
+  private Iterator<RecordBatch> incomingIterator = null;
+  private RecordBatch current = null;
+  private ArrayList<TransferPair> transfers;
+  private int outRecordCount;
+
+  public UnionRecordBatch(Union config, List<RecordBatch> children, 
FragmentContext context) {
+    super(config, context);
+    this.incoming = children;
+    this.incomingIterator = incoming.iterator();
+    current = incomingIterator.next();
+    sv = null;
+  }
+
+
+  @Override
+  public int getRecordCount() {
+    return outRecordCount;
+  }
+
+  @Override
+  public void kill() {
+    if(current != null){
+      current.kill();
+      current = null;
+    }
+    for(;incomingIterator.hasNext();){
+      incomingIterator.next().kill();
+    }
+  }
+
+  @Override
+  protected void killIncoming() {
+    for (int i = 0; i < incoming.size(); i++) {
+      RecordBatch in = incoming.get(i);
+      in.kill();
+    }
+  }
+
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return sv;
+  }
+
+  @Override
+  public IterOutcome next() {
+    if (current == null) { // end of iteration
+      return IterOutcome.NONE;
+    }
+    IterOutcome upstream = current.next();
+    logger.debug("Upstream... {}", upstream);
+    while (upstream == IterOutcome.NONE) {
+      if (!incomingIterator.hasNext()) {
+        current = null;
+        return IterOutcome.NONE;
+      }
+      current = incomingIterator.next();
+      upstream = current.next();
+    }
+    switch (upstream) {
+      case NONE:
+        throw new IllegalArgumentException("not possible!");
+      case NOT_YET:
+      case STOP:
+        return upstream;
+      case OK_NEW_SCHEMA:
+        setupSchema();
+        // fall through.
+      case OK:
+        doTransfer();
+        return upstream; // change if upstream changed, otherwise normal.
+      default:
+        throw new UnsupportedOperationException();
+    }
+  }
+
+  private void doTransfer() {
+    outRecordCount = current.getRecordCount();
+    if (container.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
+      this.sv = current.getSelectionVector2();
+    }
+    for (TransferPair transfer : transfers) {
+      transfer.transfer();
+    }
+
+    for (VectorWrapper<?> vw : this.container) {
+      ValueVector.Mutator m = vw.getValueVector().getMutator();
+      m.setValueCount(outRecordCount);
+    }
+
+  }
+
+  private void setupSchema() {
+    if (container != null) {
+      container.clear();
+    }
+    transfers = Lists.newArrayList();
+
+    for (VectorWrapper<?> vw : current) {
+      TransferPair pair = vw.getValueVector().getTransferPair();
+      container.add(pair.getTo());
+      transfers.add(pair);
+    }
+    container.buildSchema(current.getSchema().getSelectionVectorMode());
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index 6587237..a2612d5 100644
--- 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -59,7 +59,9 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
     // run query.
     bit.run();
     client.connect();
-    List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, 
Files.toString(FileUtils.getResourceAsFile("/physical_test2.json"), 
Charsets.UTF_8));
+      String path = "/physical_test2.json";
+//      String path = "/filter/test1.json";
+    List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, 
Files.toString(FileUtils.getResourceAsFile(path), Charsets.UTF_8));
 
     // look at records
     RecordBatchLoader batchLoader = new 
RecordBatchLoader(client.getAllocator());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index afb0b3a..26c8a25 100644
--- 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -48,7 +48,7 @@ public class TestSimpleFilter {
     FragmentContext context = new FragmentContext(bitContext, 
FragmentHandle.getDefaultInstance(), connection, null, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, 
(FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     while(exec.next()){
-      assertEquals(50, exec.getSelectionVector2().getCount());
+      assertEquals(50, exec.getRecordCount());
     }
     
     if(context.getFailureCause() != null){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
new file mode 100644
index 0000000..1dbefb1
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
@@ -0,0 +1,68 @@
+package org.apache.drill.exec.physical.impl.union;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSimpleUnion {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestSimpleUnion.class);
+  DrillConfig c = DrillConfig.create();
+  
+  
+  @Test
+  public void testUnion(@Injectable final DrillbitContext bitContext, 
@Injectable UserServer.UserClientConnection connection) throws Throwable{
+
+    
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+    
+    
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), 
CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = 
reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/union/test1.json"),
 Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new 
FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, 
ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, 
(FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+    
+    int[] counts = new int[]{100,50};
+    int i=0;
+    while(exec.next()){
+      System.out.println("iteration count:" + exec.getRecordCount());
+      assertEquals(counts[i++], exec.getRecordCount());
+    }
+    
+    if(context.getFailureCause() != null){
+      throw context.getFailureCause();
+    }
+    assertTrue(!context.isFailed());
+    
+  }
+  
+  @After
+  public void tearDown() throws Exception{
+    // pause to get logger to catch up.
+    Thread.sleep(1000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json 
b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
index 4f82145..7d05928 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
@@ -26,8 +26,14 @@
             expr: "alternate()"
         },
         {
+          @id:4,
+          child:2,
+          pop: "selection-vector-remover"
+          
+        },
+        {
             @id: 3,
-            child: 2,
+            child: 4,
             pop: "screen"
         }
     ]

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/test/resources/union/test1.json
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/test/resources/union/test1.json 
b/sandbox/prototype/exec/java-exec/src/test/resources/union/test1.json
new file mode 100644
index 0000000..5b33232
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/union/test1.json
@@ -0,0 +1,57 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+       graph:[
+        {
+            @id:1,
+            pop:"mock-sub-scan",
+            url: "http://apache.org",
+            entries:[
+               {records: 100, types: [
+                 {name: "blue", type: "INT", mode: "REQUIRED"},
+                 {name: "red", type: "BIGINT", mode: "REQUIRED"},
+                 {name: "green", type: "INT", mode: "REQUIRED"}
+               ]}
+            ]
+        },
+        {
+            @id:2,
+            pop:"mock-sub-scan",
+            url: "http://apache.org",
+            entries:[
+               {records: 100, types: [
+                 {name: "blue", type: "INT", mode: "REQUIRED"},
+                 {name: "red", type: "BIGINT", mode: "REQUIRED"},
+                 {name: "green", type: "INT", mode: "REQUIRED"}
+               ]}
+            ]
+        },        
+        {
+            @id:3,
+            child: 2,
+            pop:"filter",
+            expr: "alternate()"
+        },
+        {
+            @id:4,
+            children: [1,3],
+            pop:"union"
+        },
+        {
+          @id:5,
+          child:4,
+          pop: "selection-vector-remover"
+          
+        },
+        {
+            @id: 6,
+            child: 5,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

Reply via email to