This is an automated email from the ASF dual-hosted git repository. sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit f3246a4329bcd7f2007f3cb67c5666ef6d6a55f0 Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> AuthorDate: Thu Jul 26 14:56:54 2018 -0700 DRILL-6635: Update UnnesRecordBatch to handle kill differently with respect to PartitionLimitBatch in the subquery Fix in MockLateralJoinBatch for unnest kill tests --- .../physical/impl/unnest/UnnestRecordBatch.java | 41 +++++++--------------- .../physical/impl/unnest/MockLateralJoinBatch.java | 4 ++- 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java index 2e2f405..e89144d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java @@ -45,7 +45,6 @@ import org.apache.drill.exec.vector.complex.RepeatedValueVector; import java.util.List; -import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; // TODO - handle the case where a user tries to unnest a scalar, should just return the column as is @@ -63,12 +62,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO // to keep processing it. Kill may be called by a limit in a subquery that // requires us to stop processing thecurrent row, but not stop processing // the data. - // In some cases we need to return a predetermined state from a call to next. These are: - // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE - // 2) Kill is called by LIMIT to stop processing of the current row (This occurs when the LIMIT is part of a subquery - // between UNNEST and LATERAL. Iteroutcome should be EMIT - // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE - private IterOutcome nextState = OK; private int remainderIndex = 0; private int recordCount; private MaterializedField unnestFieldMetadata; @@ -159,24 +152,21 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO } protected void killIncoming(boolean sendUpstream) { - // Kill may be received from an operator downstream of the corresponding lateral, or from - // a limit that is in a subqueruy between unnest and lateral. In the latter case, unnest has to handle the limit. - // In the former case, Lateral will handle most of the kill handling. - + // + // In some cases we need to return a predetermined state from a call to next. These are: + // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE + // 2) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE + // With PartitionLimitBatch occurring between Lateral and Unnest subquery, kill won't be triggered by it hence no + // special handling is needed in that case. + // Preconditions.checkNotNull(lateral); // Do not call kill on incoming. Lateral Join has the responsibility for killing incoming - if (context.getExecutorState().isFailed() || lateral.getLeftOutcome() == IterOutcome.STOP) { - logger.debug("Kill received. Stopping all processing"); - nextState = IterOutcome.NONE ; - } else { - // if we have already processed the record, then kill from a limit has no meaning. - // if, however, we have values remaining to be emitted, and limit has been reached, - // we abandon the remainder and send an empty batch with EMIT. - logger.debug("Kill received from subquery. Stopping processing of current input row."); - if(hasRemainder) { - nextState = IterOutcome.EMIT; - } - } + Preconditions.checkState(context.getExecutorState().isFailed() || + lateral.getLeftOutcome() == IterOutcome.STOP, "Kill received by unnest with unexpected state. " + + "Neither the LateralOutcome is STOP nor executor state is failed"); + logger.debug("Kill received. Stopping all processing"); + state = BatchState.DONE; + recordCount = 0; hasRemainder = false; // whatever the case, we need to stop processing the current row. } @@ -190,11 +180,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO return IterOutcome.NONE; } - if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) { - recordCount = 0; - return nextState; - } - if (hasNewSchema) { memoryManager.update(); hasNewSchema = false; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java index 3f52351..c7105f9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java @@ -131,7 +131,9 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat // Pretend that an operator somewhere between lateral and unnest // wants to terminate processing of the record. if(unnestLimit > 0 && unnestCount >= unnestLimit) { - unnest.kill(true); + // break here rather than sending kill to unnest since with partitionLimitBatch kill will never be + // sent to unnest from subquery + break; } } return currentOutcome;