This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit f3246a4329bcd7f2007f3cb67c5666ef6d6a55f0
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
AuthorDate: Thu Jul 26 14:56:54 2018 -0700

    DRILL-6635: Update UnnesRecordBatch to handle kill differently with respect 
to PartitionLimitBatch in the subquery
    Fix in MockLateralJoinBatch for unnest kill tests
---
 .../physical/impl/unnest/UnnestRecordBatch.java    | 41 +++++++---------------
 .../physical/impl/unnest/MockLateralJoinBatch.java |  4 ++-
 2 files changed, 16 insertions(+), 29 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 2e2f405..e89144d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -45,7 +45,6 @@ import 
org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
 import java.util.List;
 
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
 
 // TODO - handle the case where a user tries to unnest a scalar, should just 
return the column as is
@@ -63,12 +62,6 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
                                         // to keep processing it. Kill may be 
called by a limit in a subquery that
                                         // requires us to stop processing 
thecurrent row, but not stop processing
                                         // the data.
-  // In some cases we need to return a predetermined state from a call to 
next. These are:
-  // 1) Kill is called due to an error occurring in the processing of the 
query. IterOutcome should be NONE
-  // 2) Kill is called by LIMIT to stop processing of the current row (This 
occurs when the LIMIT is part of a subquery
-  //    between UNNEST and LATERAL. Iteroutcome should be EMIT
-  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be 
NONE
-  private IterOutcome nextState = OK;
   private int remainderIndex = 0;
   private int recordCount;
   private MaterializedField unnestFieldMetadata;
@@ -159,24 +152,21 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
   }
 
   protected void killIncoming(boolean sendUpstream) {
-    // Kill may be received from an operator downstream of the corresponding 
lateral, or from
-    // a limit that is in a subqueruy between unnest and lateral. In the 
latter case, unnest has to handle the limit.
-    // In the former case, Lateral will handle most of the kill handling.
-
+    //
+    // In some cases we need to return a predetermined state from a call to 
next. These are:
+    // 1) Kill is called due to an error occurring in the processing of the 
query. IterOutcome should be NONE
+    // 2) Kill is called by LIMIT downstream from LATERAL. IterOutcome should 
be NONE
+    // With PartitionLimitBatch occurring between Lateral and Unnest subquery, 
kill won't be triggered by it hence no
+    // special handling is needed in that case.
+    //
     Preconditions.checkNotNull(lateral);
     // Do not call kill on incoming. Lateral Join has the responsibility for 
killing incoming
-    if (context.getExecutorState().isFailed() || lateral.getLeftOutcome() == 
IterOutcome.STOP) {
-      logger.debug("Kill received. Stopping all processing");
-      nextState = IterOutcome.NONE ;
-    } else {
-      // if we have already processed the record, then kill from a limit has 
no meaning.
-      // if, however, we have values remaining to be emitted, and limit has 
been reached,
-      // we abandon the remainder and send an empty batch with EMIT.
-      logger.debug("Kill received from subquery. Stopping processing of 
current input row.");
-      if(hasRemainder) {
-        nextState = IterOutcome.EMIT;
-      }
-    }
+    Preconditions.checkState(context.getExecutorState().isFailed() ||
+      lateral.getLeftOutcome() == IterOutcome.STOP, "Kill received by unnest 
with unexpected state. " +
+      "Neither the LateralOutcome is STOP nor executor state is failed");
+    logger.debug("Kill received. Stopping all processing");
+    state = BatchState.DONE;
+    recordCount = 0;
     hasRemainder = false; // whatever the case, we need to stop processing the 
current row.
   }
 
@@ -190,11 +180,6 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
       return IterOutcome.NONE;
     }
 
-    if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) {
-      recordCount = 0;
-      return nextState;
-    }
-
     if (hasNewSchema) {
       memoryManager.update();
       hasNewSchema = false;
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index 3f52351..c7105f9 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -131,7 +131,9 @@ public class MockLateralJoinBatch implements 
LateralContract, CloseableRecordBat
           // Pretend that an operator somewhere between lateral and unnest
           // wants to terminate processing of the record.
           if(unnestLimit > 0 && unnestCount >= unnestLimit) {
-            unnest.kill(true);
+            // break here rather than sending kill to unnest since with 
partitionLimitBatch kill will never be
+            // sent to unnest from subquery
+            break;
           }
         }
         return currentOutcome;

Reply via email to