This is an automated email from the ASF dual-hosted git repository. sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 9725d4dfd25e9ac6302122463832f3334dd615fa Author: Sorabh Hamirwasia <[email protected]> AuthorDate: Thu Aug 9 19:21:42 2018 -0700 DRILL-6687: Improve RemovingRecordBatch to do transfer when all records need to be copied Add optimization in SelectionVector2 to enable RemovingRecordBatch to transfer ValueVectors from incoming to output container when all records need to be copied. Modified FilterRecordBatch and LimitRecordBatch to take advantage of this optimization --- .../drill/exec/physical/impl/TopN/TopNBatch.java | 6 +- .../exec/physical/impl/filter/FilterTemplate2.java | 7 ++ .../exec/physical/impl/limit/LimitRecordBatch.java | 1 + .../physical/impl/svremover/AbstractCopier.java | 17 +++-- .../physical/impl/svremover/AbstractSV2Copier.java | 23 ++++++- .../physical/impl/svremover/AbstractSV4Copier.java | 5 +- .../drill/exec/physical/impl/svremover/Copier.java | 9 ++- .../physical/impl/svremover/GenericCopier.java | 11 ++- .../impl/svremover/GenericCopierFactory.java | 54 +++++++++++++++ .../physical/impl/svremover/GenericSV2Copier.java | 17 ++++- .../physical/impl/svremover/GenericSV4Copier.java | 25 +++---- .../impl/svremover/RemovingRecordBatch.java | 80 +--------------------- .../physical/impl/svremover/StraightCopier.java | 69 +++++++++++++++++++ .../exec/record/selection/SelectionVector2.java | 20 ++++++ .../impl/svremover/AbstractGenericCopierTest.java | 35 ++++++---- .../physical/impl/svremover/GenericCopierTest.java | 8 ++- ...ierTest.java => GenericSV2BatchCopierTest.java} | 19 +++-- .../impl/svremover/GenericSV2CopierTest.java | 7 +- .../impl/svremover/GenericSV4CopierTest.java | 9 +-- .../apache/drill/test/rowSet/IndirectRowSet.java | 1 + 20 files changed, 262 insertions(+), 161 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 4fc0d15..2763f59 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -47,7 +47,7 @@ import org.apache.drill.exec.physical.config.TopN; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder; import org.apache.drill.exec.physical.impl.svremover.Copier; -import org.apache.drill.exec.physical.impl.svremover.GenericSV4Copier; +import org.apache.drill.exec.physical.impl.svremover.GenericCopierFactory; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -360,7 +360,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { SelectionVector4 selectionVector4 = priorityQueue.getSv4(); SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); if (copier == null) { - copier = GenericSV4Copier.createCopier(batch, newContainer, null); + copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null); } else { for (VectorWrapper<?> i : batch) { @@ -468,7 +468,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { @SuppressWarnings("resource") final SelectionVector4 selectionVector4 = priorityQueue.getSv4(); final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); - copier = GenericSV4Copier.createCopier(batch, newContainer, null); + copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null); @SuppressWarnings("resource") SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator()); try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index 6d1f034..7b0183b 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -67,11 +67,18 @@ public abstract class FilterTemplate2 implements Filterer { if (! outgoingSelectionVector.allocateNewSafe(recordCount)) { throw new OutOfMemoryException("Unable to allocate filter batch"); } + switch(svMode){ case NONE: + // Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire + // batch if possible at once rather than row-by-row + outgoingSelectionVector.setBatchActualRecordCount(recordCount); filterBatchNoSV(recordCount); break; case TWO_BYTE: + // Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire + // batch if possible at once rather than row-by-row + outgoingSelectionVector.setBatchActualRecordCount(incomingSelectionVector.getBatchActualRecordCount()); filterBatchSV2(recordCount); break; default: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 06f0fdb..a862714 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -194,6 +194,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { // clear memory for incoming sv (if any) if (incomingSv != null) { + outgoingSv.setBatchActualRecordCount(incomingSv.getBatchActualRecordCount()); incomingSv.clear(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java index ddea468..47ec1cb 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.svremover; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -31,7 +30,7 @@ public abstract class AbstractCopier implements Copier { protected VectorContainer outgoing; @Override - public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { + public void setup(RecordBatch incoming, VectorContainer outgoing) { this.outgoing = outgoing; final int count = outgoing.getNumberOfColumns(); @@ -43,7 +42,7 @@ public abstract class AbstractCopier implements Copier { } @Override - public int copyRecords(int index, int recordCount) throws SchemaChangeException { + public int copyRecords(int index, int recordCount) { for(VectorWrapper<?> out : outgoing){ TypeProtos.MajorType type = out.getField().getType(); if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) { @@ -57,7 +56,7 @@ public abstract class AbstractCopier implements Copier { } @Override - public int appendRecord(int index) throws SchemaChangeException { + public int appendRecord(int index) { int outgoingPosition = outgoing.getRecordCount(); copyEntryIndirect(index, outgoingPosition); outgoingPosition++; @@ -66,11 +65,11 @@ public abstract class AbstractCopier implements Copier { } @Override - public int appendRecords(int index, int recordCount) throws SchemaChangeException { + public int appendRecords(int index, int recordCount) { return insertRecords(outgoing.getRecordCount(), index, recordCount); } - private int insertRecords(int outgoingPosition, int index, 
int recordCount) throws SchemaChangeException { + private int insertRecords(int outgoingPosition, int index, int recordCount) { final int endIndex = index + recordCount; for(int svIndex = index; svIndex < endIndex; svIndex++, outgoingPosition++){ @@ -81,7 +80,7 @@ public abstract class AbstractCopier implements Copier { return outgoingPosition; } - private void updateCounts(int numRecords) { + protected void updateCounts(int numRecords) { outgoing.setRecordCount(numRecords); for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) { @@ -89,7 +88,7 @@ public abstract class AbstractCopier implements Copier { } } - public abstract void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException; + public abstract void copyEntryIndirect(int inIndex, int outIndex); - public abstract void copyEntry(int inIndex, int outIndex) throws SchemaChangeException; + public abstract void copyEntry(int inIndex, int outIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java index 321d9a8..ec712e5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java @@ -17,19 +17,23 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.vector.ValueVector; +import java.util.ArrayList; +import java.util.List; + public abstract class AbstractSV2Copier extends AbstractCopier { protected 
ValueVector[] vvIn; private SelectionVector2 sv2; + protected List<TransferPair> transferPairs = new ArrayList<>(); @Override - public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { + public void setup(RecordBatch incoming, VectorContainer outgoing) { super.setup(incoming, outgoing); this.sv2 = incoming.getSelectionVector2(); @@ -46,7 +50,20 @@ public abstract class AbstractSV2Copier extends AbstractCopier { } } - public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException { + public void copyEntryIndirect(int inIndex, int outIndex) { copyEntry(sv2.getIndex(inIndex), outIndex); } + + @Override + public int copyRecords(int index, int recordCount) { + if (sv2.doFullTransfer()) { + for (TransferPair pair : transferPairs) { + pair.transfer(); + } + updateCounts(recordCount); + return recordCount; + } + + return super.copyRecords(index, recordCount); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java index cd6af07..56e2586 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -31,7 +30,7 @@ public abstract class AbstractSV4Copier extends AbstractCopier { private SelectionVector4 sv4; @Override - public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException{ + public void setup(RecordBatch incoming, VectorContainer outgoing) { super.setup(incoming, outgoing); 
this.sv4 = incoming.getSelectionVector4(); @@ -48,7 +47,7 @@ public abstract class AbstractSV4Copier extends AbstractCopier { } } - public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException { + public void copyEntryIndirect(int inIndex, int outIndex) { copyEntry(sv4.get(inIndex), outIndex); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java index bc31252..92dea70 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java @@ -17,13 +17,12 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; public interface Copier { - void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException; - int copyRecords(int index, int recordCount) throws SchemaChangeException; - int appendRecord(int index) throws SchemaChangeException; - int appendRecords(int index, int recordCount) throws SchemaChangeException; + void setup(RecordBatch incoming, VectorContainer outgoing); + int copyRecords(int index, int recordCount); + int appendRecord(int index); + int appendRecords(int index, int recordCount); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java index de048dc..72516e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java @@ -17,7 +17,6 @@ */ package 
org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -30,7 +29,7 @@ public class GenericCopier implements Copier { private VectorContainer outgoing; @Override - public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { + public void setup(RecordBatch incoming, VectorContainer outgoing) { this.outgoing = outgoing; final int count = outgoing.getNumberOfColumns(); @@ -53,12 +52,12 @@ public class GenericCopier implements Copier { } @Override - public int copyRecords(int index, int recordCount) throws SchemaChangeException { + public int copyRecords(int index, int recordCount) { return insertRecords(0, index, recordCount); } @Override - public int appendRecord(int index) throws SchemaChangeException { + public int appendRecord(int index) { int outgoingPosition = outgoing.getRecordCount(); for (int vectorIndex = 0; vectorIndex < vvIn.length; vectorIndex++) { vvOut[vectorIndex].copyEntry(outgoingPosition, vvIn[vectorIndex], index); @@ -69,11 +68,11 @@ public class GenericCopier implements Copier { } @Override - public int appendRecords(int index, int recordCount) throws SchemaChangeException { + public int appendRecords(int index, int recordCount) { return insertRecords(outgoing.getRecordCount(), index, recordCount); } - private int insertRecords(int outgoingPosition, int startIndex, int recordCount) throws SchemaChangeException { + private int insertRecords(int outgoingPosition, int startIndex, int recordCount) { final int endIndex = startIndex + recordCount; for (int index = startIndex; index < endIndex; index++, outgoingPosition++) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java new file mode 100644 index 0000000..cd6dd02 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.svremover; + +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.SchemaChangeCallBack; + +public class GenericCopierFactory { + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GenericCopierFactory.class); + + public static Copier createAndSetupCopier(RecordBatch incoming, VectorContainer outputContainer, + SchemaChangeCallBack callBack) { + Copier copier; + + switch(incoming.getSchema().getSelectionVectorMode()){ + case NONE: + copier = new StraightCopier(incoming, outputContainer, callBack); + break; + case TWO_BYTE: + copier = new GenericSV2Copier(incoming, outputContainer, callBack); + break; + case FOUR_BYTE: + copier = new GenericSV4Copier(incoming, outputContainer, callBack); + break; + default: + throw new UnsupportedOperationException(); + } + + copier.setup(incoming, outputContainer); + return copier; + } + + public static Copier createAndSetupNonSVGenericCopier(RecordBatch incoming, VectorContainer outputContainer) { + Copier copier = new GenericCopier(); + copier.setup(incoming, outputContainer); + return copier; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java index a375f45..f607e8c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java @@ -17,11 +17,24 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.VectorContainer; +import 
org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.SchemaChangeCallBack; public class GenericSV2Copier extends AbstractSV2Copier { + + public GenericSV2Copier(RecordBatch incomingBatch, VectorContainer outputContainer, + SchemaChangeCallBack callBack) { + for(VectorWrapper<?> vv : incomingBatch){ + TransferPair pair = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack)); + transferPairs.add(pair); + } + } + @Override - public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException { + public void copyEntry(int inIndex, int outIndex) { for ( int i = 0; i < vvIn.length; i++ ) { vvOut[i].copyEntry(outIndex, vvIn[i], inIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java index 1f3d28b..c676841 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -25,8 +24,18 @@ import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; public class GenericSV4Copier extends AbstractSV4Copier { + + public GenericSV4Copier(RecordBatch incomingBatch, VectorContainer outputContainer, + SchemaChangeCallBack callBack) { + for(VectorWrapper<?> vv : incomingBatch){ + @SuppressWarnings("resource") + ValueVector v = vv.getValueVectors()[0]; + v.makeTransferPair(outputContainer.addOrGet(v.getField(), callBack)); + } + } + @Override - public void copyEntry(int 
inIndex, int outIndex) throws SchemaChangeException { + public void copyEntry(int inIndex, int outIndex) { int inOffset = inIndex & 0xFFFF; int inVector = inIndex >>> 16; for ( int i = 0; i < vvIn.length; i++ ) { @@ -34,16 +43,4 @@ public class GenericSV4Copier extends AbstractSV4Copier { vvOut[i].copyEntry(outIndex, vectorsFromIncoming[inVector], inOffset); } } - - public static Copier createCopier(RecordBatch batch, VectorContainer container, SchemaChangeCallBack callBack) throws SchemaChangeException { - for(VectorWrapper<?> vv : batch){ - @SuppressWarnings("resource") - ValueVector v = vv.getValueVectors()[0]; - v.makeTransferPair(container.addOrGet(v.getField(), callBack)); - } - - Copier copier = new GenericSV4Copier(); - copier.setup(batch, container); - return copier; - } } \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index acfdc87..1471d5e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import java.util.List; - import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; @@ -26,14 +24,9 @@ import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TransferPair; -import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import 
org.apache.drill.exec.record.WritableBatch; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class); @@ -56,19 +49,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect // send OK to downstream instead. Since the output of RemovingRecordBatch is always going to be a regular container // change in incoming container type is not actual schema change. container.zeroVectors(); - switch(incoming.getSchema().getSelectionVectorMode()){ - case NONE: - this.copier = getStraightCopier(); - break; - case TWO_BYTE: - this.copier = create2Copier(); - break; - case FOUR_BYTE: - this.copier = create4Copier(); - break; - default: - throw new UnsupportedOperationException(); - } + copier = GenericCopierFactory.createAndSetupCopier(incoming, container, callBack); // If there is an actual schema change then below condition will be true and it will send OK_NEW_SCHEMA // downstream too @@ -84,7 +65,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect protected IterOutcome doWork() { try { copier.copyRecords(0, incoming.getRecordCount()); - } catch (SchemaChangeException e) { + } catch (Exception e) { throw new IllegalStateException(e); } finally { if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) { @@ -107,63 +88,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect super.close(); } - private class StraightCopier implements Copier{ - - private List<TransferPair> pairs = Lists.newArrayList(); - - @Override - public void setup(RecordBatch incoming, VectorContainer outgoing){ - for(VectorWrapper<?> vv : incoming){ - TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); - pairs.add(tp); 
- } - } - - @Override - public int copyRecords(int index, int recordCount) { - assert index == 0 && recordCount == incoming.getRecordCount() : "Straight copier cannot split batch"; - for(TransferPair tp : pairs){ - tp.transfer(); - } - - container.setRecordCount(incoming.getRecordCount()); - return recordCount; - } - - @Override - public int appendRecord(int index) throws SchemaChangeException { - throw new UnsupportedOperationException(); - } - - @Override - public int appendRecords(int index, int recordCount) throws SchemaChangeException { - throw new UnsupportedOperationException(); - } - } - - private Copier getStraightCopier(){ - StraightCopier copier = new StraightCopier(); - copier.setup(incoming, container); - return copier; - } - - private Copier create2Copier() throws SchemaChangeException { - Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE); - - for(VectorWrapper<?> vv : incoming){ - vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); - } - - Copier copier = new GenericSV2Copier(); - copier.setup(incoming, container); - return copier; - } - - private Copier create4Copier() throws SchemaChangeException { - Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE); - return GenericSV4Copier.createCopier(incoming, container, callBack); - } - @Override public WritableBatch getWritableBatch() { return WritableBatch.get(this); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java new file mode 100644 index 0000000..33f2a96 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import com.google.common.collect.Lists; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.SchemaChangeCallBack; + +import java.util.List; + +public class StraightCopier implements Copier { + private List<TransferPair> pairs = Lists.newArrayList(); + private RecordBatch incoming; + private VectorContainer outputContainer; + private SchemaChangeCallBack callBack; + + public StraightCopier(RecordBatch incomingBatch, VectorContainer outputContainer, SchemaChangeCallBack callBack) { + this.incoming = incomingBatch; + this.outputContainer = outputContainer; + this.callBack = callBack; + } + + @Override + public void setup(RecordBatch incoming, VectorContainer outgoing) { + for(VectorWrapper<?> vv : incoming){ + TransferPair tp = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack)); + pairs.add(tp); + } + } + + @Override + public int copyRecords(int index, int recordCount) { + assert index == 0 && recordCount == incoming.getRecordCount() : "Straight copier cannot split batch"; + for(TransferPair tp 
: pairs){ + tp.transfer(); + } + + outputContainer.setRecordCount(incoming.getRecordCount()); + return recordCount; + } + + @Override + public int appendRecord(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int appendRecords(int index, int recordCount) { + throw new UnsupportedOperationException(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java index 7244148..1d9c5da 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java @@ -37,6 +37,7 @@ public class SelectionVector2 implements AutoCloseable { private final BufferAllocator allocator; private int recordCount; + private int batchActualRecordCount = -1; private DrillBuf buffer = DeadBuf.DEAD_BUFFER; public static final int RECORD_SIZE = 2; @@ -61,6 +62,11 @@ public class SelectionVector2 implements AutoCloseable { recordCount = count; } + public SelectionVector2(BufferAllocator allocator, DrillBuf buf, int count, int actualRecordCount) { + this(allocator, buf, count); + this.batchActualRecordCount = actualRecordCount; + } + public int getCount() { return recordCount; } @@ -127,6 +133,7 @@ public class SelectionVector2 implements AutoCloseable { public SelectionVector2 clone() { SelectionVector2 newSV = new SelectionVector2(allocator); newSV.recordCount = recordCount; + newSV.batchActualRecordCount = batchActualRecordCount; newSV.buffer = buffer; /* Since buffer and newSV.buffer essentially point to the @@ -143,6 +150,7 @@ public class SelectionVector2 implements AutoCloseable { buffer.release(); buffer = DeadBuf.DEAD_BUFFER; recordCount = 0; + batchActualRecordCount = -1; } } @@ -151,6 +159,18 @@ public class SelectionVector2 implements AutoCloseable { this.recordCount = recordCount; } 
+ public boolean doFullTransfer() { + return (recordCount == batchActualRecordCount); + } + + public void setBatchActualRecordCount(int actualRecordCount) { + this.batchActualRecordCount = actualRecordCount; + } + + public int getBatchActualRecordCount() { + return batchActualRecordCount; + } + @Override public void close() { clear(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java index 01263b1..7d444b4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java @@ -17,34 +17,34 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import com.google.common.collect.Lists; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.RootAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetBatch; import org.apache.drill.test.rowSet.RowSetBuilder; import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; import org.junit.Test; -import java.util.List; - public abstract class AbstractGenericCopierTest { @Test public void testCopyRecords() throws SchemaChangeException { try (RootAllocator allocator = new RootAllocator(10_000_000)) { - final BatchSchema batchSchema = 
createTestSchema(BatchSchema.SelectionVectorMode.NONE); + final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); final RowSet srcRowSet = createSrcRowSet(allocator); final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build(); final VectorContainer destContainer = destRowSet.container(); - final Copier copier = createCopier(); + final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null); final RowSet expectedRowSet = createExpectedRowset(allocator); - copier.setup(new RowSetBatch(srcRowSet), destContainer); copier.copyRecords(0, 3); try { @@ -65,14 +65,13 @@ public abstract class AbstractGenericCopierTest { @Test public void testAppendRecords() throws SchemaChangeException { try (RootAllocator allocator = new RootAllocator(10_000_000)) { - final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); + final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); final RowSet srcRowSet = createSrcRowSet(allocator); final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build(); final VectorContainer destContainer = destRowSet.container(); - final Copier copier = createCopier(); + final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null); final RowSet expectedRowSet = createExpectedRowset(allocator); - copier.setup(new RowSetBatch(srcRowSet), destContainer); copier.appendRecord(0); copier.appendRecords(1, 2); @@ -93,7 +92,10 @@ public abstract class AbstractGenericCopierTest { public abstract RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException; - public abstract Copier createCopier(); + public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer, + SchemaChangeCallBack callback) { + return GenericCopierFactory.createAndSetupCopier(incoming, outputContainer, callback); + } public static Object[] row1() { return new Object[]{110, "green", new float[]{5.5f, 
2.3f}, new String[]{"1a", "1b"}}; @@ -115,7 +117,7 @@ public abstract class AbstractGenericCopierTest { return new Object[]{106, "black", new float[]{.75f}, new String[]{"4a"}}; } - public static RowSet createExpectedRowset(RootAllocator allocator) { + private RowSet createExpectedRowset(RootAllocator allocator) { return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE)) .addRow(row1()) .addRow(row2()) @@ -123,14 +125,17 @@ public abstract class AbstractGenericCopierTest { .build(); } - public static BatchSchema createTestSchema(BatchSchema.SelectionVectorMode mode) { + protected TupleMetadata createTestSchema(BatchSchema.SelectionVectorMode mode) { MaterializedField colA = MaterializedField.create("colA", Types.required(TypeProtos.MinorType.INT)); MaterializedField colB = MaterializedField.create("colB", Types.required(TypeProtos.MinorType.VARCHAR)); MaterializedField colC = MaterializedField.create("colC", Types.repeated(TypeProtos.MinorType.FLOAT4)); MaterializedField colD = MaterializedField.create("colD", Types.repeated(TypeProtos.MinorType.VARCHAR)); - List<MaterializedField> cols = Lists.newArrayList(colA, colB, colC, colD); - BatchSchema batchSchema = new BatchSchema(mode, cols); - return batchSchema; + return new SchemaBuilder().add(colA) + .add(colB) + .add(colC) + .add(colD) + .withSVMode(mode) + .buildSchema(); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java index f946f81..d6c38e7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java @@ -19,6 +19,9 @@ package org.apache.drill.exec.physical.impl.svremover; import org.apache.drill.exec.memory.RootAllocator; import 
org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetBuilder; @@ -35,7 +38,8 @@ public class GenericCopierTest extends AbstractGenericCopierTest { } @Override - public Copier createCopier() { - return new GenericCopier(); + public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer, + SchemaChangeCallBack callback) { + return GenericCopierFactory.createAndSetupNonSVGenericCopier(incoming, outputContainer); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java similarity index 79% copy from exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java copy to exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java index 428124d..748e0d0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java @@ -21,21 +21,20 @@ import org.apache.drill.exec.memory.RootAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetBuilder; -public class GenericSV2CopierTest extends AbstractGenericCopierTest { + +/** + * Verifies optimization in SV2 such that when total record to copy is same as number of records in the + * underlying batch for SV2 then SV2 will do transfer rather than row by row copy + */ +public class GenericSV2BatchCopierTest extends AbstractGenericCopierTest { + @Override public RowSet 
createSrcRowSet(RootAllocator allocator) { return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE)) - .addRow(row1()) - .addSelection(false, row4()) + .addSelection(true, row1()) .addRow(row2()) - .addSelection(false, row5()) - .addRow(row3()) + .addSelection(true, row3()) .withSv2() .build(); } - - @Override - public Copier createCopier() { - return new GenericSV2Copier(); - } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java index 428124d..b2f0e51 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java @@ -21,7 +21,9 @@ import org.apache.drill.exec.memory.RootAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetBuilder; + public class GenericSV2CopierTest extends AbstractGenericCopierTest { + @Override public RowSet createSrcRowSet(RootAllocator allocator) { return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE)) @@ -33,9 +35,4 @@ public class GenericSV2CopierTest extends AbstractGenericCopierTest { .withSv2() .build(); } - - @Override - public Copier createCopier() { - return new GenericSV2Copier(); - } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java index 447ad3a..a5f5bb7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java @@ 
-23,15 +23,17 @@ import org.apache.drill.exec.memory.RootAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.test.rowSet.HyperRowSetImpl; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetBuilder; public class GenericSV4CopierTest extends AbstractGenericCopierTest { + @Override public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException { - final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); + final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); final DrillBuf drillBuf = allocator.buffer(4 * 3); final SelectionVector4 sv4 = new SelectionVector4(drillBuf, 3, Character.MAX_VALUE); @@ -57,9 +59,4 @@ public class GenericSV4CopierTest extends AbstractGenericCopierTest { return new HyperRowSetImpl(hyperContainer, sv4); } - - @Override - public Copier createCopier() { - return new GenericSV4Copier(); - } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java index f0ebdc0..878aa25 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java @@ -96,6 +96,7 @@ public class IndirectRowSet extends AbstractSingleRowSet { destIndex++; } sv2.setRecordCount(rowCount); + sv2.setBatchActualRecordCount(container.getRecordCount()); container.buildSchema(SelectionVectorMode.TWO_BYTE); return sv2; }
