DRILL-5993: Used generic copiers in the selection vector remover, and implemented testing improvements for RowSets and codegen.
closes #1057 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/55c2bea0 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/55c2bea0 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/55c2bea0 Branch: refs/heads/master Commit: 55c2bea04c28d22e58ef055ee35947c1b6cec21c Parents: c8fdfd6 Author: Timothy Farkas <[email protected]> Authored: Thu Nov 30 09:55:32 2017 -0800 Committer: Ben-Zvi <[email protected]> Committed: Fri Feb 2 17:47:11 2018 -0800 ---------------------------------------------------------------------- .../exec/physical/impl/TopN/TopNBatch.java | 8 +- .../physical/impl/svremover/AbstractCopier.java | 95 ++++++++++++ .../impl/svremover/AbstractSV2Copier.java | 53 +++++++ .../impl/svremover/AbstractSV4Copier.java | 53 +++++++ .../exec/physical/impl/svremover/Copier.java | 12 +- .../impl/svremover/CopierTemplate2.java | 71 --------- .../impl/svremover/CopierTemplate4.java | 72 --------- .../physical/impl/svremover/GenericCopier.java | 96 ++++++++++++ .../impl/svremover/GenericSV2Copier.java | 31 +--- .../impl/svremover/GenericSV4Copier.java | 42 ++---- .../impl/svremover/RemovingRecordBatch.java | 148 ++++--------------- .../exec/record/ExpandableHyperContainer.java | 4 + .../apache/drill/exec/record/RecordBatch.java | 31 ++-- .../drill/exec/record/VectorAccessible.java | 14 +- .../svremover/AbstractGenericCopierTest.java | 136 +++++++++++++++++ .../impl/svremover/GenericCopierTest.java | 41 +++++ .../impl/svremover/GenericSV2CopierTest.java | 41 +++++ .../impl/svremover/GenericSV4CopierTest.java | 65 ++++++++ .../drill/exec/record/TestVectorContainer.java | 3 +- .../apache/drill/test/BaseDirTestWatcher.java | 10 ++ .../org/apache/drill/test/BaseTestQuery.java | 5 + .../org/apache/drill/test/ClusterFixture.java | 2 + .../apache/drill/test/rowSet/DirectRowSet.java | 11 +- .../drill/test/rowSet/IndirectRowSet.java | 32 +++- .../org/apache/drill/test/rowSet/RowSet.java | 3 + .../apache/drill/test/rowSet/RowSetBatch.java | 108 ++++++++++++++ .../apache/drill/test/rowSet/RowSetBuilder.java | 22 ++- 27 files changed, 839 insertions(+), 370 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 1683286..3929714 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -47,7 +47,7 @@ import org.apache.drill.exec.physical.config.TopN; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder; import org.apache.drill.exec.physical.impl.svremover.Copier; -import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch; +import org.apache.drill.exec.physical.impl.svremover.GenericSV4Copier; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -295,7 +295,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context); if (copier == null) { - copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null); + copier = GenericSV4Copier.createCopier(batch, newContainer, null); } else { for (VectorWrapper<?> i : batch) { @@ -303,7 +303,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator()); newContainer.add(v); } - copier.setupRemover(context, batch, newBatch); + copier.setup(batch, newContainer); } @SuppressWarnings("resource") SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator()); @@ -415,7 +415,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { final SelectionVector4 selectionVector4 = priorityQueue.getSv4(); final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context); - copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null); + copier = GenericSV4Copier.createCopier(batch, newContainer, null); @SuppressWarnings("resource") SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator()); try { http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java new file mode 100644 index 0000000..ddea468 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.AllocationHelper; +import org.apache.drill.exec.vector.ValueVector; + +public abstract class AbstractCopier implements Copier { + protected ValueVector[] vvOut; + protected VectorContainer outgoing; + + @Override + public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { + this.outgoing = outgoing; + + final int count = outgoing.getNumberOfColumns(); + vvOut = new ValueVector[count]; + + for (int index = 0; index < count; index++) { + vvOut[index] = outgoing.getValueVector(index).getValueVector(); + } + } + + @Override + public int copyRecords(int index, int recordCount) throws SchemaChangeException { + for(VectorWrapper<?> out : outgoing){ + TypeProtos.MajorType type = out.getField().getType(); + if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) { + out.getValueVector().allocateNew(); + } else { + AllocationHelper.allocate(out.getValueVector(), recordCount, 1); + } + } + + return insertRecords(0, index, recordCount); + } + + @Override + public int appendRecord(int index) throws SchemaChangeException { + int outgoingPosition = outgoing.getRecordCount(); + copyEntryIndirect(index, outgoingPosition); + outgoingPosition++; + updateCounts(outgoingPosition); + return outgoingPosition; + } + + @Override + public int appendRecords(int index, int recordCount) throws SchemaChangeException { + return insertRecords(outgoing.getRecordCount(), index, recordCount); + } + + private int insertRecords(int outgoingPosition, int index, int recordCount) throws SchemaChangeException { + final int endIndex = index + recordCount; + + for(int svIndex = index; svIndex < endIndex; svIndex++, outgoingPosition++){ + copyEntryIndirect(svIndex, outgoingPosition); + } + + updateCounts(outgoingPosition); + return outgoingPosition; + } + + private void updateCounts(int numRecords) { + outgoing.setRecordCount(numRecords); + + for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) { + vvOut[vectorIndex].getMutator().setValueCount(numRecords); + } + } + + public abstract void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException; + + public abstract void copyEntry(int inIndex, int outIndex) throws SchemaChangeException; +} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java new file mode 100644 index 0000000..d9f1c8e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.vector.ValueVector; + +public abstract class AbstractSV2Copier extends AbstractCopier { + protected ValueVector[] vvIn; + private SelectionVector2 sv2; + + @Override + public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { + super.setup(incoming, outgoing); + this.sv2 = incoming.getSelectionVector2(); + + final int count = outgoing.getNumberOfColumns(); + + vvIn = new ValueVector[count]; + + { + int index = 0; + + for (VectorWrapper vectorWrapper: incoming) { + vvIn[index] = vectorWrapper.getValueVector(); + index++; + } + } + } + + public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException { + copyEntry(sv2.getIndex(inIndex), outIndex); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java new file mode 100644 index 0000000..4f3afc3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.ValueVector; + +public abstract class AbstractSV4Copier extends AbstractCopier { + protected ValueVector[][] vvIn; + private SelectionVector4 sv4; + + @Override + public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException{ + super.setup(incoming, outgoing); + this.sv4 = incoming.getSelectionVector4(); + + final int count = outgoing.getNumberOfColumns(); + + vvIn = new ValueVector[count][]; + + { + int index = 0; + + for (VectorWrapper vectorWrapper: incoming) { + vvIn[index] = vectorWrapper.getValueVectors(); + index++; + } + } + } + + public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException { + copyEntry(sv4.get(inIndex), outIndex); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java index 9e265d7..bc31252 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java @@ -17,15 +17,13 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; public interface Copier { - public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate2.class); - public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate4.class); - - public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException; - public abstract int copyRecords(int index, int recordCount) throws SchemaChangeException; + void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException; + int copyRecords(int index, int recordCount) throws SchemaChangeException; + int appendRecord(int index) throws SchemaChangeException; + int appendRecords(int index, int recordCount) throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java deleted file mode 100644 index 96daf7f..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.svremover; - -import javax.inject.Named; - -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.vector.AllocationHelper; - - -public abstract class CopierTemplate2 implements Copier{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class); - - private SelectionVector2 sv2; - private RecordBatch outgoing; - - @Override - public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException{ - this.sv2 = incoming.getSelectionVector2(); - this.outgoing = outgoing; - doSetup(context, incoming, outgoing); - } - - @Override - public int copyRecords(int index, int recordCount) throws SchemaChangeException { - for(VectorWrapper<?> out : outgoing){ - MajorType type = out.getField().getType(); - if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) { - out.getValueVector().allocateNew(); - } else { - AllocationHelper.allocate(out.getValueVector(), recordCount, 1); - } - } - - int outgoingPosition = 0; - - for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){ - doEval(sv2.getIndex(svIndex), outgoingPosition); - } - return outgoingPosition; - } - - public abstract void doSetup(@Named("context") FragmentContext context, - @Named("incoming") RecordBatch incoming, - @Named("outgoing") RecordBatch outgoing) - throws SchemaChangeException; - public abstract void doEval(@Named("inIndex") int inIndex, - @Named("outIndex") int outIndex) - throws SchemaChangeException; -} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java deleted file mode 100644 index 1ae7df9..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.svremover; - -import javax.inject.Named; - -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.vector.AllocationHelper; - -public abstract class CopierTemplate4 implements Copier{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class); - - private SelectionVector4 sv4; - private RecordBatch outgoing; - - - @Override - public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException{ - this.outgoing = outgoing; - this.sv4 = incoming.getSelectionVector4(); - doSetup(context, incoming, outgoing); - } - - - @Override - public int copyRecords(int index, int recordCount) throws SchemaChangeException { - for(VectorWrapper<?> out : outgoing){ - MajorType type = out.getField().getType(); - if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) { - out.getValueVector().allocateNew(); - } else { - AllocationHelper.allocate(out.getValueVector(), recordCount, 1); - } - } - - int outgoingPosition = 0; - for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){ - int deRefIndex = sv4.get(svIndex); - doEval(deRefIndex, outgoingPosition); - } - return outgoingPosition; - } - - public abstract void doSetup(@Named("context") FragmentContext context, - @Named("incoming") RecordBatch incoming, - @Named("outgoing") RecordBatch outgoing) - throws SchemaChangeException; - public abstract void doEval(@Named("inIndex") int inIndex, - @Named("outIndex") int outIndex) - throws SchemaChangeException; -} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java new file mode 100644 index 0000000..de048dc --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.ValueVector; + +public class GenericCopier implements Copier { + private ValueVector[] vvOut; + private ValueVector[] vvIn; + + private VectorContainer outgoing; + + @Override + public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { + this.outgoing = outgoing; + + final int count = outgoing.getNumberOfColumns(); + + vvIn = new ValueVector[count]; + vvOut = new ValueVector[count]; + + { + int index = 0; + + for (VectorWrapper vectorWrapper: incoming) { + vvIn[index] = vectorWrapper.getValueVector(); + index++; + } + } + + for (int index = 0; index < count; index++) { + vvOut[index] = outgoing.getValueVector(index).getValueVector(); + } + } + + @Override + public int copyRecords(int index, int recordCount) throws SchemaChangeException { + return insertRecords(0, index, recordCount); + } + + @Override + public int appendRecord(int index) throws SchemaChangeException { + int outgoingPosition = outgoing.getRecordCount(); + for (int vectorIndex = 0; vectorIndex < vvIn.length; vectorIndex++) { + vvOut[vectorIndex].copyEntry(outgoingPosition, vvIn[vectorIndex], index); + } + outgoingPosition++; + updateCounts(outgoingPosition); + return outgoingPosition; + } + + @Override + public int appendRecords(int index, int recordCount) throws SchemaChangeException { + return insertRecords(outgoing.getRecordCount(), index, recordCount); + } + + private int insertRecords(int outgoingPosition, int startIndex, int recordCount) throws SchemaChangeException { + final int endIndex = startIndex + recordCount; + + for (int index = startIndex; index < endIndex; index++, outgoingPosition++) { + for (int vectorIndex = 0; vectorIndex < vvIn.length; vectorIndex++) { + vvOut[vectorIndex].copyEntry(outgoingPosition, vvIn[vectorIndex], index); + } + } + + updateCounts(outgoingPosition); + return outgoingPosition; + } + + private void updateCounts(int numRecords) { + outgoing.setRecordCount(numRecords); + + for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) { + vvOut[vectorIndex].getMutator().setValueCount(numRecords); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java index 2fc17a3..a375f45 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java @@ -18,37 +18,10 @@ package org.apache.drill.exec.physical.impl.svremover; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.ValueVector; - -public class GenericSV2Copier extends CopierTemplate2 { - - private ValueVector[] vvOut; - private ValueVector[] vvIn; - - @SuppressWarnings("unused") - @Override - public void doSetup(FragmentContext context, RecordBatch incoming, - RecordBatch outgoing) throws SchemaChangeException { - - int count = 0; - for(VectorWrapper<?> vv : incoming) { - count++; - } - vvIn = new ValueVector[count]; - vvOut = new ValueVector[count]; - int i = 0; - for(VectorWrapper<?> vv : incoming) { - vvIn[i] = incoming.getValueAccessorById(ValueVector.class, i).getValueVector(); - vvOut[i] = outgoing.getValueAccessorById(ValueVector.class, i).getValueVector(); - i++; - } - } +public class GenericSV2Copier extends AbstractSV2Copier { @Override - public void doEval(int inIndex, int outIndex) throws SchemaChangeException { + public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException { for ( int i = 0; i < vvIn.length; i++ ) { vvOut[i].copyEntry(outIndex, vvIn[i], inIndex); } http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java index 0950791..f9b153d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java @@ -18,41 +18,31 @@ package org.apache.drill.exec.physical.impl.svremover; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; -public class GenericSV4Copier extends CopierTemplate4 { - - private ValueVector[] vvOut; - private ValueVector[][] vvIn; - - @SuppressWarnings("unused") - @Override - public void doSetup(FragmentContext context, RecordBatch incoming, - RecordBatch outgoing) throws SchemaChangeException { - - int count = 0; - for(VectorWrapper<?> vv : incoming) { - count++; - } - vvIn = new ValueVector[count][]; - vvOut = new ValueVector[count]; - int i = 0; - for(VectorWrapper<?> vv : incoming) { - vvIn[i] = incoming.getValueAccessorById(ValueVector.class, i).getValueVectors(); - vvOut[i] = outgoing.getValueAccessorById(ValueVector.class, i).getValueVector(); - i++; - } - } - +public class GenericSV4Copier extends AbstractSV4Copier { @Override - public void doEval(int inIndex, int outIndex) throws SchemaChangeException { + public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException { int inOffset = inIndex & 0xFFFF; int inVector = inIndex >>> 16; for ( int i = 0; i < vvIn.length; i++ ) { vvOut[i].copyEntry(outIndex, vvIn[i][inVector], inOffset); } } + + public static Copier createCopier(RecordBatch batch, VectorContainer container, SchemaChangeCallBack callBack) throws SchemaChangeException { + for(VectorWrapper<?> vv : batch){ + @SuppressWarnings("resource") + ValueVector v = vv.getValueVectors()[0]; + v.makeTransferPair(container.addOrGet(v.getField(), callBack)); + } + + Copier copier = new GenericSV4Copier(); + copier.setup(batch, container); + return copier; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 66fe261..08ca029 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -17,10 +17,8 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import java.io.IOException; import java.util.List; -import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.CodeGenerator; @@ -33,9 +31,6 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.vector.CopyUtil; -import org.apache.drill.exec.vector.SchemaChangeCallBack; -import org.apache.drill.exec.vector.ValueVector; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -44,9 +39,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class); private Copier copier; - private int recordCount; - private boolean hasRemainder; - private int remainderIndex; public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context, incoming); @@ -55,7 +47,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect @Override public int getRecordCount() { - return recordCount; + return container.getRecordCount(); } @Override @@ -66,10 +58,10 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect this.copier = getStraightCopier(); break; case TWO_BYTE: - this.copier = getGenerated2Copier(); + this.copier = create2Copier(); break; case FOUR_BYTE: - this.copier = getGenerated4Copier(); + this.copier = create4Copier(); break; default: throw new UnsupportedOperationException(); @@ -85,37 +77,16 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect @Override public IterOutcome innerNext() { - if (hasRemainder) { - handleRemainder(); - return IterOutcome.OK; - } return super.innerNext(); } @Override protected IterOutcome doWork() { - int incomingRecordCount = incoming.getRecordCount(); - int copiedRecords; try { - copiedRecords = copier.copyRecords(0, incomingRecordCount); + copier.copyRecords(0, incoming.getRecordCount()); } catch (SchemaChangeException e) { throw new IllegalStateException(e); - } - - if (copiedRecords < incomingRecordCount) { - for(VectorWrapper<?> v : container){ - ValueVector.Mutator m = v.getValueVector().getMutator(); - m.setValueCount(copiedRecords); - } - hasRemainder = true; - remainderIndex = copiedRecords; - this.recordCount = remainderIndex; - } else { - recordCount = copiedRecords; - for(VectorWrapper<?> v : container){ - ValueVector.Mutator m = v.getValueVector().getMutator(); - m.setValueCount(recordCount); - } + } finally { if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) { for(VectorWrapper<?> v: incoming) { v.clear(); @@ -126,56 +97,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } } - assert recordCount >= copiedRecords; - logger.debug("doWork(): {} records copied out of {}, remaining: {}, incoming schema {} ", - copiedRecords, - incomingRecordCount, - incomingRecordCount - remainderIndex, - incoming.getSchema()); + logger.debug("doWork(): {} records copied out of {}, incoming schema {} ", + container.getRecordCount(), container.getRecordCount(), incoming.getSchema()); return IterOutcome.OK; } - private void handleRemainder() { - int recordCount = incoming.getRecordCount(); - int remainingRecordCount = incoming.getRecordCount() - remainderIndex; - int copiedRecords; - try { - while((copiedRecords = copier.copyRecords(remainderIndex, remainingRecordCount)) == 0) { - logger.debug("Copied zero records. Retrying"); - container.zeroVectors(); - } - } catch (SchemaChangeException e) { - throw new IllegalStateException(e); - } - - if (copiedRecords < remainingRecordCount) { - for(VectorWrapper<?> v : container){ - ValueVector.Mutator m = v.getValueVector().getMutator(); - m.setValueCount(copiedRecords); - } - remainderIndex += copiedRecords; - this.recordCount = copiedRecords; - } else { - for(VectorWrapper<?> v : container){ - ValueVector.Mutator m = v.getValueVector().getMutator(); - m.setValueCount(remainingRecordCount); - this.recordCount = remainingRecordCount; - } - if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) { - for(VectorWrapper<?> v: incoming) { - v.clear(); - } - } - remainderIndex = 0; - hasRemainder = false; - } - logger.debug(String.format("handleRemainder(): %s records copied out of %s, remaining: %s, incoming schema %s ", - copiedRecords, - recordCount, - recordCount - remainderIndex, - incoming.getSchema())); - } - @Override public void close() { super.close(); @@ -186,7 +112,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private List<TransferPair> pairs = Lists.newArrayList(); @Override - public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing){ + public void setup(RecordBatch incoming, VectorContainer outgoing){ for(VectorWrapper<?> vv : incoming){ TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); pairs.add(tp); @@ -199,65 +125,43 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect for(TransferPair tp : pairs){ tp.transfer(); } + + container.setRecordCount(incoming.getRecordCount()); return recordCount; } + @Override + public int appendRecord(int index) throws SchemaChangeException { + throw new UnsupportedOperationException(); + } + + @Override + public int appendRecords(int index, int recordCount) throws SchemaChangeException { + throw new UnsupportedOperationException(); + } } private Copier getStraightCopier(){ StraightCopier copier = new StraightCopier(); - copier.setupRemover(context, incoming, this); + copier.setup(incoming, container); return copier; } - private Copier getGenerated2Copier() throws SchemaChangeException{ + private Copier create2Copier() throws SchemaChangeException { Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE); for(VectorWrapper<?> vv : incoming){ vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); } - try { - final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getOptions()); - CopyUtil.generateCopies(cg.getRoot(), incoming, false); - Copier copier = context.getImplementationClass(cg); - copier.setupRemover(context, incoming, this); - cg.plainJavaCapable(true); - // Uncomment out this line to debug the generated code. -// cg.saveCodeForDebugging(true); - - return copier; - } catch (ClassTransformationException | IOException e) { - throw new SchemaChangeException("Failure while attempting to load generated class", e); - } + Copier copier = new GenericSV2Copier(); + copier.setup(incoming, container); + return copier; } - private Copier getGenerated4Copier() throws SchemaChangeException { + private Copier create4Copier() throws SchemaChangeException { Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE); - return getGenerated4Copier(incoming, context, container, this, callBack); - } - - public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, VectorContainer container, RecordBatch outgoing, - SchemaChangeCallBack callBack) throws SchemaChangeException{ - - for(VectorWrapper<?> vv : batch){ - @SuppressWarnings("resource") - ValueVector v = vv.getValueVectors()[0]; - v.makeTransferPair(container.addOrGet(v.getField(), callBack)); - } - - try { - final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getOptions()); - CopyUtil.generateCopies(cg.getRoot(), batch, true); - cg.plainJavaCapable(true); - // Uncomment out this line to debug the generated code. -// cg.saveCodeForDebugging(true); - Copier copier = context.getImplementationClass(cg); - copier.setupRemover(context, batch, outgoing); - return copier; - } catch (ClassTransformationException | IOException e) { - throw new SchemaChangeException("Failure while attempting to load generated class", e); - } + return GenericSV4Copier.createCopier(incoming, container, callBack); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java index 9037340..13656a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java @@ -42,6 +42,8 @@ public class ExpandableHyperContainer extends VectorContainer { this.add(hyperVector, true); } } + + buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); } public void addBatch(VectorAccessible batch) { @@ -62,5 +64,7 @@ public class ExpandableHyperContainer extends VectorContainer { hyperVectorWrapper.addVector(w.getValueVector()); } } + + buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); } } http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index acb7a9b..7fc086d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -39,7 +39,7 @@ import org.apache.drill.exec.vector.ValueVector; public interface RecordBatch extends VectorAccessible { /** max batch size, limited by 2-byte length in SV2: 65536 = 2^16 */ - public static final int MAX_BATCH_SIZE = ValueVector.MAX_ROW_COUNT; + int MAX_BATCH_SIZE = ValueVector.MAX_ROW_COUNT; /** * Describes the outcome of incrementing RecordBatch forward by a call to @@ -102,7 +102,7 @@ public interface RecordBatch extends VectorAccessible { * ) * </p> */ - public static enum IterOutcome { + enum IterOutcome { /** * Normal completion of batch. * <p> @@ -205,35 +205,29 @@ public interface RecordBatch extends VectorAccessible { * Gets the FragmentContext of the current query fragment. Useful for * reporting failure information or other query-level information. */ - public FragmentContext getContext(); + FragmentContext getContext(); /** * Gets the current schema of this record batch. * <p> * May be called only when the most recent call to {@link #next}, if any, - * returned {@link #OK_NEW_SCHEMA} or {@link #OK}. + * returned {@link IterOutcome#OK_NEW_SCHEMA} or {@link IterOutcome#OK}. * </p> * <p> * The schema changes when and only when {@link #next} returns - * {@link #OK_NEW_SCHEMA}. + * {@link IterOutcome#OK_NEW_SCHEMA}. * </p> */ @Override - public BatchSchema getSchema(); - - /** - * Gets the number of records that are within this record. - */ - @Override - public int getRecordCount(); + BatchSchema getSchema(); /** * Informs child nodes that this query should be terminated. Child nodes * should use the QueryContext to determine what has happened. */ - public void kill(boolean sendUpstream); + void kill(boolean sendUpstream); - public VectorContainer getOutgoingContainer(); + VectorContainer getOutgoingContainer(); /** * Gets the value vector type and ID for the given schema path. The @@ -247,10 +241,10 @@ public interface RecordBatch extends VectorAccessible { * TypedFieldId */ @Override - public abstract TypedFieldId getValueVectorId(SchemaPath path); + TypedFieldId getValueVectorId(SchemaPath path); @Override - public abstract VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids); + VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids); /** * Updates the data in each Field reading interface for the next range of @@ -269,12 +263,11 @@ public interface RecordBatch extends VectorAccessible { * * @return An IterOutcome describing the result of the iteration. */ - public IterOutcome next(); + IterOutcome next(); /** * Gets a writable version of this batch. Takes over ownership of existing * buffers. */ - public WritableBatch getWritableBatch(); - + WritableBatch getWritableBatch(); } http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java index f1a250c..acf94e1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java @@ -25,7 +25,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; public interface VectorAccessible extends Iterable<VectorWrapper<?>> { // TODO are these <?> releated in any way? Should they be the same one? // TODO javadoc - public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds); + VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds); /** * Get the value vector type and id for the given schema path. The TypedFieldId @@ -36,7 +36,7 @@ public interface VectorAccessible extends Iterable<VectorWrapper<?>> { * @return the local field id associated with this vector. If no field matches this * path, this will return a null TypedFieldId */ - public TypedFieldId getValueVectorId(SchemaPath path); + TypedFieldId getValueVectorId(SchemaPath path); /** * Get the schema of the current RecordBatch. This changes if and only if a *_NEW_SCHEMA @@ -44,18 +44,16 @@ public interface VectorAccessible extends Iterable<VectorWrapper<?>> { * * @return schema of the current batch */ - public BatchSchema getSchema(); + BatchSchema getSchema(); /** * Get the number of records. * * @return number of records */ - public int getRecordCount(); - - public abstract SelectionVector2 getSelectionVector2(); - - public abstract SelectionVector4 getSelectionVector4(); + int getRecordCount(); + SelectionVector2 getSelectionVector2(); + SelectionVector4 getSelectionVector4(); } http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java new file mode 100644 index 0000000..01263b1 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import com.google.common.collect.Lists; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.RootAllocator; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBatch; +import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.Test; + +import java.util.List; + +public abstract class AbstractGenericCopierTest { + @Test + public void testCopyRecords() throws SchemaChangeException { + try (RootAllocator allocator = new RootAllocator(10_000_000)) { + final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); + final RowSet srcRowSet = createSrcRowSet(allocator); + final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build(); + final VectorContainer destContainer = destRowSet.container(); + final Copier copier = createCopier(); + final RowSet expectedRowSet = createExpectedRowset(allocator); + + copier.setup(new RowSetBatch(srcRowSet), destContainer); + copier.copyRecords(0, 3); + + try { + new RowSetComparison(expectedRowSet).verify(destRowSet); + } finally { + srcRowSet.clear(); + + if (srcRowSet instanceof RowSet.HyperRowSet) { + ((RowSet.HyperRowSet)srcRowSet).getSv4().clear(); + } + + destRowSet.clear(); + expectedRowSet.clear(); + } + } + } + + @Test + public void testAppendRecords() throws SchemaChangeException { + try (RootAllocator allocator = new RootAllocator(10_000_000)) { + final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); + final RowSet srcRowSet = createSrcRowSet(allocator); + final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build(); + final VectorContainer destContainer = destRowSet.container(); + final Copier copier = createCopier(); + final RowSet expectedRowSet = createExpectedRowset(allocator); + + copier.setup(new RowSetBatch(srcRowSet), destContainer); + copier.appendRecord(0); + copier.appendRecords(1, 2); + + try { + new RowSetComparison(expectedRowSet).verify(destRowSet); + } finally { + srcRowSet.clear(); + + if (srcRowSet instanceof RowSet.HyperRowSet) { + ((RowSet.HyperRowSet)srcRowSet).getSv4().clear(); + } + + destRowSet.clear(); + expectedRowSet.clear(); + } + } + } + + public abstract RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException; + + public abstract Copier createCopier(); + + public static Object[] row1() { + return new Object[]{110, "green", new float[]{5.5f, 2.3f}, new String[]{"1a", "1b"}}; + } + + public static Object[] row2() { + return new Object[]{109, "blue", new float[]{1.5f}, new String[]{"2a"}}; + } + + public static Object[] row3() { + return new Object[]{108, "red", new float[]{-11.1f, 0.0f, .5f}, new String[]{"3a", "3b", "3c"}}; + } + + public static Object[] row4() { + return new Object[]{107, "yellow", new float[]{4.25f, 1.25f}, new String[]{}}; + } + + public static Object[] row5() { + return new Object[]{106, "black", new float[]{.75f}, new String[]{"4a"}}; + } + + public static RowSet createExpectedRowset(RootAllocator allocator) { + return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE)) + .addRow(row1()) + .addRow(row2()) + .addRow(row3()) + .build(); + } + + public static BatchSchema createTestSchema(BatchSchema.SelectionVectorMode mode) { + MaterializedField colA = MaterializedField.create("colA", Types.required(TypeProtos.MinorType.INT)); + MaterializedField colB = MaterializedField.create("colB", Types.required(TypeProtos.MinorType.VARCHAR)); + MaterializedField colC = MaterializedField.create("colC", Types.repeated(TypeProtos.MinorType.FLOAT4)); + MaterializedField colD = MaterializedField.create("colD", Types.repeated(TypeProtos.MinorType.VARCHAR)); + + List<MaterializedField> cols = Lists.newArrayList(colA, colB, colC, colD); + BatchSchema batchSchema = new BatchSchema(mode, cols); + return batchSchema; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java new file mode 100644 index 0000000..f946f81 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import org.apache.drill.exec.memory.RootAllocator; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; + +public class GenericCopierTest extends AbstractGenericCopierTest { + @Override + public RowSet createSrcRowSet(RootAllocator allocator) { + return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE)) + .addRow(row1()) + .addRow(row2()) + .addRow(row3()) + .addRow(row4()) + .addRow(row5()) + .build(); + } + + @Override + public Copier createCopier() { + return new GenericCopier(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java new file mode 100644 index 0000000..428124d --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import org.apache.drill.exec.memory.RootAllocator; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; +public class GenericSV2CopierTest extends AbstractGenericCopierTest { + @Override + public RowSet createSrcRowSet(RootAllocator allocator) { + return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE)) + .addRow(row1()) + .addSelection(false, row4()) + .addRow(row2()) + .addSelection(false, row5()) + .addRow(row3()) + .withSv2() + .build(); + } + + @Override + public Copier createCopier() { + return new GenericSV2Copier(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java new file mode 100644 index 0000000..447ad3a --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.RootAllocator; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.ExpandableHyperContainer; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.test.rowSet.HyperRowSetImpl; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; + +public class GenericSV4CopierTest extends AbstractGenericCopierTest { + @Override + public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException { + final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); + final DrillBuf drillBuf = allocator.buffer(4 * 3); + final SelectionVector4 sv4 = new SelectionVector4(drillBuf, 3, Character.MAX_VALUE); + + final VectorContainer batch1 = new RowSetBuilder(allocator, batchSchema) + .addRow(row1()) + .addRow(row4()) + .build() + .container(); + + final VectorContainer batch2 = new RowSetBuilder(allocator, batchSchema) + .addRow(row2()) + .addRow(row5()) + .addRow(row3()) + .build() + .container(); + + final ExpandableHyperContainer hyperContainer = new ExpandableHyperContainer(batch1); + hyperContainer.addBatch(batch2); + + sv4.set(0, 0, 0); + sv4.set(1, 1, 0); + sv4.set(2, 1, 2); + + return new HyperRowSetImpl(hyperContainer, sv4); + } + + @Override + public Copier createCopier() { + return new GenericSV4Copier(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java index ef79780..c6c18d5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java @@ -38,8 +38,7 @@ public class TestVectorContainer extends DrillTest { // TODO: Replace the following with an extension of SubOperatorTest class // once that is available. - - protected volatile static OperatorFixture fixture; + protected static OperatorFixture fixture; @BeforeClass public static void setUpBeforeClass() throws Exception { http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java index b595869..d36423b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java @@ -53,6 +53,7 @@ public class BaseDirTestWatcher extends DirTestWatcher { TEST_TMP // Corresponds to the directory that should be mapped to dfs.tmp } + private File codegenDir; private File tmpDir; private File storeDir; private File dfsTestTmpParentDir; @@ -78,6 +79,7 @@ public class BaseDirTestWatcher extends DirTestWatcher { protected void starting(Description description) { super.starting(description); + codegenDir = makeSubDir(Paths.get("codegen")); rootDir = makeSubDir(Paths.get("root")); tmpDir = makeSubDir(Paths.get("tmp")); storeDir = makeSubDir(Paths.get("store")); @@ -133,6 +135,14 @@ public class BaseDirTestWatcher extends DirTestWatcher { } /** + * Gets the temp directory that should be used to save generated code files. + * @return The temp directory that should be used to save generated code files. + */ + public File getCodegenDir() { + return codegenDir; + } + + /** * This methods creates a new directory which can be mapped to <b>dfs.tmp</b>. */ public void newDfsTestTmpDir() { http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java index 387caa7..c3ecaf1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java @@ -35,6 +35,8 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.exec.compile.ClassBuilder; +import org.apache.drill.exec.compile.CodeCompiler; import org.apache.drill.test.DrillTestWrapper.TestServices; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.config.DrillProperties; @@ -72,6 +74,7 @@ import com.google.common.io.Resources; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.test.ClusterFixture; +import org.junit.ClassRule; public class BaseTestQuery extends ExecTest { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class); @@ -152,10 +155,12 @@ public class BaseTestQuery extends ExecTest { protected static Properties cloneDefaultTestConfigProperties() { final Properties props = new Properties(); + for(String propName : TEST_CONFIGURATIONS.stringPropertyNames()) { props.put(propName, TEST_CONFIGURATIONS.getProperty(propName)); } + props.setProperty(ClassBuilder.CODE_DIR_OPTION, dirTestWatcher.getCodegenDir().getAbsolutePath()); props.setProperty(ExecConstants.DRILL_TMP_DIR, dirTestWatcher.getTmpDir().getAbsolutePath()); props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java index 9ddcdb7..6dbdacd 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.drill.exec.compile.ClassBuilder; import org.apache.drill.test.DrillTestWrapper.TestServices; import org.apache.drill.common.config.DrillProperties; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -515,6 +516,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { .sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, MAX_WIDTH_PER_NODE); Properties props = new Properties(); props.putAll(ClusterFixture.TEST_CONFIGURATIONS); + props.setProperty(ClassBuilder.CODE_DIR_OPTION, dirTestWatcher.getCodegenDir().getAbsolutePath()); props.setProperty(ExecConstants.DRILL_TMP_DIR, dirTestWatcher.getTmpDir().getAbsolutePath()); props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java index 5972f05..b5b1f1f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java @@ -17,6 +17,7 @@ */ package org.apache.drill.test.rowSet; +import com.google.common.collect.Sets; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.rowSet.model.ReaderIndex; import org.apache.drill.exec.physical.rowSet.model.SchemaInference; @@ -34,6 +35,8 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet; import org.apache.drill.test.rowSet.RowSetWriterImpl.WriterIndexImpl; +import java.util.Set; + /** * Implementation of a single row set with no indirection (selection) * vector. @@ -140,9 +143,15 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS @Override public SingleRowSet toIndirect() { - return new IndirectRowSet(this); + return new IndirectRowSet(this, Sets.<Integer>newHashSet()); } @Override + public SingleRowSet toIndirect(Set<Integer> skipIndices) { + return new IndirectRowSet(this, skipIndices); + } + + + @Override public SelectionVector2 getSv2() { return null; } } http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java index e729bba..cc9895d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java @@ -17,6 +17,7 @@ */ package org.apache.drill.test.rowSet; +import com.google.common.collect.Sets; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; @@ -26,6 +27,8 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector2; +import java.util.Set; + /** * Single row set coupled with an indirection (selection) vector, * specifically an SV2. @@ -65,31 +68,41 @@ public class IndirectRowSet extends AbstractSingleRowSet { this.sv2 = sv2; } + public IndirectRowSet(VectorContainer container) { + this(container, makeSv2(container.getAllocator(), container, Sets.<Integer>newHashSet())); + } + public static IndirectRowSet fromContainer(VectorContainer container) { - return new IndirectRowSet(container, makeSv2(container.getAllocator(), container)); + return new IndirectRowSet(container, makeSv2(container.getAllocator(), container, Sets.<Integer>newHashSet())); } public static IndirectRowSet fromSv2(VectorContainer container, SelectionVector2 sv2) { return new IndirectRowSet(container, sv2); } - private static SelectionVector2 makeSv2(BufferAllocator allocator, VectorContainer container) { - int rowCount = container.getRecordCount(); + private static SelectionVector2 makeSv2(BufferAllocator allocator, VectorContainer container, + Set<Integer> skipIndices) { + int rowCount = container.getRecordCount() - skipIndices.size(); SelectionVector2 sv2 = new SelectionVector2(allocator); if (!sv2.allocateNewSafe(rowCount)) { throw new OutOfMemoryException("Unable to allocate sv2 buffer"); } - for (int i = 0; i < rowCount; i++) { - sv2.setIndex(i, (char) i); + for (int srcIndex = 0, destIndex = 0; srcIndex < container.getRecordCount(); srcIndex++) { + if (skipIndices.contains(srcIndex)) { + continue; + } + + sv2.setIndex(destIndex, (char)srcIndex); + destIndex++; } sv2.setRecordCount(rowCount); container.buildSchema(SelectionVectorMode.TWO_BYTE); return sv2; } - public IndirectRowSet(DirectRowSet directRowSet) { + public IndirectRowSet(DirectRowSet directRowSet, Set<Integer> skipIndices) { super(directRowSet); - sv2 = makeSv2(allocator(), container()); + sv2 = makeSv2(allocator(), container(), skipIndices); } @Override @@ -119,6 +132,11 @@ public class IndirectRowSet extends AbstractSingleRowSet { public SingleRowSet toIndirect() { return this; } @Override + public SingleRowSet toIndirect(Set<Integer> skipIndices) { + return new IndirectRowSet(DirectRowSet.fromContainer(container()), skipIndices); + } + + @Override public long size() { RecordBatchSizer sizer = new RecordBatchSizer(container(), sv2); return sizer.actualSize(); http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java index f2435de..ec0925e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java @@ -28,6 +28,8 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.accessor.ScalarReader; import org.apache.parquet.column.ColumnWriter; +import java.util.Set; + /** * A row set is a collection of rows stored as value vectors. Elsewhere in * Drill we call this a "record batch", but that term has been overloaded to @@ -117,6 +119,7 @@ public interface RowSet { public interface SingleRowSet extends RowSet { SingleRowSet toIndirect(); + SingleRowSet toIndirect(Set<Integer> skipIndices); SelectionVector2 getSv2(); } http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java new file mode 100644 index 0000000..02156f6 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.test.rowSet; + +import com.google.common.base.Preconditions; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + +import java.util.Iterator; + +public class RowSetBatch implements RecordBatch { + private final RowSet rowSet; + + public RowSetBatch(final RowSet rowSet) { + this.rowSet = Preconditions.checkNotNull(rowSet); + } + + @Override + public FragmentContext getContext() { + throw new UnsupportedOperationException(); + } + + @Override + public BatchSchema getSchema() { + return rowSet.batchSchema(); + } + + @Override + public int getRecordCount() { + return rowSet.container().getRecordCount(); + } + + @Override + public SelectionVector2 getSelectionVector2() { + if (rowSet instanceof IndirectRowSet) { + return ((IndirectRowSet)rowSet).getSv2(); + } + + throw new UnsupportedOperationException(); + } + + @Override + public SelectionVector4 getSelectionVector4() { + if (rowSet instanceof RowSet.HyperRowSet) { + return ((RowSet.HyperRowSet)rowSet).getSv4(); + } + + throw new UnsupportedOperationException(); + } + + @Override + public void kill(boolean sendUpstream) { + throw new UnsupportedOperationException(); + } + + @Override + public VectorContainer getOutgoingContainer() { + return rowSet.container(); + } + + @Override + public TypedFieldId getValueVectorId(SchemaPath path) { + return rowSet.container().getValueVectorId(path); + } + + @Override + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) { + return rowSet.container().getValueAccessorById(clazz, ids); + } + + @Override + public IterOutcome next() { + throw new UnsupportedOperationException(); + } + + @Override + public WritableBatch getWritableBatch() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator<VectorWrapper<?>> iterator() { + return rowSet.container().iterator(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/55c2bea0/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java index 7b1554c..d6bbaf8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java @@ -17,14 +17,16 @@ */ package org.apache.drill.test.rowSet; +import com.google.common.collect.Sets; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.TupleMetadata; import org.apache.drill.exec.record.TupleSchema; import org.apache.drill.exec.vector.accessor.TupleWriter; -import org.apache.drill.test.OperatorFixture; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import java.util.Set; + /** * Fluent builder to quickly build up an row set (record batch) * programmatically. Starting with an {@link org.apache.drill.test.OperatorFixture}: @@ -41,6 +43,7 @@ public final class RowSetBuilder { private DirectRowSet rowSet; private RowSetWriter writer; private boolean withSv2; + private Set<Integer> skipIndices = Sets.newHashSet(); public RowSetBuilder(BufferAllocator allocator, BatchSchema schema) { this(allocator, TupleSchema.fromFields(schema), 10); @@ -75,6 +78,17 @@ public final class RowSetBuilder { return this; } + public RowSetBuilder addSelection(boolean selected, Object...values) { + final int index = writer.rowIndex(); + writer.setRow(values); + + if (!selected) { + skipIndices.add(index); + } + + return this; + } + /** * The {@link #addRow(Object...)} method uses Java variable-length arguments to * pass a row of values. But, when the row consists of a single array, Java @@ -106,6 +120,10 @@ public final class RowSetBuilder { return addRow(new Object[] { value }); } + public RowSetBuilder addSingleCol(boolean selected, Object value) { + return addSelection(selected, new Object[] { value }); + } + /** * Build the row set with a selection vector 2. The SV2 is * initialized to have a 1:1 index to the rows: SV2 0 points @@ -122,7 +140,7 @@ public final class RowSetBuilder { public SingleRowSet build() { SingleRowSet result = writer.done(); if (withSv2) { - return result.toIndirect(); + return rowSet.toIndirect(skipIndices); } return result; }
