This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ed5df2  DRILL-6517: Hash Join handling uninitialized vector container
5ed5df2 is described below

commit 5ed5df2cef7b3aac83401a3df6bbf9716505b226
Author: Ben-Zvi <bben-...@mapr.com>
AuthorDate: Tue Aug 28 15:57:04 2018 -0700

    DRILL-6517: Hash Join handling uninitialized vector container
---
 .../exec/physical/impl/join/HashJoinBatch.java     |   8 +-
 .../drill/exec/physical/impl/MockRecordBatch.java  |   8 +-
 .../physical/impl/join/TestHashJoinOutcome.java    | 212 +++++++++++++++++++++
 3 files changed, 219 insertions(+), 9 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 3d58089..dc40b24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -82,6 +82,7 @@ import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
 
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
 
 /**
@@ -449,7 +450,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
         prefetchFirstProbeBatch();
 
-        if (leftUpstream.isError()) {
+        if (leftUpstream.isError() ||
+            ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType != JoinRelType.RIGHT )) {
           // A termination condition was reached while prefetching the first 
probe side data holding batch.
           // We need to terminate.
           return leftUpstream;
@@ -496,9 +498,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
            * Either case build the output container's schema and return
            */
           if (outputRecords > 0 || state == BatchState.FIRST) {
-            if (state == BatchState.FIRST) {
-              state = BatchState.NOT_FIRST;
-            }
+            state = BatchState.NOT_FIRST;
 
             return IterOutcome.OK;
           }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 9e048b0..34d735e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -202,7 +202,6 @@ public class MockRecordBatch implements CloseableRecordBatch {
     if (currentContainerIndex < rowSets.size()) {
       final RowSet rowSet = rowSets.get(currentContainerIndex);
       final VectorContainer input = rowSet.container();
-      final int recordCount = input.getRecordCount();
       // We need to do this since the downstream operator expects vector 
reference to be same
       // after first next call in cases when schema is not changed
       final BatchSchema inputSchema = input.getSchema();
@@ -215,7 +214,9 @@ public class MockRecordBatch implements CloseableRecordBatch {
         case NONE:
         case TWO_BYTE:
           container.transferIn(input);
-          container.setRecordCount(recordCount);
+          if ( input.hasRecordCount() ) { // in case special test of uninitialized input container
+            container.setRecordCount(input.getRecordCount());
+          }
           final SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) 
rowSet).getSv2();
 
           if (sv2 != null) {
@@ -257,10 +258,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
       case NONE:
       case STOP:
       case OUT_OF_MEMORY:
-      //case OK_NEW_SCHEMA:
         isDone = true;
-        container.setRecordCount(0);
-        return currentOutcome;
       case NOT_YET:
         container.setRecordCount(0);
         return currentOutcome;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
new file mode 100644
index 0000000..349a295
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.mock.MockStorePOP;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ *  Unit tests of the Hash Join getting various outcomes as input
+ *  with uninitialized vector containers
+ */
+@Category(OperatorTest.class)
+public class TestHashJoinOutcome extends PhysicalOpUnitTestBase {
+  // private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestHashJoinOutcome.class);
+
+  // input batch schemas
+  private static TupleSchema inputSchemaRight;
+  private static TupleSchema inputSchemaLeft;
+  private static BatchSchema batchSchemaRight;
+  private static BatchSchema batchSchemaLeft;
+
+  // Input containers -- where row count is not set for the 2nd container !!
+  private List<VectorContainer> uninitialized2ndInputContainersRight = new 
ArrayList<>(5);
+  private List<VectorContainer> uninitialized2ndInputContainersLeft = new 
ArrayList<>(5);
+
+  private RowSet.SingleRowSet emptyInputRowSetRight;
+  private RowSet.SingleRowSet emptyInputRowSetLeft;
+
+  // default Non-Empty input RowSets
+  private RowSet.SingleRowSet nonEmptyInputRowSetRight;
+  private RowSet.SingleRowSet nonEmptyInputRowSetLeft;
+
+  // List of incoming containers
+  private final List<VectorContainer> inputContainerRight = new ArrayList<>(5);
+  private final List<VectorContainer> inputContainerLeft = new ArrayList<>(5);
+
+  // List of incoming IterOutcomes
+  private final List<RecordBatch.IterOutcome> inputOutcomesRight = new 
ArrayList<>(5);
+  private final List<RecordBatch.IterOutcome> inputOutcomesLeft = new 
ArrayList<>(5);
+
+  @BeforeClass
+  public static void setUpBeforeClass() {
+    inputSchemaRight = (TupleSchema) new SchemaBuilder()
+      .add("rightcol", TypeProtos.MinorType.INT)
+      .buildSchema();
+    batchSchemaRight = 
inputSchemaRight.toBatchSchema(BatchSchema.SelectionVectorMode.NONE);
+    inputSchemaLeft = (TupleSchema) new SchemaBuilder()
+      .add("leftcol", TypeProtos.MinorType.INT)
+      .buildSchema();
+    batchSchemaLeft = 
inputSchemaLeft.toBatchSchema(BatchSchema.SelectionVectorMode.NONE);
+  }
+
+  private void prepareUninitContainers(List<VectorContainer> 
emptyInputContainers,
+                                       BatchSchema batchSchema) {
+    BufferAllocator allocator = 
operatorFixture.getFragmentContext().getAllocator();
+
+    VectorContainer vc1 = new VectorContainer(allocator, batchSchema);
+    // set for first vc (with OK_NEW_SCHEMA) because record count is checked 
at AbstractRecordBatch.next
+    vc1.setRecordCount(0);
+    VectorContainer vc2 = new VectorContainer(allocator, batchSchema);
+    // Note - Uninitialized: Record count NOT SET for vc2 !!
+    emptyInputContainers.add(vc1);
+    emptyInputContainers.add(vc2);
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+
+    prepareUninitContainers(uninitialized2ndInputContainersLeft, 
batchSchemaLeft);
+
+    prepareUninitContainers(uninitialized2ndInputContainersRight, 
batchSchemaRight);
+
+    nonEmptyInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight)
+      .addRow(123)
+      .build();
+    nonEmptyInputRowSetLeft = operatorFixture.rowSetBuilder(inputSchemaLeft)
+      .addRow(123)
+      .build();
+
+    // Prepare various (empty/non-empty) containers for each side of the join
+    emptyInputRowSetLeft = 
operatorFixture.rowSetBuilder(inputSchemaLeft).build();
+    emptyInputRowSetRight = 
operatorFixture.rowSetBuilder(inputSchemaRight).build();
+
+    inputContainerRight.add(emptyInputRowSetRight.container());
+    inputContainerRight.add(nonEmptyInputRowSetRight.container());
+
+    inputContainerLeft.add(emptyInputRowSetLeft.container());
+    inputContainerLeft.add(nonEmptyInputRowSetLeft.container());
+
+    final PhysicalOperator mockPopConfig = new MockStorePOP(null);
+    mockOpContext(mockPopConfig, 0, 0);
+  }
+
+  @After
+  public void afterTest() {
+    emptyInputRowSetRight.clear();
+    emptyInputRowSetLeft.clear();
+    nonEmptyInputRowSetRight.clear();
+    nonEmptyInputRowSetLeft.clear();
+    inputContainerRight.clear();
+    inputOutcomesRight.clear();
+    inputContainerLeft.clear();
+    inputOutcomesLeft.clear();
+  }
+
+  enum UninitializedSide { // which side of the join has an uninitialized 
container
+    Right(true), Left(false);
+    public boolean isRight;
+    UninitializedSide(boolean which) {this.isRight = which;}
+  }
+
+  /**
+   *  Run the Hash Join where one side has an uninitialized container (the 2nd 
one)
+   * @param uninitializedSide Which side (right or left) is the uninitialized
+   * @param specialOutcome What outcome the uninitialized container has
+   * @param expectedOutcome what result outcome is expected
+   */
+  private void testHashJoinOutcomes(UninitializedSide uninitializedSide, 
RecordBatch.IterOutcome specialOutcome,
+                                    RecordBatch.IterOutcome expectedOutcome) {
+
+    inputOutcomesLeft.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomesLeft.add( uninitializedSide.isRight ? 
RecordBatch.IterOutcome.OK : specialOutcome);
+
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomesRight.add( uninitializedSide.isRight ? specialOutcome : 
RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatchRight = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      uninitializedSide.isRight ? uninitialized2ndInputContainersRight : 
inputContainerRight,
+      inputOutcomesRight, batchSchemaRight);
+    final MockRecordBatch mockInputBatchLeft = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      uninitializedSide.isRight ? inputContainerLeft : 
uninitialized2ndInputContainersLeft,
+      inputOutcomesLeft, batchSchemaLeft);
+
+    List<JoinCondition> conditions = Lists.newArrayList();
+
+    conditions.add(new JoinCondition( SqlKind.EQUALS.toString(),
+      FieldReference.getWithQuotedRef("leftcol"),
+      FieldReference.getWithQuotedRef("rightcol")));
+
+    HashJoinPOP hjConf = new HashJoinPOP(null, null, conditions, 
JoinRelType.INNER);
+
+    HashJoinBatch hjBatch = new 
HashJoinBatch(hjConf,operatorFixture.getFragmentContext(), mockInputBatchLeft, 
mockInputBatchRight );
+
+    RecordBatch.IterOutcome gotOutcome = hjBatch.next();
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA );
+
+    gotOutcome = hjBatch.next();
+    assertTrue(gotOutcome == expectedOutcome); // verify returned outcome
+  }
+
+  @Test
+  public void testHashJoinStopOutcomeUninitRightSide() {
+    testHashJoinOutcomes(UninitializedSide.Right, 
RecordBatch.IterOutcome.STOP, RecordBatch.IterOutcome.STOP);
+  }
+
+  @Test
+  public void testHashJoinStopOutcomeUninitLeftSide() {
+    testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.STOP, 
RecordBatch.IterOutcome.STOP);
+  }
+
+  @Test
+  public void testHashJoinNoneOutcomeUninitRightSide() {
+    testHashJoinOutcomes(UninitializedSide.Right, 
RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE);
+  }
+
+  @Test
+  public void testHashJoinNoneOutcomeUninitLeftSide() {
+    testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.NONE, 
RecordBatch.IterOutcome.NONE);
+  }
+}

Reply via email to