http://git-wip-us.apache.org/repos/asf/drill/blob/74565ccb/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java new file mode 100644 index 0000000..8e4d1d3 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java @@ -0,0 +1,2492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.categories.OperatorTest; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.LateralJoinPOP; +import org.apache.drill.exec.physical.impl.MockRecordBatch; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.store.mock.MockStorePOP; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertTrue; + +@Category(OperatorTest.class) +public class TestLateralJoinCorrectness extends SubOperatorTest { + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestNestedNotYet.class); + + // Operator Context for mock batch + private static OperatorContext operatorContext; + + // Left Batch Schema + private static TupleMetadata leftSchema; + + // Right Batch Schema + private static TupleMetadata rightSchema; + + // Empty left RowSet + private static RowSet.SingleRowSet emptyLeftRowSet; + + // Non-Empty left RowSet + private static RowSet.SingleRowSet nonEmptyLeftRowSet; + + // List of left incoming containers + private static final List<VectorContainer> leftContainer = new ArrayList<>(5); + + // List of left IterOutcomes + private static final List<RecordBatch.IterOutcome> leftOutcomes = new ArrayList<>(5); + + // Empty right RowSet + private static RowSet.SingleRowSet emptyRightRowSet; + + // Non-Empty right RowSet + private static RowSet.SingleRowSet nonEmptyRightRowSet; + + // List of right incoming containers + private static final List<VectorContainer> rightContainer = new ArrayList<>(5); + + // List of right IterOutcomes + private static final List<RecordBatch.IterOutcome> rightOutcomes = new ArrayList<>(5); + + // Lateral Join POP Config + private static LateralJoinPOP ljPopConfig; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + PhysicalOperator mockPopConfig = new MockStorePOP(null); + operatorContext = fixture.newOperatorContext(mockPopConfig); + + leftSchema = new SchemaBuilder() + .add("id_left", TypeProtos.MinorType.INT) + .add("cost_left", TypeProtos.MinorType.INT) + .add("name_left", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + emptyLeftRowSet = fixture.rowSetBuilder(leftSchema).build(); + + rightSchema = new SchemaBuilder() + .add("id_right", TypeProtos.MinorType.INT) + .add("cost_right", TypeProtos.MinorType.INT) + .add("name_right", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build(); + + ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.FULL); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + operatorContext.close(); + emptyLeftRowSet.clear(); + emptyRightRowSet.clear(); + } + + + @Before + public void beforeTest() throws Exception { + nonEmptyLeftRowSet = fixture.rowSetBuilder(leftSchema) + .addRow(1, 10, "item1") + .build(); + + nonEmptyRightRowSet = fixture.rowSetBuilder(rightSchema) + .addRow(1, 11, "item11") + .addRow(2, 21, "item21") + .addRow(3, 31, "item31") + .build(); + } + + @After + public void afterTest() throws Exception { + nonEmptyLeftRowSet.clear(); + nonEmptyRightRowSet.clear(); + leftContainer.clear(); + leftOutcomes.clear(); + rightContainer.clear(); + rightOutcomes.clear(); + } + + /** + * Helper method to check if input outcome is one of terminal state or not + * + * @param outcome - input IterOutcome state to check for + * @return + */ + private boolean isTerminal(RecordBatch.IterOutcome outcome) { + return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP) + || (outcome == RecordBatch.IterOutcome.OUT_OF_MEMORY); + } + + /** + * With both left and right batch being empty, the {@link LateralJoinBatch#buildSchema()} phase + * should still build the output container schema and return empty batch + * + * @throws Exception + */ + @Test + public void testBuildSchemaEmptyLRBatch() throws Exception { + + // Get the left container with dummy data for Lateral Join + leftContainer.add(emptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, emptyLeftRowSet.container().getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, emptyRightRowSet.container().getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(ljBatch.getRecordCount() == 0); + + while (!isTerminal(ljBatch.next())) { + // do nothing + } + + // TODO: We can add check for output correctness as well + //assertTrue (((MockRecordBatch) leftMockBatch).isCompleted()); + //assertTrue(((MockRecordBatch) rightMockBatch).isCompleted()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * With both left and right batch being non-empty, the {@link LateralJoinBatch#buildSchema()} phase + * will fail with {@link IllegalStateException} since it expects first batch from right branch of LATERAL + * to be always empty + * + * @throws Exception + */ + @Test + public void testBuildSchemaNonEmptyLRBatch() throws Exception { + + // Get the left container with dummy data + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(nonEmptyRightRowSet.container()); + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + fail(); + } catch (AssertionError | Exception error) { + // Expected since first right batch is supposed to be empty + assertTrue(error instanceof IllegalStateException); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * With non-empty left and empty right batch, the {@link LateralJoinBatch#buildSchema()} phase + * should still build the output container schema and return empty batch + * + * @throws Exception + */ + @Test + public void testBuildSchemaNonEmptyLEmptyRBatch() throws Exception { + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(ljBatch.getRecordCount() == 0); + + // Since Right batch is empty it should drain left and return NONE + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * This case should never happen since With empty left there cannot be non-empty right batch, the + * {@link LateralJoinBatch#buildSchema()} phase should fail() with {@link IllegalStateException} + * + * @throws Exception + */ + @Test + public void testBuildSchemaEmptyLNonEmptyRBatch() throws Exception { + + // Get the left container with dummy data for Lateral Join + leftContainer.add(emptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(nonEmptyRightRowSet.container()); + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + fail(); + + } catch (AssertionError | Exception error) { + // Expected since first right batch is supposed to be empty + assertTrue(error instanceof IllegalStateException); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * Test for receiving unexpected EMIT outcome during build phase on left side. + * @throws Exception + */ + @Test + public void testBuildSchemaWithEMITOutcome() throws Exception { + // Get the left container with dummy data for Lateral Join + leftContainer.add(emptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(nonEmptyRightRowSet.container()); + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + ljBatch.next(); + fail(); + } catch (AssertionError | Exception error) { + // Expected since first right batch is supposed to be empty + assertTrue(error instanceof IllegalStateException); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * Test to show correct IterOutcome produced by LATERAL when one record in left batch produces only 1 right batch + * with EMIT outcome. Then output of LATERAL should be produced by OK outcome after the join. It verifies the number + * of records in the output batch based on left and right input batches. + * + * @throws Exception + */ + @Test + public void test1RecordLeftBatchTo1RightRecordBatch() throws Exception { + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + assertTrue(ljBatch.getRecordCount() == (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount())); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * Test to show correct IterOutcome & output produced by LATERAL when one record in left batch produces 2 right + * batches with OK and EMIT outcome respectively. Then output of LATERAL should be produced with OK outcome after the + * join is done using both right batch with the left batch. It verifies the number of records in the output batch + * based on left and right input batches. + * + * @throws Exception + */ + @Test + public void test1RecordLeftBatchTo2RightRecordBatch() throws Exception { + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(4, 41, "item41") + .addRow(5, 51, "item51") + .build(); + + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.OK); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + assertTrue(ljBatch.getRecordCount() == + (nonEmptyLeftRowSet.rowCount() * (nonEmptyRightRowSet.rowCount() + nonEmptyRightRowSet2.rowCount()))); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + nonEmptyRightRowSet2.clear(); + } + } + + /** + * Test to show that if the result of cross join between left batch and empty right batch is an empty + * batch and next batch on left side result in NONE outcome, then there is no output produced and result + * returned to downstream is NONE. + * + * @throws Exception + */ + @Test + public void test1RecordLeftBatchToEmptyRightBatch() throws Exception { + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(emptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * Test to show LATERAL tries to pack the output batch until it's full or all the data is consumed from left and + * right side. We have multiple left and right batches which fits after join inside the same output batch, hence + * LATERAL only generates one output batch. + * + * @throws Exception + */ + @Test + public void testFillingUpOutputBatch() throws Exception { + // Create data for left input + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(2, 20, "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(4, 41, "item41") + .addRow(5, 51, "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + assertTrue(ljBatch.getRecordCount() == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() + + leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } + + /** + * When multiple left batches are received with different schema, then LATERAL produces output for each schema type + * separately (even though output batch is not filled completely) and handles the schema change in left batch. + * Moreover in this case the schema change was only for columns which are not produced by the UNNEST or right branch. + * + * @throws Exception + */ + @Test + public void testHandlingSchemaChangeForNonUnnestField() throws Exception { + + // Create left input schema 2 + TupleMetadata leftSchema2 = new SchemaBuilder() + .add("id_left", TypeProtos.MinorType.INT) + .add("cost_left", TypeProtos.MinorType.VARCHAR) + .add("name_left", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2) + .addRow(2, "20", "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(4, 41, "item41") + .addRow(5, 51, "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + // This means 2 output record batches were received because of Schema change + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() + + leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } + + /** + * When multiple left and right batches are received with different schema, then LATERAL produces output for each + * schema type separately (even though output batch is not filled completely) and handles the schema change correctly. + * Moreover in this case the schema change was for both left and right branch (produced by UNNEST with empty batch). + * + * @throws Exception + */ + @Test + public void testHandlingSchemaChangeForUnnestField() throws Exception { + // Create left input schema 2 + TupleMetadata leftSchema2 = new SchemaBuilder() + .add("id_left", TypeProtos.MinorType.INT) + .add("cost_left", TypeProtos.MinorType.VARCHAR) + .add("name_left", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + // Create right input schema + TupleMetadata rightSchema2 = new SchemaBuilder() + .add("id_right", TypeProtos.MinorType.INT) + .add("cost_right", TypeProtos.MinorType.VARCHAR) + .add("name_right", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + + // Create data for left input + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2) + .addRow(2, "20", "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2) + .build(); + + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2) + .addRow(4, "41", "item41") + .addRow(5, "51", "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + // first OK_NEW_SCHEMA batch + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + // second OK_NEW_SCHEMA batch. Right side batch for OK_New_Schema is always empty + rightContainer.add(emptyRightRowSet2.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + // This means 2 output record batches were received because of Schema change + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() + + leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + emptyRightRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } + + /** + * Verify if there is no schema change on left side and LATERAL still sees an unexpected schema change on right side + * then it handles it correctly. + * handle it corr + * @throws Exception + */ + @Test + public void testHandlingUnexpectedSchemaChangeForUnnestField() throws Exception { + // Create left input schema 2 + TupleMetadata leftSchema2 = new SchemaBuilder() + .add("id_left", TypeProtos.MinorType.INT) + .add("cost_left", TypeProtos.MinorType.VARCHAR) + .add("name_left", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + // Create right input schema + TupleMetadata rightSchema2 = new SchemaBuilder() + .add("id_right", TypeProtos.MinorType.INT) + .add("cost_right", TypeProtos.MinorType.VARCHAR) + .add("name_right", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + + // Create data for left input + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2) + .addRow(2, "20", "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2) + .build(); + + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2) + .addRow(4, "41", "item41") + .addRow(5, "51", "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + // first OK_NEW_SCHEMA batch + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + // second OK_NEW_SCHEMA batch. Right side batch for OK_New_Schema is always empty + rightContainer.add(emptyRightRowSet2.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.OK); + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + ljBatch.next(); + fail(); + } catch (AssertionError | Exception error) { + // Expected since first right batch is supposed to be empty + assertTrue(error instanceof IllegalStateException); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + emptyRightRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } + + /** + * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that + * correctly and suppresses schema change operation by producing output in same batch created with initial schema. + * The schema change was only for columns which are not produced by the UNNEST or right branch. + * + * @throws Exception + */ + @Test + public void testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForNonUnnestField() throws Exception { + + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(2, 20, "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(4, 41, "item41") + .addRow(5, 51, "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() + + leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); + // This means only 1 output record batch was received. + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } + + /** + * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that + * correctly and suppresses false schema change indication from both left and right branch. It produces output in + * same batch created with initial schema. + * The schema change is for columns common on both left and right side. + * + * @throws Exception + */ + @Test + public void testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForUnnestField() throws Exception { + // Create data for left input + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(2, 20, "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(4, 41, "item41") + .addRow(5, 51, "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() + + leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); + // This means only 1 output record batch was received. + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } + + + /** + * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that + * correctly and suppresses schema change operation by producing output in same batch created with initial schema. + * The schema change was only for columns which are not produced by the UNNEST or right branch. + * + * @throws Exception + */ + @Test + public void testHandlingEMITFromLeft() throws Exception { + + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(3, 30, "item30") + .build(); + + // Create data for right input + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(4, 41, "item41") + .addRow(5, 51, "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(emptyLeftRowSet.container()); + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + + // 1st output batch is received for first EMIT from LEFT side + assertTrue(RecordBatch.IterOutcome.EMIT == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + + // 2nd output batch is received for second EMIT from LEFT side + assertTrue(RecordBatch.IterOutcome.EMIT == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + + // Compare the total records generated in 2 output batches with expected count. + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() + + leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } + + /** + * Test for the case where LATERAL received a left batch with OK outcome and then populate the Join output in the + * outgoing batch. There is still some space left in output batch so LATERAL call's next() on left side and receive + * NONE outcome from left side. Then in this case LATERAL should produce the previous output batch with OK outcome + * and then handle NONE outcome in future next() call. + * + * @throws Exception + */ + @Test + public void testHandlingNoneAfterOK() throws Exception { + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + + // 1st output batch is received for first EMIT from LEFT side + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + + // Compare the total records generated in 2 output batches with expected count. + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount())); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * Test for the case when LATERAL received a left batch with OK outcome and then populate the Join output in the + * outgoing batch. There is still some space left in output batch so LATERAL call's next() on left side and receive + * EMIT outcome from left side with empty batch. Then in this case LATERAL should produce the previous output batch + * with EMIT outcome. + * + * @throws Exception + */ + @Test + public void testHandlingEmptyEMITAfterOK() throws Exception { + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(emptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + + // 1st output batch is received for first EMIT from LEFT side + assertTrue(RecordBatch.IterOutcome.EMIT == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + + // Compare the total records generated in 2 output batches with expected count. + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount())); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * Test for the case when LATERAL received a left batch with OK outcome and then populate the Join output in the + * outgoing batch. There is still some space left in output batch so LATERAL call's next() on left side and receive + * EMIT outcome from left side with non-empty batch. Then in this case LATERAL should produce the output batch with + * EMIT outcome if second left and right batches are also consumed entirely in same output batch. + * + * @throws Exception + */ + @Test + public void testHandlingNonEmptyEMITAfterOK() throws Exception { + + // Create data for left input + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(2, 20, "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(4, 41, "item41") + .addRow(5, 51, "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + + // 1st output batch is received for first EMIT from LEFT side + assertTrue(RecordBatch.IterOutcome.EMIT == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + + // Compare the total records generated in 2 output batches with expected count. + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount()) + + (leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error){ + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } + + /** + * Temporary test to validate LATERAL handling output batch getting filled without consuming full output from left + * and right batch join. + * <p> + * For this test we are updating {@link LateralJoinBatch#MAX_BATCH_SIZE} by making it public, which might not expected + * after including the BatchSizing logic + * TODO: Update the test after incorporating the BatchSizing change. + * + * @throws Exception + */ + @Test + public void testHandlingNonEmpty_EMITAfterOK_WithMultipleOutput() throws Exception { + + // Create data for left input + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(2, 20, "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(4, 41, "item41") + .addRow(5, 51, "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + int originalMaxBatchSize = LateralJoinBatch.MAX_BATCH_SIZE; + LateralJoinBatch.MAX_BATCH_SIZE = 2; + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + + // 1st output batch + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + assertTrue(ljBatch.getRecordCount() == LateralJoinBatch.MAX_BATCH_SIZE); + + // 2nd output batch + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + assertTrue(ljBatch.getRecordCount() == LateralJoinBatch.MAX_BATCH_SIZE); + totalRecordCount += ljBatch.getRecordCount(); + + // 3rd output batch + assertTrue(RecordBatch.IterOutcome.EMIT == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + + // Compare the total records generated in 2 output batches with expected count. + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount()) + + (leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + LateralJoinBatch.MAX_BATCH_SIZE = originalMaxBatchSize; + } + } + + @Test + public void testHandlingOOMFromLeft() throws Exception { + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OUT_OF_MEMORY); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + + // 1st output batch + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + + // 2nd output batch + assertTrue(RecordBatch.IterOutcome.OUT_OF_MEMORY == ljBatch.next()); + + // Compare the total records generated in 2 output batches with expected count. + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount())); + + // TODO: We are not draining left or right batch anymore on receiving terminal outcome from either branch + // TODO: since not sure if that's the right behavior + //assertTrue(((MockRecordBatch) leftMockBatch).isCompleted()); + //assertTrue(((MockRecordBatch) rightMockBatch).isCompleted()); + + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + @Test + public void testHandlingOOMFromRight() throws Exception { + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.OUT_OF_MEMORY); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + // int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + + // 2nd output batch + assertTrue(RecordBatch.IterOutcome.OUT_OF_MEMORY == ljBatch.next()); + + // TODO: We are not draining left or right batch anymore on receiving terminal outcome from either branch + // TODO: since not sure if that's the right behavior + //assertTrue(((MockRecordBatch) leftMockBatch).isCompleted()); + //assertTrue(((MockRecordBatch) rightMockBatch).isCompleted()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * Test to check basic left lateral join is working correctly or not. We create a left batch with one and + * corresponding right batch with zero rows and check if output still get's populated with left side of data or not. + * Expectation is since it's a left join and even though right batch is empty the left row will be pushed to output + * batch. + * + * @throws Exception + */ + @Test + public void testBasicLeftLateralJoin() throws Exception { + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(emptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + assertTrue(ljBatch.getRecordCount() == nonEmptyLeftRowSet.container().getRecordCount()); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * Test to see if there are multiple rows in left batch and for some rows right side produces batch with records + * and for other rows right side produces empty batches then based on left join type we are populating the output + * batch correctly. Expectation is that for left rows if we find corresponding right rows then we will output both + * using cross-join but for left rows for which there is empty right side we will produce only left row in output + * batch. In this case all the output will be produces only in 1 record batch. + * + * @throws Exception + */ + @Test + public void testLeftLateralJoin_WithAndWithoutMatching() throws Exception { + // Get the left container with dummy data for Lateral Join + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(1, 10, "item10") + .addRow(2, 20, "item20") + .addRow(3, 30, "item30") + .build(); + + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(6, 60, "item61") + .addRow(7, 70, "item71") + .addRow(8, 80, "item81") + .build(); + + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + final int expectedOutputRecordCount = 7; // 3 for first left row and 1 for second left row + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + assertTrue(ljBatch.getRecordCount() == expectedOutputRecordCount); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** + * Test to see if there are multiple rows in left batch and for some rows right side produces batch with records + * and for other rows right side produces empty batches then based on left join type we are populating the output + * batch correctly. Expectation is that for left rows if we find corresponding right rows then we will output both + * using cross-join but for left rows for which there is empty right side we will produce only left row in output + * batch. But in this test we have made the Batch size very small so that output will be returned in multiple + * output batches. This test verifies even in this case indexes are manipulated correctly and outputs are produced + * correctly. + * TODO: Update the test case based on Batch Sizing project since then the variable might not be available. + * + * @throws Exception + */ + @Test + public void testLeftLateralJoin_WithAndWithoutMatching_MultipleBatch() throws Exception { + // Get the left container with dummy data for Lateral Join + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(1, 10, "item10") + .addRow(2, 20, "item20") + .addRow(3, 30, "item30") + .build(); + + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(6, 60, "item61") + .addRow(7, 70, "item71") + .addRow(8, 80, "item81") + .build(); + + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + int originalMaxBatchSize = LateralJoinBatch.MAX_BATCH_SIZE; + LateralJoinBatch.MAX_BATCH_SIZE = 2; + + try { + final int expectedOutputRecordCount = 7; // 3 for first left row and 1 for second left row + int actualOutputRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + actualOutputRecordCount += ljBatch.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + actualOutputRecordCount += ljBatch.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + actualOutputRecordCount += ljBatch.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + actualOutputRecordCount += ljBatch.getRecordCount(); + assertTrue(actualOutputRecordCount == expectedOutputRecordCount); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + LateralJoinBatch.MAX_BATCH_SIZE = originalMaxBatchSize; + } + } + + /** + * This test generates an operator tree for multiple UNNEST at same level by stacking 2 LATERAL and UNNEST pair on + * top of each other. Then we call next() on top level LATERAL to simulate the operator tree and compare the + * outcome and record count generated with expected values. + * @throws Exception + */ + @Test + public void testMultipleUnnestAtSameLevel() throws Exception { + + // ** Prepare first pair of left batch and right batch for Lateral_1 ** + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL); + + final LateralJoinBatch ljBatch_1 = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), + leftMockBatch_1, rightMockBatch_1); + + // ** Prepare second pair of left and right batch for Lateral_2 ** + + // Get the right container with dummy data for Lateral Join_2 + + // Create right input schema + TupleMetadata rightSchema2 = new SchemaBuilder() + .add("id_right_1", TypeProtos.MinorType.INT) + .add("cost_right_1", TypeProtos.MinorType.INT) + .add("name_right_1", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2).build(); + + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2) + .addRow(6, 60, "item61") + .addRow(7, 70, "item71") + .addRow(8, 80, "item81") + .build(); + + final List<VectorContainer> rightContainer2 = new ArrayList<>(5); + // Get the right container with dummy data + rightContainer2.add(emptyRightRowSet2.container()); + rightContainer2.add(nonEmptyRightRowSet2.container()); + rightContainer2.add(emptyRightRowSet2.container()); + rightContainer2.add(emptyRightRowSet2.container()); + + final List<RecordBatch.IterOutcome> rightOutcomes2 = new ArrayList<>(5); + rightOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes2.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes2.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes2.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer2, rightOutcomes2, rightContainer2.get(0).getSchema()); + + final LateralJoinBatch ljBatch_2 = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), + ljBatch_1, rightMockBatch_2); + + try { + final int expectedOutputRecordCount = 3; // 3 from the lower level lateral and then finally 3 for 1st row in + // second lateral and 0 for other 2 rows. + + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch_2.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch_2.next()); + int actualOutputRecordCount = ljBatch_2.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch_2.next()); + assertTrue(actualOutputRecordCount == expectedOutputRecordCount); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch_2.close(); + rightMockBatch_2.close(); + ljBatch_1.close(); + leftMockBatch_1.close(); + rightMockBatch_1.close(); + rightContainer2.clear(); + rightOutcomes2.clear(); + } + } + + /** + * This test generates an operator tree for multi level LATERAL by stacking 2 LATERAL and finally an UNNEST pair + * (using MockRecord Batch) as left and right child of lower level LATERAL. Then we call next() on top level + * LATERAL to simulate the operator tree and compare the outcome and record count generated with expected values. + * @throws Exception + */ + @Test + public void testMultiLevelLateral() throws Exception { + + // ** Prepare first pair of left batch and right batch for Lateral_1 ** + final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchema) + .addRow(2, 20, "item2") + .build(); + + leftContainer.add(emptyLeftRowSet.container()); + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(nonEmptyLeftRowSet_1.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + final RowSet.SingleRowSet nonEmptyRightRowSet_1 = fixture.rowSetBuilder(rightSchema) + .addRow(5, 51, "item51") + .addRow(6, 61, "item61") + .addRow(7, 71, "item71") + .build(); + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet_1.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL); + + final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), + leftMockBatch_1, rightMockBatch_1); + + // ** Prepare second pair of left and right batch for Lateral_2 ** + + // Create left input schema + TupleMetadata leftSchema2 = new SchemaBuilder() + .add("id_left_1", TypeProtos.MinorType.INT) + .add("cost_left_1", TypeProtos.MinorType.INT) + .add("name_left_1", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet emptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema2).build(); + final RowSet.SingleRowSet nonEmptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema2) + .addRow(6, 60, "item6") + .build(); + + // Get the left container with dummy data + final List<VectorContainer> leftContainer2 = new ArrayList<>(5); + leftContainer2.add(emptyLeftRowSet2.container()); + leftContainer2.add(nonEmptyLeftRowSet2.container()); + + // Get the left outcomes with dummy data + final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5); + leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes2.add(RecordBatch.IterOutcome.OK); + + final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema()); + + final LateralJoinBatch upperLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), + leftMockBatch_2, lowerLateral); + + try { + final int expectedOutputRecordCount = 6; + + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLateral.next()); + assertTrue(RecordBatch.IterOutcome.OK == upperLateral.next()); + int actualOutputRecordCount = upperLateral.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.NONE == upperLateral.next()); + assertTrue(actualOutputRecordCount == expectedOutputRecordCount); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + upperLateral.close(); + leftMockBatch_2.close(); + lowerLateral.close(); + leftMockBatch_1.close(); + rightMockBatch_1.close(); + leftContainer2.clear(); + leftOutcomes2.clear(); + } + } + + /** + * This test generates an operator tree for multi level LATERAL by stacking 2 LATERAL and finally an UNNEST pair + * (using MockRecord Batch) as left and right child of lower level LATERAL. Then we call next() on top level + * LATERAL to simulate the operator tree and compare the outcome and record count generated with expected values. + * This test also changes the MAX_BATCH_SIZE to simulate the output being produced in multiple batches. + * @throws Exception + */ + @Test + public void testMultiLevelLateral_MultipleOutput() throws Exception { + + // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 ** + final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchema) + .addRow(2, 20, "item2") + .build(); + + leftContainer.add(emptyLeftRowSet.container()); + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(nonEmptyLeftRowSet_1.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + final RowSet.SingleRowSet nonEmptyRightRowSet_1 = fixture.rowSetBuilder(rightSchema) + .addRow(5, 51, "item51") + .addRow(6, 61, "item61") + .addRow(7, 71, "item71") + .build(); + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet_1.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + int originalMaxBatchSize = LateralJoinBatch.MAX_BATCH_SIZE; + LateralJoinBatch.MAX_BATCH_SIZE = 2; + + final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL); + + final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), + leftMockBatch_1, rightMockBatch_1); + + // ** Prepare second pair of left and right batch for upper LATERAL Lateral_2 ** + + // Create left input schema + TupleMetadata leftSchema2 = new SchemaBuilder() + .add("id_left_1", TypeProtos.MinorType.INT) + .add("cost_left_1", TypeProtos.MinorType.INT) + .add("name_left_1", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet emptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema2).build(); + final RowSet.SingleRowSet nonEmptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema2) + .addRow(6, 60, "item6") + .build(); + + // Get the left container with dummy data + final List<VectorContainer> leftContainer2 = new ArrayList<>(5); + leftContainer2.add(emptyLeftRowSet2.container()); + leftContainer2.add(nonEmptyLeftRowSet2.container()); + + // Get the left incoming batch outcomes + final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5); + leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes2.add(RecordBatch.IterOutcome.OK); + + final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema()); + + final LateralJoinBatch upperLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), + leftMockBatch_2, lowerLateral); + + try { + final int expectedOutputRecordCount = 6; + + int actualOutputRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLateral.next()); + assertTrue(RecordBatch.IterOutcome.OK == upperLateral.next()); + actualOutputRecordCount += upperLateral.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK == upperLateral.next()); + actualOutputRecordCount += upperLateral.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK == upperLateral.next()); + actualOutputRecordCount += upperLateral.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.NONE == upperLateral.next()); + assertTrue(actualOutputRecordCount == expectedOutputRecordCount); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + upperLateral.close(); + leftMockBatch_2.close(); + lowerLateral.close(); + leftMockBatch_1.close(); + rightMockBatch_1.close(); + leftContainer2.clear(); + leftOutcomes2.clear(); + LateralJoinBatch.MAX_BATCH_SIZE = originalMaxBatchSize; + } + } + + /** + * This test generates an operator tree for multi level LATERAL by stacking 2 LATERAL and finally an UNNEST pair + * (using MockRecord Batch) as left and right child of lower level LATERAL. In this setup the test try to simulate + * the SchemaChange happening at upper level LATERAL left incoming second batch, which also results into the + * SchemaChange of left UNNEST of lower level LATERAL. This test validates that the schema change is handled + * correctly by both upper and lower level LATERAL. + * + * @throws Exception + */ + @Test + public void testMultiLevelLateral_SchemaChange_LeftUnnest() throws Exception { + + // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 ** + leftContainer.add(emptyLeftRowSet.container()); + leftContainer.add(nonEmptyLeftRowSet.container()); + + // Create left input schema2 for schema change batch + TupleMetadata leftSchema2 = new SchemaBuilder() + .add("new_id_left", TypeProtos.MinorType.INT) + .add("new_cost_left", TypeProtos.MinorType.INT) + .add("new_name_left", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet emptyLeftRowSet_Schema2 = fixture.rowSetBuilder(leftSchema2).build(); + final RowSet.SingleRowSet nonEmptyLeftRowSet_Schema2 = fixture.rowSetBuilder(leftSchema2) + .addRow(1111, 10001, "NewRecord") + .build(); + + leftContainer.add(emptyLeftRowSet_Schema2.container()); + leftContainer.add(nonEmptyLeftRowSet_Schema2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + final RowSet.SingleRowSet nonEmptyRightRowSet_1 = fixture.rowSetBuilder(rightSchema) + .addRow(5, 51, "item51") + .addRow(6, 61, "item61") + .addRow(7, 71, "item71") + .build(); + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet_1.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL); + + final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), + leftMockBatch_1, rightMockBatch_1); + + // ** Prepare second pair of left and right batch for upper level Lateral_2 ** + + // Create left input schema for first batch + TupleMetadata leftSchema3 = new SchemaBuilder() + .add("id_left_new", TypeProtos.MinorType.INT) + .add("cost_left_new", TypeProtos.MinorType.INT) + .add("name_left_new", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3).build(); + final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3) + .addRow(6, 60, "item6") + .build(); + + // Get left input schema for second left batch + TupleMetadata leftSchema4 = new SchemaBuilder() + .add("id_left_new_new", TypeProtos.MinorType.INT) + .add("cost_left_new_new", TypeProtos.MinorType.VARCHAR) + .add("name_left_new_new", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 = fixture.rowSetBuilder(leftSchema4) + .addRow(100, "100", "item100") + .build(); + + // Build Left container for upper level LATERAL operator + final List<VectorContainer> leftContainer2 = new ArrayList<>(5); + + // Get the left container with dummy data + leftContainer2.add(emptyLeftRowSet_leftSchema3.container()); + leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container()); + leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container()); + + // Get the left container outcomes for upper level LATERAL operator + final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5); + leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes2.add(RecordBatch.IterOutcome.OK); + leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema()); + + final LateralJoinBatch upperLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), + leftMockBatch_2, lowerLevelLateral); + + try { + // 3 for first batch on left side and another 3 for next left batch + final int expectedOutputRecordCount = 6; + int actualOutputRecordCount = 0; + + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next()); + assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next()); + actualOutputRecordCount += upperLevelLateral.g
<TRUNCATED>