DRILL-707: Replace ValueAllocator with allocateNewSafe() in the selection vector remover (SVR). WIP: remove ValueAllocator usage from SVR.
SV for Limit OP. Selection vector remover. More WIP. code clean up. reverse rule change. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c7746ed5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c7746ed5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c7746ed5 Branch: refs/heads/master Commit: c7746ed518adcacd844c9c5e35dcefa52949a773 Parents: 850f8c6 Author: Jinfeng Ni <j...@maprtech.com> Authored: Tue May 13 17:20:56 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Thu May 15 09:20:11 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/physical/config/Limit.java | 8 +++++ .../impl/svremover/CopierTemplate2.java | 30 +++++++++---------- .../impl/svremover/CopierTemplate4.java | 26 ++++++++-------- .../impl/svremover/RemovingRecordBatch.java | 31 ++++++++++++-------- .../org/apache/drill/TestExampleQueries.java | 23 ++++++++++++++- .../exec/store/json/JsonRecordReader2Test.java | 2 +- 6 files changed, 76 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java index b926e3e..7d1d485 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java @@ -20,11 +20,13 @@ package org.apache.drill.exec.physical.config; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import 
com.fasterxml.jackson.annotation.JsonTypeName; + import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractSingle; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @JsonTypeName("limit") public class Limit extends AbstractSingle { @@ -60,4 +62,10 @@ public class Limit extends AbstractSingle { public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { return physicalVisitor.visitLimit(this, value); } + + @Override + public SelectionVectorMode getSVMode() { + return SelectionVectorMode.TWO_BYTE; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java index 2f589a5..387497c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java @@ -20,38 +20,38 @@ package org.apache.drill.exec.physical.impl.svremover; import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.vector.ValueVector; import 
org.apache.drill.exec.vector.allocator.VectorAllocator; + public abstract class CopierTemplate2 implements Copier{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class); - + private SelectionVector2 sv2; - private VectorAllocator[] allocators; private RecordBatch incoming; - + private RecordBatch outgoing; + @Override public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{ - this.allocators = allocators; this.sv2 = incoming.getSelectionVector2(); this.incoming = incoming; + this.outgoing = outgoing; doSetup(context, incoming, outgoing); } - - private void allocateVectors(int recordCount){ - for(VectorAllocator a : allocators){ - a.alloc(recordCount); - } - } - + @Override public int copyRecords(int index, int recordCount){ - allocateVectors(recordCount); + for(VectorWrapper<?> out : outgoing){ + out.getValueVector().allocateNewSafe(); + } + int outgoingPosition = 0; - + for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){ if (!doEval(sv2.getIndex(svIndex), outgoingPosition)) { break; @@ -59,10 +59,10 @@ public abstract class CopierTemplate2 implements Copier{ } return outgoingPosition; } - + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java index 
a7aba6e..b48a8fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java @@ -29,30 +29,28 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator; public abstract class CopierTemplate4 implements Copier{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class); - + private SelectionVector4 sv4; - private VectorAllocator[] allocators; private RecordBatch incoming; - - private void allocateVectors(int recordCount){ - for(VectorAllocator a : allocators){ - a.alloc(recordCount); - } - } - + private RecordBatch outgoing; + + @Override public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{ - this.allocators = allocators; this.incoming = incoming; + this.outgoing = outgoing; this.sv4 = incoming.getSelectionVector4(); doSetup(context, incoming, outgoing); } - + @Override public int copyRecords(int index, int recordCount){ // logger.debug("Copying records."); - allocateVectors(recordCount); + for(VectorWrapper<?> out : outgoing){ + out.getValueVector().allocateNewSafe(); + } + int outgoingPosition = 0; for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){ int deRefIndex = sv4.get(svIndex); @@ -62,10 +60,10 @@ public abstract class CopierTemplate4 implements Copier{ } return outgoingPosition; } - + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 62af0b2..2918fd2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -30,13 +30,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.record.*; -import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TransferPair; -import org.apache.drill.exec.record.TypedFieldId; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ValueVector; @@ -126,11 +120,18 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } } } + + logger.debug(String.format("doWork(): %s records copied for out of %s, remaining: %s, incoming schema %s ", + copiedRecords, + incoming.getRecordCount(), + incoming.getRecordCount() - remainderIndex, + incoming.getSchema())); } private void handleRemainder() { int remainingRecordCount = incoming.getRecordCount() - remainderIndex; - int copiedRecords = copier.copyRecords(0, recordCount); + int copiedRecords = copier.copyRecords(0, remainingRecordCount); + if (copiedRecords < remainingRecordCount) { for(VectorWrapper<?> v : container){ ValueVector.Mutator m = 
v.getValueVector().getMutator(); @@ -142,6 +143,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect for(VectorWrapper<?> v : container){ ValueVector.Mutator m = v.getValueVector().getMutator(); m.setValueCount(remainingRecordCount); + this.recordCount = remainingRecordCount; } if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) { for(VectorWrapper<?> v: incoming) { @@ -151,6 +153,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect remainderIndex = 0; hasRemainder = false; } + logger.debug(String.format("handleRemainder(): %s records copied for out of %s, remaining: %s, incoming schema ", + copiedRecords, + incoming.getRecordCount(), + incoming.getRecordCount() - remainderIndex, + incoming.getSchema())); } public void cleanup(){ @@ -196,18 +203,17 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private Copier getGenerated2Copier() throws SchemaChangeException{ Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE); - List<VectorAllocator> allocators = Lists.newArrayList(); for(VectorWrapper<?> i : incoming){ ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator()); container.add(v); - allocators.add(VectorAllocator.getAllocator(i.getValueVector(), v)); } try { final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry()); generateCopies(cg.getRoot(), incoming, false); Copier copier = context.getImplementationClass(cg); - copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()])); + copier.setupRemover(context, incoming, this, null); + return copier; } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); @@ -221,19 +227,18 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{ - List<VectorAllocator> allocators = Lists.newArrayList(); for(VectorWrapper<?> i : batch){ ValueVector v = TypeHelper.getNewVector(i.getField(), allocator); container.add(v); - allocators.add(getAllocator4(v)); } try { final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry()); generateCopies(cg.getRoot(), batch, true); Copier copier = context.getImplementationClass(cg); - copier.setupRemover(context, batch, outgoing, allocators.toArray(new VectorAllocator[allocators.size()])); + copier.setupRemover(context, batch, outgoing, null); + return copier; } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index 99940f4..83b43fb 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -50,7 +50,28 @@ public class TestExampleQueries extends BaseTestQuery{ @Test public void testSelectWithLimit() throws Exception{ - test("select employee_id, first_name, last_name from cp.`employee.json` order by employee_id limit 5 offset 10"); + test("select employee_id, first_name, last_name from cp.`employee.json` limit 5 "); + } + + @Test + public void testSelectWithLimit2() throws Exception{ + test("select l_comment, l_orderkey from 
cp.`tpch/lineitem.parquet` limit 10000 "); + } + + @Test + public void testSVRV4() throws Exception{ + test("select employee_id, first_name from cp.`employee.json` order by employee_id "); + } + + @Test + public void testSVRV4MultBatch() throws Exception{ + test("select l_orderkey from cp.`tpch/lineitem.parquet` order by l_orderkey limit 10000 "); + } + + @Test + public void testSVRV4Join() throws Exception{ + test("select count(*) from cp.`tpch/lineitem.parquet` l, cp.`tpch/partsupp.parquet` ps \n" + + " where l.l_partkey = ps.ps_partkey and l.l_suppkey = ps.ps_suppkey ;"); } @Test http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java index 7a21cee..84195c3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java @@ -28,7 +28,7 @@ public class JsonRecordReader2Test extends BaseTestQuery{ @Test public void testComplexJsonInput() throws Exception{ // test("select z[0]['orange'] from cp.`jsoninput/input2.json` limit 10"); - test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink'] from cp.`jsoninput/input2.json` limit 10"); + test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink'] from cp.`jsoninput/input2.json` "); test("select x from cp.`jsoninput/input2.json`"); // test("select z[0] from cp.`jsoninput/input2.json` limit 10");