Updated Branches: refs/heads/master c50135256 -> ac8590d2b
DRILL-156 Union (all) pop implementation Fixes to TestSimpleFilter Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/acf47818 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/acf47818 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/acf47818 Branch: refs/heads/master Commit: acf47818eb545d6f4109aec1189017754552adcf Parents: c501352 Author: immars <[email protected]> Authored: Thu Aug 22 15:21:22 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Aug 22 16:08:28 2013 -0700 ---------------------------------------------------------------------- .../exec/physical/base/AbstractMultiple.java | 36 +++++ .../physical/base/AbstractPhysicalVisitor.java | 16 +-- .../exec/physical/base/PhysicalVisitor.java | 12 +- .../drill/exec/physical/config/Union.java | 42 ++++++ .../drill/exec/physical/impl/ImplCreator.java | 8 ++ .../drill/exec/physical/impl/ScreenCreator.java | 1 + .../physical/impl/union/UnionBatchCreator.java | 23 ++++ .../physical/impl/union/UnionRecordBatch.java | 130 +++++++++++++++++++ .../physical/impl/TestSimpleFragmentRun.java | 4 +- .../physical/impl/filter/TestSimpleFilter.java | 2 +- .../physical/impl/union/TestSimpleUnion.java | 68 ++++++++++ .../src/test/resources/filter/test1.json | 8 +- .../src/test/resources/union/test1.json | 57 ++++++++ 13 files changed, 384 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java new file mode 100644 index 0000000..247595b --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java @@ -0,0 +1,36 @@ +package org.apache.drill.exec.physical.base; + +import com.google.common.collect.Iterators; + +import java.util.Iterator; + +/** + * Describes an operator that expects more than one children operators as its input. + */ +public abstract class AbstractMultiple extends AbstractBase{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractMultiple.class); + + protected final PhysicalOperator[] children; + + protected AbstractMultiple(PhysicalOperator[] children) { + this.children = children; + } + + public PhysicalOperator[] getChildren() { + return children; + } + + @Override + public Iterator<PhysicalOperator> iterator() { + return Iterators.forArray(children); + } + + public Size getSize() { + Size size = new Size(0,0); + for(PhysicalOperator child:children){ + size.add(child.getSize()); + } + return size; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 3b58803..3cd0d7c 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -17,16 +17,7 @@ ******************************************************************************/ package org.apache.drill.exec.physical.base; -import org.apache.drill.exec.physical.config.Filter; -import org.apache.drill.exec.physical.config.HashPartitionSender; -import org.apache.drill.exec.physical.config.HashToRandomExchange; -import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.RandomReceiver; -import org.apache.drill.exec.physical.config.RangeSender; -import org.apache.drill.exec.physical.config.Screen; -import org.apache.drill.exec.physical.config.SingleSender; -import org.apache.drill.exec.physical.config.Sort; -import org.apache.drill.exec.physical.config.UnionExchange; +import org.apache.drill.exec.physical.config.*; public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class); @@ -37,6 +28,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitUnion(Union union, X value) throws E { + return visitOp(union, value); + } + + @Override public T visitFilter(Filter filter, X value) throws E{ return visitOp(filter, value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 8e09e3a..39fc105 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -17,16 +17,7 @@ ******************************************************************************/ package org.apache.drill.exec.physical.base; -import org.apache.drill.exec.physical.config.Filter; -import org.apache.drill.exec.physical.config.HashPartitionSender; -import org.apache.drill.exec.physical.config.HashToRandomExchange; -import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.RandomReceiver; -import org.apache.drill.exec.physical.config.RangeSender; -import org.apache.drill.exec.physical.config.Screen; -import org.apache.drill.exec.physical.config.SingleSender; -import org.apache.drill.exec.physical.config.Sort; -import org.apache.drill.exec.physical.config.UnionExchange; +import org.apache.drill.exec.physical.config.*; /** * Visitor class designed to traversal of a operator tree. Basis for a number of operator manipulations including fragmentation and materialization. @@ -44,6 +35,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitStore(Store store, EXTRA value) throws EXCEP; public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP; + public RETURN visitUnion(Union union, EXTRA value) throws EXCEP; public RETURN visitProject(Project project, EXTRA value) throws EXCEP; public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP; public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java new file mode 100644 index 0000000..73ac9b0 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java @@ -0,0 +1,42 @@ +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.base.*; + +import java.util.List; + +@JsonTypeName("union") + +public class Union extends AbstractMultiple { + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filter.class); + + @JsonCreator + public Union(@JsonProperty("children") PhysicalOperator[] children) { + super(children); + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitUnion(this, value); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + return new Union(children.toArray(new PhysicalOperator[children.size()])); + } + + @Override + public OperatorCost getCost() { + OperatorCost cost = new OperatorCost(0,0,0,0); + for (int i = 0; i < children.length; i++) { + PhysicalOperator child = children[i]; + cost.add(child.getCost()); + } + return cost; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 2ce5c28..e69aeae 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -34,11 +34,13 @@ import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.physical.config.Sort; +import org.apache.drill.exec.physical.config.*; import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator; import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator; import org.apache.drill.exec.physical.impl.sort.SortBatchCreator; import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator; +import org.apache.drill.exec.physical.impl.union.UnionBatchCreator; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.store.json.JSONScanBatchCreator; import org.apache.drill.exec.store.json.JSONSubScan; @@ -65,6 +67,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo private SingleSenderCreator ssc = new SingleSenderCreator(); private ProjectBatchCreator pbc = new ProjectBatchCreator(); private FilterBatchCreator fbc = new FilterBatchCreator(); + private UnionBatchCreator unionbc = new UnionBatchCreator(); private SVRemoverCreator svc = new SVRemoverCreator(); private SortBatchCreator sbc = new SortBatchCreator(); private RootExec root = null; @@ -130,6 +133,11 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo } @Override + public RecordBatch visitUnion(Union union, FragmentContext context) throws ExecutionSetupException { + return unionbc.getBatch(context, union, getChildren(union, context)); + } + + @Override public RecordBatch visitSingleSender(SingleSender op, FragmentContext context) throws ExecutionSetupException { root = ssc.getRoot(context, op, getChildren(op, context)); return null; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 9b31407..b66921d 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -108,6 +108,7 @@ public class ScreenCreator implements RootCreator<Screen>{ QueryWritableBatch batch = materializer.convertNext(true); connection.sendResult(listener, batch); } + return false; } case OK_NEW_SCHEMA: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java new file mode 100644 index 0000000..1945139 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java @@ -0,0 +1,23 @@ +package org.apache.drill.exec.physical.impl.union; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.Union; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import com.google.common.base.Preconditions; + +public class UnionBatchCreator implements BatchCreator<Union>{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionBatchCreator.class); + + @Override + public RecordBatch getBatch(FragmentContext context, Union config, List<RecordBatch> children) throws ExecutionSetupException { + Preconditions.checkArgument(children.size() >= 1); + return new UnionRecordBatch(config, children, context); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java new file mode 100644 index 0000000..b77d4f4 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java @@ -0,0 +1,130 @@ +package org.apache.drill.exec.physical.impl.union; + +import com.google.common.collect.Lists; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.Union; +import org.apache.drill.exec.record.*; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class UnionRecordBatch extends AbstractRecordBatch<Union> { + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionRecordBatch.class); + + private final List<RecordBatch> incoming; + private SelectionVector2 sv; + private Iterator<RecordBatch> incomingIterator = null; + private RecordBatch current = null; + private ArrayList<TransferPair> transfers; + private int outRecordCount; + + public UnionRecordBatch(Union config, List<RecordBatch> children, FragmentContext context) { + super(config, context); + this.incoming = children; + this.incomingIterator = incoming.iterator(); + current = incomingIterator.next(); + sv = null; + } + + + @Override + public int getRecordCount() { + return outRecordCount; + } + + @Override + public void kill() { + if(current != null){ + current.kill(); + current = null; + } + for(;incomingIterator.hasNext();){ + incomingIterator.next().kill(); + } + } + + @Override + protected void killIncoming() { + for (int i = 0; i < incoming.size(); i++) { + RecordBatch in = incoming.get(i); + in.kill(); + } + } + + + @Override + public SelectionVector2 getSelectionVector2() { + return sv; + } + + @Override + public IterOutcome next() { + if (current == null) { // end of iteration + return IterOutcome.NONE; + } + IterOutcome upstream = current.next(); + logger.debug("Upstream... {}", upstream); + while (upstream == IterOutcome.NONE) { + if (!incomingIterator.hasNext()) { + current = null; + return IterOutcome.NONE; + } + current = incomingIterator.next(); + upstream = current.next(); + } + switch (upstream) { + case NONE: + throw new IllegalArgumentException("not possible!"); + case NOT_YET: + case STOP: + return upstream; + case OK_NEW_SCHEMA: + setupSchema(); + // fall through. + case OK: + doTransfer(); + return upstream; // change if upstream changed, otherwise normal. + default: + throw new UnsupportedOperationException(); + } + } + + private void doTransfer() { + outRecordCount = current.getRecordCount(); + if (container.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { + this.sv = current.getSelectionVector2(); + } + for (TransferPair transfer : transfers) { + transfer.transfer(); + } + + for (VectorWrapper<?> vw : this.container) { + ValueVector.Mutator m = vw.getValueVector().getMutator(); + m.setValueCount(outRecordCount); + } + + } + + private void setupSchema() { + if (container != null) { + container.clear(); + } + transfers = Lists.newArrayList(); + + for (VectorWrapper<?> vw : current) { + TransferPair pair = vw.getValueVector().getTransferPair(); + container.add(pair.getTo()); + transfers.add(pair); + } + container.buildSchema(current.getSchema().getSelectionVectorMode()); + } + + @Override + public WritableBatch getWritableBatch() { + return WritableBatch.get(this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java index 6587237..a2612d5 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java @@ -59,7 +59,9 @@ public class TestSimpleFragmentRun extends PopUnitTestBase { // run query. bit.run(); client.connect(); - List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_test2.json"), Charsets.UTF_8)); + String path = "/physical_test2.json"; +// String path = "/filter/test1.json"; + List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(path), Charsets.UTF_8)); // look at records RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java index afb0b3a..26c8a25 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java @@ -48,7 +48,7 @@ public class TestSimpleFilter { FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry); SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); while(exec.next()){ - assertEquals(50, exec.getSelectionVector2().getCount()); + assertEquals(50, exec.getRecordCount()); } if(context.getFailureCause() != null){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java new file mode 100644 index 0000000..1dbefb1 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java @@ -0,0 +1,68 @@ +package org.apache.drill.exec.physical.impl.union; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import com.yammer.metrics.MetricRegistry; +import mockit.Injectable; +import mockit.NonStrictExpectations; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.physical.impl.ImplCreator; +import org.apache.drill.exec.physical.impl.SimpleRootExec; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.rpc.user.UserServer; +import org.apache.drill.exec.server.DrillbitContext; +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestSimpleUnion { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleUnion.class); + DrillConfig c = DrillConfig.create(); + + + @Test + public void testUnion(@Injectable final DrillbitContext bitContext, @Injectable UserServer.UserClientConnection connection) throws Throwable{ + + + new NonStrictExpectations(){{ + bitContext.getMetrics(); result = new MetricRegistry("test"); + bitContext.getAllocator(); result = BufferAllocator.getAllocator(c); + }}; + + + PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/union/test1.json"), Charsets.UTF_8)); + FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c); + FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry); + SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); + + int[] counts = new int[]{100,50}; + int i=0; + while(exec.next()){ + System.out.println("iteration count:" + exec.getRecordCount()); + assertEquals(counts[i++], exec.getRecordCount()); + } + + if(context.getFailureCause() != null){ + throw context.getFailureCause(); + } + assertTrue(!context.isFailed()); + + } + + @After + public void tearDown() throws Exception{ + // pause to get logger to catch up. + Thread.sleep(1000); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json index 4f82145..7d05928 100644 --- a/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json +++ b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json @@ -26,8 +26,14 @@ expr: "alternate()" }, { + @id:4, + child:2, + pop: "selection-vector-remover" + + }, + { @id: 3, - child: 2, + child: 4, pop: "screen" } ] http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/acf47818/sandbox/prototype/exec/java-exec/src/test/resources/union/test1.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/union/test1.json b/sandbox/prototype/exec/java-exec/src/test/resources/union/test1.json new file mode 100644 index 0000000..5b33232 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/resources/union/test1.json @@ -0,0 +1,57 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-sub-scan", + url: "http://apache.org", + entries:[ + {records: 100, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "red", type: "BIGINT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + @id:2, + pop:"mock-sub-scan", + url: "http://apache.org", + entries:[ + {records: 100, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "red", type: "BIGINT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + @id:3, + child: 2, + pop:"filter", + expr: "alternate()" + }, + { + @id:4, + children: [1,3], + pop:"union" + }, + { + @id:5, + child:4, + pop: "selection-vector-remover" + + }, + { + @id: 6, + child: 5, + pop: "screen" + } + ] +} \ No newline at end of file
