This is an automated email from the ASF dual-hosted git repository. boaz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 5ed5df2 DRILL-6517: Hash Join handling uninitialized vector container 5ed5df2 is described below commit 5ed5df2cef7b3aac83401a3df6bbf9716505b226 Author: Ben-Zvi <bben-...@mapr.com> AuthorDate: Tue Aug 28 15:57:04 2018 -0700 DRILL-6517: Hash Join handling uninitialized vector container --- .../exec/physical/impl/join/HashJoinBatch.java | 8 +- .../drill/exec/physical/impl/MockRecordBatch.java | 8 +- .../physical/impl/join/TestHashJoinOutcome.java | 212 +++++++++++++++++++++ 3 files changed, 219 insertions(+), 9 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 3d58089..dc40b24 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -82,6 +82,7 @@ import org.apache.drill.exec.work.filter.RuntimeFilterReporter; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; /** @@ -449,7 +450,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { prefetchFirstProbeBatch(); - if (leftUpstream.isError()) { + if (leftUpstream.isError() || + ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType != JoinRelType.RIGHT )) { // A termination condition was reached while prefetching the first probe side data holding batch. // We need to terminate. return leftUpstream; @@ -496,9 +498,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { * Either case build the output container's schema and return */ if (outputRecords > 0 || state == BatchState.FIRST) { - if (state == BatchState.FIRST) { - state = BatchState.NOT_FIRST; - } + state = BatchState.NOT_FIRST; return IterOutcome.OK; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java index 9e048b0..34d735e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java @@ -202,7 +202,6 @@ public class MockRecordBatch implements CloseableRecordBatch { if (currentContainerIndex < rowSets.size()) { final RowSet rowSet = rowSets.get(currentContainerIndex); final VectorContainer input = rowSet.container(); - final int recordCount = input.getRecordCount(); // We need to do this since the downstream operator expects vector reference to be same // after first next call in cases when schema is not changed final BatchSchema inputSchema = input.getSchema(); @@ -215,7 +214,9 @@ public class MockRecordBatch implements CloseableRecordBatch { case NONE: case TWO_BYTE: container.transferIn(input); - container.setRecordCount(recordCount); + if ( input.hasRecordCount() ) { // in case special test of uninitialized input container + container.setRecordCount(input.getRecordCount()); + } final SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2(); if (sv2 != null) { @@ -257,10 +258,7 @@ public class MockRecordBatch implements CloseableRecordBatch { case NONE: case STOP: case OUT_OF_MEMORY: - //case OK_NEW_SCHEMA: isDone = true; - container.setRecordCount(0); - return currentOutcome; case NOT_YET: container.setRecordCount(0); return currentOutcome; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java new file mode 100644 index 0000000..349a295 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.sql.SqlKind; +import org.apache.drill.categories.OperatorTest; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.impl.MockRecordBatch; +import org.apache.drill.test.PhysicalOpUnitTestBase; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.exec.store.mock.MockStorePOP; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +/** + * Unit tests of the Hash Join getting various outcomes as input + * with uninitialized vector containers + */ +@Category(OperatorTest.class) +public class TestHashJoinOutcome extends PhysicalOpUnitTestBase { + // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestHashJoinOutcome.class); + + // input batch schemas + private static TupleSchema inputSchemaRight; + private static TupleSchema inputSchemaLeft; + private static BatchSchema batchSchemaRight; + private static BatchSchema batchSchemaLeft; + + // Input containers -- where row count is not set for the 2nd container !! + private List<VectorContainer> uninitialized2ndInputContainersRight = new ArrayList<>(5); + private List<VectorContainer> uninitialized2ndInputContainersLeft = new ArrayList<>(5); + + private RowSet.SingleRowSet emptyInputRowSetRight; + private RowSet.SingleRowSet emptyInputRowSetLeft; + + // default Non-Empty input RowSets + private RowSet.SingleRowSet nonEmptyInputRowSetRight; + private RowSet.SingleRowSet nonEmptyInputRowSetLeft; + + // List of incoming containers + private final List<VectorContainer> inputContainerRight = new ArrayList<>(5); + private final List<VectorContainer> inputContainerLeft = new ArrayList<>(5); + + // List of incoming IterOutcomes + private final List<RecordBatch.IterOutcome> inputOutcomesRight = new ArrayList<>(5); + private final List<RecordBatch.IterOutcome> inputOutcomesLeft = new ArrayList<>(5); + + @BeforeClass + public static void setUpBeforeClass() { + inputSchemaRight = (TupleSchema) new SchemaBuilder() + .add("rightcol", TypeProtos.MinorType.INT) + .buildSchema(); + batchSchemaRight = inputSchemaRight.toBatchSchema(BatchSchema.SelectionVectorMode.NONE); + inputSchemaLeft = (TupleSchema) new SchemaBuilder() + .add("leftcol", TypeProtos.MinorType.INT) + .buildSchema(); + batchSchemaLeft = inputSchemaLeft.toBatchSchema(BatchSchema.SelectionVectorMode.NONE); + } + + private void prepareUninitContainers(List<VectorContainer> emptyInputContainers, + BatchSchema batchSchema) { + BufferAllocator allocator = operatorFixture.getFragmentContext().getAllocator(); + + VectorContainer vc1 = new VectorContainer(allocator, batchSchema); + // set for first vc (with OK_NEW_SCHEMA) because record count is checked at AbstractRecordBatch.next + vc1.setRecordCount(0); + VectorContainer vc2 = new VectorContainer(allocator, batchSchema); + // Note - Uninitialized: Record count NOT SET for vc2 !! + emptyInputContainers.add(vc1); + emptyInputContainers.add(vc2); + } + + @Before + public void beforeTest() throws Exception { + + prepareUninitContainers(uninitialized2ndInputContainersLeft, batchSchemaLeft); + + prepareUninitContainers(uninitialized2ndInputContainersRight, batchSchemaRight); + + nonEmptyInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight) + .addRow(123) + .build(); + nonEmptyInputRowSetLeft = operatorFixture.rowSetBuilder(inputSchemaLeft) + .addRow(123) + .build(); + + // Prepare various (empty/non-empty) containers for each side of the join + emptyInputRowSetLeft = operatorFixture.rowSetBuilder(inputSchemaLeft).build(); + emptyInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).build(); + + inputContainerRight.add(emptyInputRowSetRight.container()); + inputContainerRight.add(nonEmptyInputRowSetRight.container()); + + inputContainerLeft.add(emptyInputRowSetLeft.container()); + inputContainerLeft.add(nonEmptyInputRowSetLeft.container()); + + final PhysicalOperator mockPopConfig = new MockStorePOP(null); + mockOpContext(mockPopConfig, 0, 0); + } + + @After + public void afterTest() { + emptyInputRowSetRight.clear(); + emptyInputRowSetLeft.clear(); + nonEmptyInputRowSetRight.clear(); + nonEmptyInputRowSetLeft.clear(); + inputContainerRight.clear(); + inputOutcomesRight.clear(); + inputContainerLeft.clear(); + inputOutcomesLeft.clear(); + } + + enum UninitializedSide { // which side of the join has an uninitialized container + Right(true), Left(false); + public boolean isRight; + UninitializedSide(boolean which) {this.isRight = which;} + } + + /** + * Run the Hash Join where one side has an uninitialized container (the 2nd one) + * @param uninitializedSide Which side (right or left) is the uninitialized + * @param specialOutcome What outcome the uninitialized container has + * @param expectedOutcome what result outcome is expected + */ + private void testHashJoinOutcomes(UninitializedSide uninitializedSide, RecordBatch.IterOutcome specialOutcome, + RecordBatch.IterOutcome expectedOutcome) { + + inputOutcomesLeft.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomesLeft.add( uninitializedSide.isRight ? RecordBatch.IterOutcome.OK : specialOutcome); + + inputOutcomesRight.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomesRight.add( uninitializedSide.isRight ? specialOutcome : RecordBatch.IterOutcome.OK); + + final MockRecordBatch mockInputBatchRight = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + uninitializedSide.isRight ? uninitialized2ndInputContainersRight : inputContainerRight, + inputOutcomesRight, batchSchemaRight); + final MockRecordBatch mockInputBatchLeft = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + uninitializedSide.isRight ? inputContainerLeft : uninitialized2ndInputContainersLeft, + inputOutcomesLeft, batchSchemaLeft); + + List<JoinCondition> conditions = Lists.newArrayList(); + + conditions.add(new JoinCondition( SqlKind.EQUALS.toString(), + FieldReference.getWithQuotedRef("leftcol"), + FieldReference.getWithQuotedRef("rightcol"))); + + HashJoinPOP hjConf = new HashJoinPOP(null, null, conditions, JoinRelType.INNER); + + HashJoinBatch hjBatch = new HashJoinBatch(hjConf,operatorFixture.getFragmentContext(), mockInputBatchLeft, mockInputBatchRight ); + + RecordBatch.IterOutcome gotOutcome = hjBatch.next(); + assertTrue(gotOutcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA ); + + gotOutcome = hjBatch.next(); + assertTrue(gotOutcome == expectedOutcome); // verify returned outcome + } + + @Test + public void testHashJoinStopOutcomeUninitRightSide() { + testHashJoinOutcomes(UninitializedSide.Right, RecordBatch.IterOutcome.STOP, RecordBatch.IterOutcome.STOP); + } + + @Test + public void testHashJoinStopOutcomeUninitLeftSide() { + testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.STOP, RecordBatch.IterOutcome.STOP); + } + + @Test + public void testHashJoinNoneOutcomeUninitRightSide() { + testHashJoinOutcomes(UninitializedSide.Right, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE); + } + + @Test + public void testHashJoinNoneOutcomeUninitLeftSide() { + testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE); + } +}