This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 221484bf8f5ce82d5b2b003d02f69553e6ecf516
Author: Paul Rogers <[email protected]>
AuthorDate: Wed Jan 1 21:52:56 2020 -0800

    DRILL-7507: Convert fragment interrupts to exceptions
    
    Modifies fragment interrupt handling to throw a specialized
    exception, rather than relying on the complex and cumbersome
    STOP iterator status.
    
    closes #1949
---
 .../org/apache/drill/exec/ops/FragmentContext.java |  29 +++-
 .../drill/exec/ops/QueryCancelledException.java    |  32 ++++
 .../drill/exec/physical/impl/BaseRootExec.java     |   9 +-
 .../impl/aggregate/SpilledRecordbatch.java         |  15 +-
 .../impl/mergereceiver/MergingRecordBatch.java     | 173 ++++++++++-----------
 .../OrderedPartitionRecordBatch.java               |  16 +-
 .../impl/partitionsender/PartitionerDecorator.java |   6 +-
 .../impl/producer/ProducerConsumerBatch.java       |  10 +-
 .../unorderedreceiver/UnorderedReceiverBatch.java  |   4 +-
 .../physical/impl/xsort/ExternalSortBatch.java     |   5 +-
 .../exec/physical/impl/xsort/MSortTemplate.java    |   4 +-
 .../drill/exec/record/AbstractRecordBatch.java     |  20 ++-
 .../drill/exec/work/fragment/FragmentExecutor.java |  10 ++
 .../drill/exec/physical/impl/SimpleRootExec.java   |   7 +
 .../unnest/TestUnnestWithLateralCorrectness.java   |   2 +-
 .../org/apache/drill/test/OperatorFixture.java     |  21 +--
 16 files changed, 207 insertions(+), 156 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 2107094..619f2d1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -213,15 +213,36 @@ public interface FragmentContext extends UdfUtilities, 
AutoCloseable {
 
   interface ExecutorState {
     /**
-     * Tells individual operations whether they should continue. In some 
cases, an external event (typically cancellation)
-     * will mean that the fragment should prematurely exit execution. Long 
running operations should check this every so
-     * often so that Drill is responsive to cancellation operations.
+     * Tells individual operations whether they should continue. In some cases,
+     * an external event (typically cancellation) will mean that the fragment
+     * should prematurely exit execution. Long running operations should check
+     * this every so often so that Drill is responsive to cancellation
+     * operations.
      *
-     * @return False if the action should terminate immediately, true if 
everything is okay.
+     * @return False if the action should terminate immediately, true if
+     *         everything is okay.
      */
     boolean shouldContinue();
 
     /**
+     * Check if an operation should continue. In some cases,
+     * an external event (typically cancellation) will mean that the fragment
+     * should prematurely exit execution. Long running operations should check
+     * this every so often so that Drill is responsive to cancellation
+     * operations.
+     * <p>
+     * Throws QueryCancelledException if the query (fragment) should stop.
+     * The fragment executor interprets this as an exception that it
+     * requested itself, and will call the operator's close() method to
+     * release resources. Operators should not catch and handle this
+     * exception, and should only call this method when the operator holds
+     * no transient resources (such as local variables).
+     *
+     * @throws QueryCancelledException if the query (fragment) should stop.
+     */
+    void checkContinue();
+
+    /**
      * Inform the executor if a exception occurs and fragment should be failed.
      *
      * @param t
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryCancelledException.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryCancelledException.java
new file mode 100644
index 0000000..0d5a5a5
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryCancelledException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+/**
+ * Indicates that an external source has cancelled the query.
+ * Thrown by the operator that detects the cancellation. Bubbles
+ * up the operator tree like other exceptions. However, this one
+ * tells the fragment executor that the exception is one that
+ * the fragment executor itself initiated and so should not
+ * be reported as an error.
+ */
+@SuppressWarnings("serial")
+public class QueryCancelledException extends RuntimeException {
+
+  // No need for messages; this exception is silently ignored.
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 04ea6b6..24597ae 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -32,9 +32,11 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class BaseRootExec implements RootExec {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BaseRootExec.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(BaseRootExec.class);
 
   public static final String ENABLE_BATCH_DUMP_CONFIG = 
"drill.exec.debug.dump_batches";
   protected OperatorStats stats;
@@ -85,11 +87,8 @@ public abstract class BaseRootExec implements RootExec {
 
   @Override
   public final boolean next() {
-    // Stats should have been initialized
     assert stats != null;
-    if (!fragmentContext.getExecutorState().shouldContinue()) {
-      return false;
-    }
+    fragmentContext.getExecutorState().checkContinue();
     try {
       stats.startProcessing();
       return innerNext();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index 822a810..56adae2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -47,12 +47,12 @@ public class SpilledRecordbatch implements 
CloseableRecordBatch {
   private VectorContainer container;
   private InputStream spillStream;
   private int spilledBatches;
-  private FragmentContext context;
-  private BatchSchema schema;
-  private SpillSet spillSet;
-  private String spillFile;
+  private final FragmentContext context;
+  private final BatchSchema schema;
+  private final SpillSet spillSet;
+  private final String spillFile;
   VectorAccessibleSerializable vas;
-  private IterOutcome initialOutcome;
+  private final IterOutcome initialOutcome;
   // Represents last outcome of next(). If an Exception is thrown
   // during the method's execution a value IterOutcome.STOP will be assigned.
   private IterOutcome lastOutcome;
@@ -134,10 +134,7 @@ public class SpilledRecordbatch implements 
CloseableRecordBatch {
   @Override
   public IterOutcome next() {
 
-    if (!context.getExecutorState().shouldContinue()) {
-      lastOutcome = IterOutcome.STOP;
-      return lastOutcome;
-    }
+    context.getExecutorState().checkContinue();
 
     if ( spilledBatches <= 0 ) { // no more batches to read in this partition
       this.close();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 420b61a..9443882 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.ExchangeFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -151,7 +152,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
       // interruption and respond to it if it wants to.
       Thread.currentThread().interrupt();
 
-      return null;
+      throw new QueryCancelledException();
     } finally {
       stats.stopWait();
     }
@@ -194,97 +195,100 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
 
       // set up each (non-empty) incoming record batch
       final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
-      int p = 0;
-      for (@SuppressWarnings("unused") final RawFragmentBatchProvider provider 
: fragProviders) {
-        RawFragmentBatch rawBatch;
-        // check if there is a batch in temp holder before calling getNext(), 
as it may have been used when building schema
-        if (tempBatchHolder[p] != null) {
-          rawBatch = tempBatchHolder[p];
-          tempBatchHolder[p] = null;
-        } else {
-          try {
-            rawBatch = getNext(p);
-          } catch (final IOException e) {
-            context.getExecutorState().fail(e);
-            return IterOutcome.STOP;
+      try {
+        int p = 0;
+        for (@SuppressWarnings("unused") final RawFragmentBatchProvider 
provider : fragProviders) {
+          RawFragmentBatch rawBatch;
+          // check if there is a batch in temp holder before calling 
getNext(), as it may have been used when building schema
+          if (tempBatchHolder[p] != null) {
+            rawBatch = tempBatchHolder[p];
+            tempBatchHolder[p] = null;
+          } else {
+            try {
+              rawBatch = getNext(p);
+            } catch (final IOException e) {
+              context.getExecutorState().fail(e);
+              return IterOutcome.STOP;
+            }
           }
-        }
-        if (rawBatch == null && !context.getExecutorState().shouldContinue()) {
-          clearBatches(rawBatches);
-          return IterOutcome.STOP;
-        }
+          checkContinue();
 
-        // If rawBatch is null, go ahead and add it to the list. We will 
create dummy batches
-        // for all null batches later.
-        if (rawBatch == null) {
-          createDummyBatch = true;
-          rawBatches.add(rawBatch);
-          p++; // move to next sender
-          continue;
-        }
+          // If rawBatch is null, go ahead and add it to the list. We will 
create dummy batches
+          // for all null batches later.
+          if (rawBatch == null) {
+            checkContinue();
+            createDummyBatch = true;
+            rawBatches.add(rawBatch);
+            p++; // move to next sender
+            continue;
+          }
 
-        if (fieldList == null && rawBatch.getHeader().getDef().getFieldCount() 
!= 0) {
-          // save the schema to fix up empty batches with no schema if needed.
-            fieldList = rawBatch.getHeader().getDef().getFieldList();
-        }
+          if (fieldList == null && 
rawBatch.getHeader().getDef().getFieldCount() != 0) {
+            // save the schema to fix up empty batches with no schema if 
needed.
+              fieldList = rawBatch.getHeader().getDef().getFieldList();
+          }
 
-        if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
-          rawBatches.add(rawBatch);
-        } else {
-          // keep reading till we get a batch with record count > 0 or we have 
no more batches to read i.e. we get null
-          try {
-            while ((rawBatch = getNext(p)) != null && 
rawBatch.getHeader().getDef().getRecordCount() == 0) {
-              // Do nothing
-            }
-            if (rawBatch == null && 
!context.getExecutorState().shouldContinue()) {
+          if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
+            rawBatches.add(rawBatch);
+          } else {
+            // keep reading till we get a batch with record count > 0 or we 
have no more batches to read i.e. we get null
+            try {
+              while ((rawBatch = getNext(p)) != null && 
rawBatch.getHeader().getDef().getRecordCount() == 0) {
+                // Do nothing
+              }
+              if (rawBatch == null) {
+                checkContinue();
+                createDummyBatch = true;
+              }
+            } catch (final IOException e) {
+              context.getExecutorState().fail(e);
               clearBatches(rawBatches);
               return IterOutcome.STOP;
             }
-          } catch (final IOException e) {
-            context.getExecutorState().fail(e);
-            clearBatches(rawBatches);
-            return IterOutcome.STOP;
-          }
-          if (rawBatch == null || 
rawBatch.getHeader().getDef().getFieldCount() == 0) {
-            createDummyBatch = true;
+            if (rawBatch == null || 
rawBatch.getHeader().getDef().getFieldCount() == 0) {
+              createDummyBatch = true;
+            }
+            // Even if rawBatch is null, go ahead and add it to the list.
+            // We will create dummy batches for all null batches later.
+            rawBatches.add(rawBatch);
           }
-          // Even if rawBatch is null, go ahead and add it to the list.
-          // We will create dummy batches for all null batches later.
-          rawBatches.add(rawBatch);
+          p++;
         }
-        p++;
-      }
 
-      // If no batch arrived with schema from any of the providers, just 
return NONE.
-      if (fieldList == null) {
-        return IterOutcome.NONE;
-      }
+        // If no batch arrived with schema from any of the providers, just 
return NONE.
+        if (fieldList == null) {
+          return IterOutcome.NONE;
+        }
 
-      // Go through and fix schema for empty batches.
-      if (createDummyBatch) {
-        // Create dummy record batch definition with 0 record count
-        UserBitShared.RecordBatchDef dummyDef = 
UserBitShared.RecordBatchDef.newBuilder()
-            // we cannot use/modify the original field list as that is used by
-            // valid record batch.
-            // create a copy of field list with valuecount = 0 for all fields.
-            // This is for dummy schema generation.
-            .addAllField(createDummyFieldList(fieldList))
-            .setRecordCount(0)
-            .build();
+        // Go through and fix schema for empty batches.
+        if (createDummyBatch) {
+          // Create dummy record batch definition with 0 record count
+          UserBitShared.RecordBatchDef dummyDef = 
UserBitShared.RecordBatchDef.newBuilder()
+              // we cannot use/modify the original field list as that is used 
by
+              // valid record batch.
+              // create a copy of field list with valuecount = 0 for all 
fields.
+              // This is for dummy schema generation.
+              .addAllField(createDummyFieldList(fieldList))
+              .setRecordCount(0)
+              .build();
 
-        // Create dummy header
-        BitData.FragmentRecordBatch dummyHeader = 
BitData.FragmentRecordBatch.newBuilder()
-            .setIsLastBatch(true)
-            .setDef(dummyDef)
-            .build();
+          // Create dummy header
+          BitData.FragmentRecordBatch dummyHeader = 
BitData.FragmentRecordBatch.newBuilder()
+              .setIsLastBatch(true)
+              .setDef(dummyDef)
+              .build();
 
-        for (int i = 0; i < p; i++) {
-          RawFragmentBatch rawBatch = rawBatches.get(i);
-          if (rawBatch == null || 
rawBatch.getHeader().getDef().getFieldCount() == 0) {
-            rawBatch = new RawFragmentBatch(dummyHeader, null, null);
-            rawBatches.set(i, rawBatch);
+          for (int i = 0; i < p; i++) {
+            RawFragmentBatch rawBatch = rawBatches.get(i);
+            if (rawBatch == null || 
rawBatch.getHeader().getDef().getFieldCount() == 0) {
+              rawBatch = new RawFragmentBatch(dummyHeader, null, null);
+              rawBatches.set(i, rawBatch);
+            }
           }
         }
+      } catch (Throwable t) {
+        clearBatches(rawBatches);
+        throw t;
       }
 
       // allocate the incoming record batch loaders
@@ -375,9 +379,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
             } else {
               batchLoaders[b].clear();
               batchLoaders[b] = null;
-              if (!context.getExecutorState().shouldContinue()) {
-                return IterOutcome.STOP;
-              }
+              checkContinue();
             }
           } catch (IOException | SchemaChangeException e) {
             context.getExecutorState().fail(e);
@@ -413,8 +415,8 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
 
           assert nextBatch != null || inputCounts[node.batchId] == 
outputCounts[node.batchId]
               : String.format("Stream %d input count: %d output count %d", 
node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
-          if (nextBatch == null && 
!context.getExecutorState().shouldContinue()) {
-            return IterOutcome.STOP;
+          if (nextBatch == null) {
+            checkContinue();
           }
         } catch (final IOException e) {
           context.getExecutorState().fail(e);
@@ -544,12 +546,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
         }
         final RawFragmentBatch batch = getNext(i);
         if (batch == null) {
-          if (!context.getExecutorState().shouldContinue()) {
-            state = BatchState.STOP;
-          } else {
-            state = BatchState.DONE;
-          }
-          break;
+          checkContinue();
         }
         if (batch.getHeader().getDef().getFieldCount() == 0) {
           i++;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index c309e8c..d67ae42 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.impl.sort.SortBatch;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
@@ -259,15 +260,12 @@ public class OrderedPartitionRecordBatch extends 
AbstractRecordBatch<OrderedPart
    * @return True if the given timeout is expired. False when interrupted and
    *         the fragment status is not runnable.
    */
-  private boolean waitUntilTimeOut(final long timeout) {
+  private void waitUntilTimeOut(final long timeout) {
     while(true) {
       try {
         Thread.sleep(timeout);
-        return true;
       } catch (final InterruptedException e) {
-        if (!context.getExecutorState().shouldContinue()) {
-          return false;
-        }
+        throw new QueryCancelledException();
       }
     }
   }
@@ -307,18 +305,14 @@ public class OrderedPartitionRecordBatch extends 
AbstractRecordBatch<OrderedPart
         // TODO: this should be polling.
 
         if (val < fragmentsBeforeProceed) {
-          if (!waitUntilTimeOut(10)) {
-            return false;
-          }
+          waitUntilTimeOut(10);
         }
         for (int i = 0; i < 100 && finalTable == null; i++) {
           finalTable = tableMap.get(finalTableKey);
           if (finalTable != null) {
             break;
           }
-          if (!waitUntilTimeOut(10)) {
-            return false;
-          }
+          waitUntilTimeOut(10);
         }
         if (finalTable == null) {
           buildTable();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index 04371d2..8646192 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.LockSupport;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
@@ -51,7 +52,7 @@ public final class PartitionerDecorator {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
   private static final ControlsInjector injector = 
ControlsInjectorFactory.getInjector(PartitionerDecorator.class);
 
-  private List<Partitioner> partitioners;
+  private final List<Partitioner> partitioners;
   private final OperatorStats stats;
   private final ExecutorService executor;
   private final FragmentContext context;
@@ -155,6 +156,7 @@ public final class PartitionerDecorator {
         testCountDownLatch.countDown();
       } catch (InterruptedException e) {
         logger.warn("fragment thread interrupted", e);
+        throw new QueryCancelledException();
       } catch (RejectedExecutionException e) {
         logger.warn("Failed to execute partitioner tasks. Execution service 
down?", e);
         executionException = new ExecutionException(e);
@@ -309,7 +311,7 @@ public final class PartitionerDecorator {
 
     private final GeneralExecuteIface iface;
     private final Partitioner partitioner;
-    private CountDownLatchInjection testCountDownLatch;
+    private final CountDownLatchInjection testCountDownLatch;
 
     private volatile ExecutionException exception;
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 633920e..9d67b5d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -68,11 +69,7 @@ public class ProducerConsumerBatch extends 
AbstractRecordBatch<ProducerConsumer>
       wrapper = queue.take();
       logger.debug("Got batch from queue");
     } catch (final InterruptedException e) {
-      if (context.getExecutorState().shouldContinue()) {
-        context.getExecutorState().fail(e);
-      }
-      return IterOutcome.STOP;
-      // TODO InterruptedException
+      throw new QueryCancelledException();
     } finally {
       stats.stopWait();
     }
@@ -155,7 +152,7 @@ public class ProducerConsumerBatch extends 
AbstractRecordBatch<ProducerConsumer>
         }
       } catch (final InterruptedException e) {
         logger.warn("Producer thread is interrupted.", e);
-        // TODO InterruptedException
+        throw new QueryCancelledException();
       } finally {
         if (stop) {
           try {
@@ -203,6 +200,7 @@ public class ProducerConsumerBatch extends 
AbstractRecordBatch<ProducerConsumer>
     } catch (final InterruptedException e) {
       logger.warn("Interrupted while waiting for producer to clean up first. I 
will try to clean up now...", e);
       // TODO we should retry to wait for the latch
+      throw new QueryCancelledException();
     } finally {
       super.close();
       clearQueue();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 8b3c2e9..f6304aa 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -188,9 +188,7 @@ public class UnorderedReceiverBatch implements 
CloseableRecordBatch {
       if (batch == null) {
         lastOutcome = IterOutcome.NONE;
         batchLoader.zero();
-        if (!context.getExecutorState().shouldContinue()) {
-          lastOutcome = IterOutcome.STOP;
-        }
+        context.getExecutorState().checkContinue();
         return lastOutcome;
       }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index b4ebc1d..17a634d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -423,11 +423,8 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
       // No records to sort
       sortState = SortState.DONE;
       return NONE;
-    } else if (!context.getExecutorState().shouldContinue()) {
-      // Interrupted
-      sortState = SortState.DONE;
-      return STOP;
     } else {
+      checkContinue();
 
       // There is some data to be returned downstream.
       // We have to prepare output container
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 8a31e3e..e503c5d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -150,9 +150,7 @@ public abstract class MSortTemplate implements MSorter, 
IndexedSortable {
       int totalCount = vector4.getTotalCount();
 
       // check if we're cancelled/failed recently
-      if (!context.getExecutorState().shouldContinue()) {
-        return;
-      }
+      context.getExecutorState().checkContinue();
 
       int outIndex = 0;
       Queue<Integer> newRunStarts = Queues.newLinkedBlockingQueue();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 1548ec5..94ee022 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -109,19 +109,15 @@ public abstract class AbstractRecordBatch<T extends 
PhysicalOperator> implements
   }
 
   public final IterOutcome next(RecordBatch b) {
-    if(!context.getExecutorState().shouldContinue()) {
-      return IterOutcome.STOP;
-    }
+    checkContinue();
     return next(0, b);
   }
 
   public final IterOutcome next(int inputIndex, RecordBatch b) {
     IterOutcome next;
-    stats.stopProcessing();
     try {
-      if (!context.getExecutorState().shouldContinue()) {
-        return IterOutcome.STOP;
-      }
+      stats.stopProcessing();
+      checkContinue();
       next = b.next();
     } finally {
       stats.startProcessing();
@@ -267,4 +263,14 @@ public abstract class AbstractRecordBatch<T extends 
PhysicalOperator> implements
   public boolean isRecordBatchStatsLoggingEnabled() {
     return batchStatsContext.isEnableBatchSzLogging();
   }
+
+  /**
+   * Checks if the query should continue. Throws QueryCancelledException
+   * if not. Operators should call this periodically to detect cancellation
+   * requests. The operator need not catch the exception: it bubbles up the
+   * operator tree and the fragment executor treats it as a cancellation.
+   */
+  public void checkContinue() {
+    context.getExecutorState().checkContinue();
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 79884e8..161fa83 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
@@ -332,6 +333,8 @@ public class FragmentExecutor implements Runnable {
         }
       });
 
+    } catch (QueryCancelledException e) {
+      // Ignore: indicates query cancelled by this executor
     } catch (OutOfMemoryError | OutOfMemoryException e) {
       if (FailureUtils.isDirectMemoryOOM(e)) {
         fail(UserException.memoryError(e).build(logger));
@@ -528,6 +531,13 @@ public class FragmentExecutor implements Runnable {
     public Throwable getFailureCause(){
       return deferredException.getException();
     }
+
+    @Override
+    public void checkContinue() {
+      if (!shouldContinue()) {
+        throw new QueryCancelledException();
+      }
+    }
   }
 
   private class FragmentDrillbitStatusListener implements 
DrillbitStatusListener {
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index a176646..68cf18f 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.drill.common.DeferredException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.ops.RootFragmentContext;
 import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -74,6 +75,12 @@ public class SimpleRootExec implements RootExec, 
Iterable<ValueVector> {
       return ex.getException();
     }
 
+    @Override
+    public void checkContinue() {
+      if (!shouldContinue()) {
+        throw new QueryCancelledException();
+      }
+    }
   }
 
   public RootFragmentContext getContext() {
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 8969265..b3e1d8e 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -57,7 +57,7 @@ import org.junit.experimental.categories.Category;
 import java.util.ArrayList;
 import java.util.List;
 
-import static junit.framework.TestCase.fail;
+import static org.junit.Assert.fail;
 
 @Category(OperatorTest.class)
 public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index 1e2df25..b9f4b43 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -464,27 +464,22 @@ public class OperatorFixture extends BaseFixture 
implements AutoCloseable {
     }
   }
 
-  public static class MockExecutorState implements 
FragmentContext.ExecutorState
-  {
+  public static class MockExecutorState
+      implements FragmentContext.ExecutorState {
     @Override
-    public boolean shouldContinue() {
-      return true;
-    }
+    public boolean shouldContinue() { return true; }
 
     @Override
-    public void fail(Throwable t) {
+    public void fail(Throwable t) { }
 
-    }
+    @Override
+    public boolean isFailed() { return false; }
 
     @Override
-    public boolean isFailed() {
-      return false;
-    }
+    public Throwable getFailureCause() { return null; }
 
     @Override
-    public Throwable getFailureCause() {
-      return null;
-    }
+    public void checkContinue() { }
   }
 
   public OperatorContext newOperatorContext(PhysicalOperator popConfig) {

Reply via email to