DRILL-6323: Lateral Join - Refactor and fix various issues with LATERAL: a) Override the prefetch call in the BuildSchema phase for LATERAL, b) EMIT handling in buildSchema, c) Issue in the multilevel Lateral case when a schema change is observed only on the right side of UNNEST, d) Handle SelectionVector in the incoming batch, e) Handling schema change, f) Updating joinIndexes correctly when producing multiple output batches for the current left and right inputs. Added tests for: a) EMIT handling in buildSchema, b) Multiple UNNEST at the same level, c) Multilevel Lateral, d) Multilevel Lateral with schema change on the left, right, or both branches, e) Left LATERAL join, f) Schema change for UNNEST and non-UNNEST columns, g) Error outcomes from left and right, h) Producing multiple output batches for a given incoming batch, i) Consuming multiple incoming batches into a single output batch
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/74565ccb Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/74565ccb Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/74565ccb Branch: refs/heads/master Commit: 74565ccb71b6c7c978d53b3a98fb1ef8b7d8551b Parents: 5a63e27 Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> Authored: Tue Feb 20 14:47:48 2018 -0800 Committer: Parth Chandra <par...@apache.org> Committed: Tue Apr 17 18:15:44 2018 -0700 ---------------------------------------------------------------------- .../exec/physical/impl/join/LateralJoin.java | 4 +- .../physical/impl/join/LateralJoinBatch.java | 240 +- .../physical/impl/join/LateralJoinTemplate.java | 5 +- .../exec/record/AbstractBinaryRecordBatch.java | 2 - .../impl/join/TestLateralJoinCorrectness.java | 2492 ++++++++++++++++++ 5 files changed, 2663 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/74565ccb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java index 1d946ce..723c0ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java @@ -41,7 +41,9 @@ public interface LateralJoin { // Produce output records taking into account join type public int crossJoinAndOutputRecords(int leftIndex, int rightIndex); - public void generateLeftJoinOutput(int leftIndex); + public int generateLeftJoinOutput(int leftIndex); + + public void updateOutputIndex(int newOutputIndex); // Project the record at offset 
'leftIndex' in the left input batch into the output container at offset 'outIndex' public void emitLeft(int leftIndex, int outIndex); http://git-wip-us.apache.org/repos/asf/drill/blob/74565ccb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 45b5059..122ff86 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -52,20 +52,21 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; -/* - * RecordBatch implementation for the lateral join operator - * TODO: Create another class called BatchState for both left and right batches and store - * TODO: Schema, index and other flags in it. +/** + * RecordBatch implementation for the lateral join operator. Currently it's expected LATERAL to co-exists with UNNEST + * operator. 
Both LATERAL and UNNEST will share a contract with each other defined at {@link LateralContract} */ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class); // Maximum number records in the outgoing batch - protected static final int MAX_BATCH_SIZE = 4096; + // Made public for testing + static int MAX_BATCH_SIZE = 4096; // Input indexes to correctly update the stats - protected static final int LEFT_INPUT = 0; - protected static final int RIGHT_INPUT = 1; + private static final int LEFT_INPUT = 0; + + private static final int RIGHT_INPUT = 1; // Schema on the left side private BatchSchema leftSchema = null; @@ -138,7 +139,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } catch (SchemaChangeException ex) { logger.error("Failed to handle schema change hence killing the query"); context.getExecutorState().fail(ex); - kill(false); + left.kill(true); // Can have exchange receivers on left so called with true + right.kill(false); // Won't have exchange receivers on right side return false; } finally { stats.stopSetup(); @@ -155,45 +157,45 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> * when we populate the outgoing container then this method is called to get next left batch if current one is * fully processed. It calls next() on left side until we get a non-empty RecordBatch. OR we get either of * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcome. - * @param leftBatch - reference to left incoming record batch. Not needed but provided to make it easy for testing. 
* @return IterOutcome after processing current left batch */ - private IterOutcome processLeftBatch(RecordBatch leftBatch) { + private IterOutcome processLeftBatch() { boolean needLeftBatch = leftJoinIndex == -1; // If left batch is empty while (needLeftBatch) { - leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, leftBatch) : leftUpstream; - final boolean emptyLeftBatch = leftBatch.getRecordCount() <=0; + leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) : leftUpstream; + final boolean emptyLeftBatch = left.getRecordCount() <=0; switch (leftUpstream) { case OK_NEW_SCHEMA: - // This means there is already some records from previous join inside left batch - // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call - if (outputRecords > 0) { - processLeftBatchInFuture = true; - return OK_NEW_SCHEMA; - } - // This OK_NEW_SCHEMA is received post build schema phase and from left side + // If schema didn't actually changed then just handle it as OK outcome if (!isSchemaChanged(left.getSchema(), leftSchema)) { logger.warn("New schema received from left side is same as previous known left schema. 
Ignoring this " + "schema change"); // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose // OK_NEW_SCHEMA outcome + processLeftBatchInFuture = false; if (emptyLeftBatch) { - processLeftBatchInFuture = false; continue; + } else { + leftUpstream = OK; } + } else if (outputRecords > 0) { + // This means there is already some records from previous join inside left batch + // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call + processLeftBatchInFuture = true; + return OK_NEW_SCHEMA; } // If left batch is empty with actual schema change then just rebuild the output container and send empty // batch downstream if (emptyLeftBatch) { if (handleSchemaChange()) { - container.setRecordCount(0); + //container.setRecordCount(0); leftJoinIndex = -1; return OK_NEW_SCHEMA; } else { @@ -213,7 +215,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // don't call next on right batch if (emptyLeftBatch) { leftJoinIndex = -1; - container.setRecordCount(0); + //container.setRecordCount(0); return EMIT; } else { leftJoinIndex = 0; @@ -226,10 +228,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> if (outputRecords > 0) { processLeftBatchInFuture = true; } - //TODO we got a STOP, shouldn't we stop immediately ? 
- // TODO: check what killAndDrain will do w.r.t UNNEST, we discussed about calling right side - // of LATERAL with NONE outcome or calling stop explicitly when NONE is seen on left side - killAndDrainIncoming(right, rightUpstream, RIGHT_INPUT); return leftUpstream; case NOT_YET: try { @@ -240,16 +238,21 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } break; } - needLeftBatch = leftJoinIndex == -1; } - return leftUpstream; } - private IterOutcome processRightBatch(RecordBatch right) { + /** + * Process right incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is + * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator and if + * left batch has some records in it. Also when we populate the outgoing container then this method is called to + * get next right batch if current one is fully processed. + * @return IterOutcome after processing current left batch + */ + private IterOutcome processRightBatch() { // Check if we still have records left to process in left incoming from new batch or previously half processed - // batch. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new + // batch based on indexes. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then // it will be set to -1. // Whereas rightJoinIndex is to keep track of record in right batch being joined with @@ -290,10 +293,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> case OUT_OF_MEMORY: case NONE: case STOP: - //TODO we got a STOP, shouldn't we stop immediately ? - // TODO: Should we kill left side if right side fails ? 
- killAndDrainIncoming(left, leftUpstream, LEFT_INPUT); - VectorAccessibleUtilities.clear(container); needNewRightBatch = false; break; case NOT_YET: @@ -306,7 +305,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> break; } } - return rightUpstream; } @@ -321,7 +319,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> public IterOutcome innerNext() { // We don't do anything special on FIRST state. Process left batch first and then right batch if need be - IterOutcome childOutcome = processLeftBatch(left); + IterOutcome childOutcome = processLeftBatch(); // reset this state after calling processLeftBatch above. processLeftBatchInFuture = false; @@ -330,16 +328,17 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // left side is terminal state then just return the IterOutcome and don't call next() on // right branch if (left.getRecordCount() == 0 || isTerminalOutcome(childOutcome)) { + container.setRecordCount(0); return childOutcome; } // Left side has some records in the batch so let's process right batch - childOutcome = processRightBatch(right); + childOutcome = processRightBatch(); // reset the left & right outcomes to OK here and send the empty batch downstream // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do if (childOutcome == OK_NEW_SCHEMA) { - leftUpstream = OK; + leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream; rightUpstream = OK; return childOutcome; } @@ -363,12 +362,22 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // allocate space for the outgoing batch allocateVectors(); - return produceOutputBatch(); } + /** + * Get's the current left and right incoming batch and does the cross join to fill the output batch. 
If all the + * records in the either or both the batches are consumed then it get's next batch from that branch depending upon + * if output batch still has some space left. If output batch is full then the output if finalized to be sent + * downstream. Subsequent call's knows how to consume previously half consumed (if any) batches and producing the + * output using that. + * + * @return - IterOutcome to be send along with output batch to downstream operator + */ private IterOutcome produceOutputBatch() { + boolean isLeftProcessed = false; + // Try to fully pack the outgoing container while (outputRecords < LateralJoinBatch.MAX_BATCH_SIZE) { int previousOutputCount = outputRecords; @@ -384,8 +393,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> rightJoinIndex = -1; } else { // One right batch might span across multiple output batch. So rightIndex will be moving sum of all the - // output records for this record batch until it's fully consumed - rightJoinIndex += outputRecords; + // output records for this record batch until it's fully consumed. + // + // Also it can be so that one output batch can contain records from 2 different right batch hence the + // rightJoinIndex should move by number of records in output batch for current right batch only. 
+ rightJoinIndex += outputRecords - previousOutputCount; } final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount(); @@ -398,7 +410,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> if (rightUpstream == EMIT) { if (!matchedRecordFound) { // will only produce left side in case of LEFT join - lateralJoiner.generateLeftJoinOutput(leftJoinIndex); + outputRecords = lateralJoiner.generateLeftJoinOutput(leftJoinIndex); } ++leftJoinIndex; // Reset matchedRecord for next left index record @@ -411,7 +423,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } // Check if previous left record was last one, then set leftJoinIndex to -1 - final boolean isLeftProcessed = leftJoinIndex >= left.getRecordCount(); + isLeftProcessed = leftJoinIndex >= left.getRecordCount(); if (isLeftProcessed) { leftJoinIndex = -1; VectorAccessibleUtilities.clear(left); @@ -426,13 +438,25 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> break; } else { // Get both left batch and the right batch and make sure indexes are properly set - leftUpstream = processLeftBatch(left); + leftUpstream = processLeftBatch(); if (processLeftBatchInFuture) { // We should return the current output batch with OK outcome and don't reset the leftUpstream finalizeOutputContainer(); return OK; } + + // If left batch received a terminal outcome then don't call right batch + if (isTerminalOutcome(leftUpstream)) { + finalizeOutputContainer(); + return leftUpstream; + } + + // If we have received the left batch with EMIT outcome and is empty then we should return previous output + // batch with EMIT outcome + if (leftUpstream == EMIT && left.getRecordCount() == 0) { + break; + } } } @@ -444,11 +468,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome - rightUpstream = 
processRightBatch(right); + rightUpstream = processRightBatch(); Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch"); if (isTerminalOutcome(rightUpstream)) { + finalizeOutputContainer(); return rightUpstream; } } @@ -456,7 +481,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> finalizeOutputContainer(); - if (leftUpstream == EMIT) { + // Check if output batch was full and left was fully consumed or not. Since if left is not consumed entirely + // but output batch is full, then if the left batch came with EMIT outcome we should send this output batch along + // with OK outcome not with EMIT. Whereas if output is full and left is also fully consumed then we should send + // EMIT outcome. + if (leftUpstream == EMIT && isLeftProcessed) { return EMIT; } @@ -465,7 +494,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> leftUpstream = OK; return OK_NEW_SCHEMA; } - return OK; } @@ -484,34 +512,44 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // We are about to send the output batch so reset the outputRecords for future next call outputRecords = 0; + // Update the output index for next output batch to zero + lateralJoiner.updateOutputIndex(0); } - private void killAndDrainIncoming(RecordBatch batch, IterOutcome outcome, - int batchIndex) { - if (!hasMore(outcome)) { - return; - } - - batch.kill(true); - while (hasMore(outcome)) { - for (final VectorWrapper<?> wrapper : batch) { - wrapper.getValueVector().clear(); - } - outcome = next(batchIndex, batch); - } - if (batchIndex == RIGHT_INPUT) { - rightUpstream = outcome; - } else { - leftUpstream = outcome; - } + /** + * Check if the schema changed between provided newSchema and oldSchema. It relies on + * {@link BatchSchema#isEquivalent(BatchSchema)}. 
+ * @param newSchema - New Schema information + * @param oldSchema - - New Schema information to compare with + * + * @return - true - if newSchema is not same as oldSchema + * - false - if newSchema is same as oldSchema + */ + private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) { + return (newSchema == null || oldSchema == null) || !newSchema.isEquivalent(oldSchema); } - private boolean hasMore(IterOutcome outcome) { - return outcome == OK || outcome == OK_NEW_SCHEMA || outcome == EMIT; - } + /** + * Validate if the input schema is not null and doesn't contain any Selection Vector. + * @param schema - input schema to verify + * @return - true: valid input schema + * false: invalid input schema + */ + private boolean verifyInputSchema(BatchSchema schema) { - private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) { - return newSchema.isEquivalent(oldSchema); + boolean isValid = true; + if (schema == null) { + logger.error("Null schema found for the incoming batch"); + isValid = false; + } else { + final BatchSchema.SelectionVectorMode svMode = schema.getSelectionVectorMode(); + if (svMode != BatchSchema.SelectionVectorMode.NONE) { + logger.error("Incoming batch schema found with selection vector which is not supported. SVMode: {}", + svMode.toString()); + isValid = false; + } + } + return isValid; } /** @@ -525,9 +563,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> leftSchema = left.getSchema(); rightSchema = right.getSchema(); - if (leftSchema == null || rightSchema == null) { - throw new SchemaChangeException("Either of left or right schema was not set properly in the batches. 
Hence " + - "failed to setupNewSchema"); + if (!verifyInputSchema(leftSchema)) { + throw new SchemaChangeException("Invalid Schema found for left incoming batch"); + } + + if (!verifyInputSchema(rightSchema)) { + throw new SchemaChangeException("Invalid Schema found for right incoming batch"); } // Setup LeftSchema in outgoing container @@ -652,6 +693,52 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } } + private boolean setBatchState(IterOutcome outcome) { + switch(outcome) { + case STOP: + case EMIT: + state = BatchState.STOP; + return false; + case OUT_OF_MEMORY: + state = BatchState.OUT_OF_MEMORY; + return false; + case NONE: + case NOT_YET: + state = BatchState.DONE; + return false; + } + return true; + } + + /** + * Method to get left and right batch during build schema phase for {@link LateralJoinBatch}. If left batch sees a + * failure outcome then we don't even call next on right branch, since there is no left incoming. + * @return true if both the left/right batch was received without failure outcome. + * false if either of batch is received with failure outcome. + * + * @throws SchemaChangeException + */ + @Override + protected boolean prefetchFirstBatchFromBothSides() { + // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch + leftUpstream = next(0, left); + + boolean validBatch = setBatchState(leftUpstream); + + if (validBatch) { + rightUpstream = next(1, right); + validBatch = setBatchState(rightUpstream); + } + + // EMIT outcome is not expected as part of first batch from either side + if (leftUpstream == EMIT || rightUpstream == EMIT) { + state = BatchState.STOP; + throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " + + "buildSchema phase"); + } + return validBatch; + } + /** * Prefetch a batch from left and right branch to know about the schema of each side. 
Then adds value vector in * output container based on those schemas. For this phase LATERAL always expect's an empty batch from right side @@ -665,7 +752,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> if (!prefetchFirstBatchFromBothSides()) { return; } - Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received"); // Setup output container schema based on known left and right schema @@ -674,7 +760,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Release the vectors received from right side VectorAccessibleUtilities.clear(right); - // We should not allocate memory for all the value vectors inside output batch // since this is buildSchema phase and we are sending empty batches downstream lateralJoiner = setupWorker(); @@ -697,6 +782,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> @Override protected void killIncoming(boolean sendUpstream) { this.left.kill(sendUpstream); + // Reset the left side outcome as STOP since as part of right kill when UNNEST will ask IterOutcome of left incoming + // from LATERAL and based on that it can make decision if the kill is coming from downstream to LATERAL or upstream + // to LATERAL. Like LIMIT operator being present downstream to LATERAL or upstream to LATERAL and downstream to + // UNNEST. 
+ leftUpstream = STOP; this.right.kill(sendUpstream); } http://git-wip-us.apache.org/repos/asf/drill/blob/74565ccb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java index 6e756a4..d7b4f1d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java @@ -95,7 +95,7 @@ public abstract class LateralJoinTemplate implements LateralJoin { * newOutputIndex * @param newOutputIndex - new output index of outgoing batch after copying the records */ - private void updateOutputIndex(int newOutputIndex) { + public void updateOutputIndex(int newOutputIndex) { outputIndex = (newOutputIndex >= LateralJoinBatch.MAX_BATCH_SIZE) ? 
0 : newOutputIndex; } @@ -105,13 +105,14 @@ public abstract class LateralJoinTemplate implements LateralJoin { * used in case when Join Type is LEFT and we have only seen empty batches from right side * @param leftIndex - index in left batch to copy record from */ - public void generateLeftJoinOutput(int leftIndex) { + public int generateLeftJoinOutput(int leftIndex) { int currentOutputIndex = outputIndex; if (JoinRelType.LEFT == joinType) { emitLeft(leftIndex, currentOutputIndex++); updateOutputIndex(currentOutputIndex); } + return currentOutputIndex; } /** http://git-wip-us.apache.org/repos/asf/drill/blob/74565ccb/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java index b671915..02b07bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java @@ -54,8 +54,6 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch leftUpstream = next(0, left); - // Right will always get zero records with OK_NEW_SCHEMA outcome as first batch, since right - // now Lateral will always be tied up with UNNEST rightUpstream = next(1, right); if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {