DRILL-6295: PartitionerDecorator may close partitioners while CustomRunnable 
are active during query cancellation

This closes #1208


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/931b43ec
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/931b43ec
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/931b43ec

Branch: refs/heads/master
Commit: 931b43ec54bf47dcbb4aa9ae4499f37a8f21b408
Parents: fdc6978
Author: Vlad Rozov <vro...@apache.org>
Authored: Wed Apr 11 10:12:07 2018 -0700
Committer: Parth Chandra <par...@apache.org>
Committed: Tue Apr 17 18:18:57 2018 -0700

----------------------------------------------------------------------
 .../PartitionSenderRootExec.java                |  13 +-
 .../partitionsender/PartitionerDecorator.java   | 333 +++++++++++--------
 .../exec/testing/CountDownLatchInjection.java   |   4 +-
 .../partitionsender/TestPartitionSender.java    |  11 +-
 4 files changed, 221 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/931b43ec/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 7e76238..034d6c2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.partitionsender;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
@@ -167,10 +168,10 @@ public class PartitionSenderRootExec extends BaseRootExec 
{
           } else {
             sendEmptyBatch(true);
           }
-        } catch (IOException e) {
+        } catch (ExecutionException e) {
           incoming.kill(false);
           logger.error("Error while creating partitioning sender or flushing 
outgoing batches", e);
-          context.getExecutorState().fail(e);
+          context.getExecutorState().fail(e.getCause());
         }
         return false;
 
@@ -197,10 +198,10 @@ public class PartitionSenderRootExec extends BaseRootExec 
{
             first = false;
             sendEmptyBatch(false);
           }
-        } catch (IOException e) {
+        } catch (ExecutionException e) {
           incoming.kill(false);
           logger.error("Error while flushing outgoing batches", e);
-          context.getExecutorState().fail(e);
+          context.getExecutorState().fail(e.getCause());
           return false;
         } catch (SchemaChangeException e) {
           incoming.kill(false);
@@ -211,8 +212,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
       case OK:
         try {
           partitioner.partitionBatch(incoming);
-        } catch (IOException e) {
-          context.getExecutorState().fail(e);
+        } catch (ExecutionException e) {
+          context.getExecutorState().fail(e.getCause());
           incoming.kill(false);
           return false;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/931b43ec/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index bb69f39..e9838f7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -18,10 +18,15 @@
 package org.apache.drill.exec.physical.impl.partitionsender;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorStats;
@@ -31,7 +36,8 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.testing.CountDownLatchInjection;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * Decorator class to hide multiple Partitioner existence from the caller
@@ -41,34 +47,37 @@ import com.google.common.collect.Lists;
  * The algorithm to figure out processing versus wait time is based on 
following formula:
  * totalWaitTime = totalAllPartitionersProcessingTime - 
max(sum(processingTime) by partitioner)
  */
-public class PartitionerDecorator {
+public final class PartitionerDecorator {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
   private static final ControlsInjector injector = 
ControlsInjectorFactory.getInjector(PartitionerDecorator.class);
 
   private List<Partitioner> partitioners;
   private final OperatorStats stats;
-  private final String tName;
-  private final String childThreadPrefix;
   private final ExecutorService executor;
   private final FragmentContext context;
+  private final Thread thread;
+  private final boolean enableParallelTaskExecution;
 
+  PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, 
FragmentContext context) {
+    this(partitioners, stats, context, partitioners.size() > 1);
+  }
 
-  public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats 
stats, FragmentContext context) {
+  PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, 
FragmentContext context, boolean enableParallelTaskExecution) {
     this.partitioners = partitioners;
     this.stats = stats;
     this.context = context;
-    this.executor = context.getExecutor();
-    this.tName = Thread.currentThread().getName();
-    this.childThreadPrefix = "Partitioner-" + tName + "-";
+    this.enableParallelTaskExecution = enableParallelTaskExecution;
+    executor =  enableParallelTaskExecution ?  context.getExecutor() : 
MoreExecutors.newDirectExecutorService();
+    thread = Thread.currentThread();
   }
 
   /**
    * partitionBatch - decorator method to call real Partitioner(s) to process 
incoming batch
    * uses either threading or not threading approach based on number 
Partitioners
    * @param incoming
-   * @throws IOException
+   * @throws ExecutionException
    */
-  public void partitionBatch(final RecordBatch incoming) throws IOException {
+  public void partitionBatch(final RecordBatch incoming) throws 
ExecutionException {
     executeMethodLogic(new PartitionBatchHandlingClass(incoming));
   }
 
@@ -76,9 +85,9 @@ public class PartitionerDecorator {
    * flushOutgoingBatches - decorator to call real Partitioner(s) 
flushOutgoingBatches
    * @param isLastBatch
    * @param schemaChanged
-   * @throws IOException
+   * @throws ExecutionException
    */
-  public void flushOutgoingBatches(final boolean isLastBatch, final boolean 
schemaChanged) throws IOException {
+  public void flushOutgoingBatches(final boolean isLastBatch, final boolean 
schemaChanged) throws ExecutionException {
     executeMethodLogic(new FlushBatchesHandlingClass(isLastBatch, 
schemaChanged));
   }
 
@@ -118,105 +127,115 @@ public class PartitionerDecorator {
     return null;
   }
 
-  @VisibleForTesting
-  protected List<Partitioner> getPartitioners() {
+  List<Partitioner> getPartitioners() {
     return partitioners;
   }
 
   /**
    * Helper to execute the different methods wrapped into same logic
    * @param iface
-   * @throws IOException
+   * @throws ExecutionException
    */
-  protected void executeMethodLogic(final GeneralExecuteIface iface) throws 
IOException {
-    if (partitioners.size() == 1 ) {
-      // no need for threads
-      final OperatorStats localStatsSingle = partitioners.get(0).getStats();
-      localStatsSingle.clear();
-      localStatsSingle.startProcessing();
+  @VisibleForTesting
+  void executeMethodLogic(final GeneralExecuteIface iface) throws 
ExecutionException {
+    // To simulate interruption of main fragment thread and interrupting the 
partition threads, create a
+    // CountDownInject latch. Partitioner threads await on the latch and main 
fragment thread counts down or
+    // interrupts waiting threads. This makes sure that we are actually 
interrupting the blocked partitioner threads.
+    try (CountDownLatchInjection testCountDownLatch = 
injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch")) {
+      testCountDownLatch.initialize(1);
+      final AtomicInteger count = new AtomicInteger();
+      List<PartitionerTask> partitionerTasks = new 
ArrayList<>(partitioners.size());
+      ExecutionException executionException = null;
+      // start waiting on main stats to adjust by sum(max(processing)) at the 
end
+      startWait();
       try {
-        iface.execute(partitioners.get(0));
+        partitioners.forEach(partitioner -> createAndExecute(iface, 
testCountDownLatch, count, partitionerTasks, partitioner));
+        // Wait for main fragment interruption.
+        injector.injectInterruptiblePause(context.getExecutionControls(), 
"wait-for-fragment-interrupt", logger);
+        testCountDownLatch.countDown();
+      } catch (InterruptedException e) {
+        logger.warn("fragment thread interrupted", e);
+      } catch (RejectedExecutionException e) {
+        logger.warn("Failed to execute partitioner tasks. Execution service 
down?", e);
+        executionException = new ExecutionException(e);
       } finally {
-        localStatsSingle.stopProcessing();
-        stats.mergeMetrics(localStatsSingle);
-        // since main stats did not have any wait time - adjust based of 
partitioner stats wait time
-        // main stats processing time started recording in BaseRootExec
-        stats.adjustWaitNanos(localStatsSingle.getWaitNanos());
+        await(count, partitionerTasks);
+        stopWait();
+        processPartitionerTasks(partitionerTasks, executionException);
       }
-      return;
     }
+  }
 
-    long maxProcessTime = 0l;
-    // start waiting on main stats to adjust by sum(max(processing)) at the end
-    stats.startWait();
-    final CountDownLatch latch = new CountDownLatch(partitioners.size());
-    final List<CustomRunnable> runnables = Lists.newArrayList();
-    final List<Future<?>> taskFutures = Lists.newArrayList();
-    CountDownLatchInjection testCountDownLatch = null;
-    try {
-      // To simulate interruption of main fragment thread and interrupting the 
partition threads, create a
-      // CountDownInject patch. Partitioner threads await on the latch and 
main fragment thread counts down or
-      // interrupts waiting threads. This makes sures that we are actually 
interrupting the blocked partitioner threads.
-      testCountDownLatch = injector.getLatch(context.getExecutionControls(), 
"partitioner-sender-latch");
-      testCountDownLatch.initialize(1);
-      for (final Partitioner part : partitioners) {
-        final CustomRunnable runnable = new CustomRunnable(childThreadPrefix, 
latch, iface, part, testCountDownLatch);
-        runnables.add(runnable);
-        taskFutures.add(executor.submit(runnable));
+  private void createAndExecute(GeneralExecuteIface iface, 
CountDownLatchInjection testCountDownLatch, AtomicInteger count,
+      List<PartitionerTask> partitionerTasks, Partitioner partitioner) {
+    PartitionerTask partitionerTask = new PartitionerTask(this, iface, 
partitioner, count, testCountDownLatch);
+    executor.execute(partitionerTask);
+    partitionerTasks.add(partitionerTask);
+    count.incrementAndGet();
+  }
+
+  /**
+   * Wait for completion of all partitioner tasks.
+   * @param count current number of task not yet completed
+   * @param partitionerTasks list of partitioner tasks submitted for execution
+   */
+  private void await(AtomicInteger count, List<PartitionerTask> 
partitionerTasks) {
+    boolean cancelled = false;
+    while (count.get() > 0) {
+      if (context.getExecutorState().shouldContinue() || cancelled) {
+        LockSupport.park();
+      } else {
+        logger.warn("Cancelling fragment {} partitioner tasks...", 
context.getFragIdString());
+        partitionerTasks.forEach(partitionerTask -> 
partitionerTask.cancel(true));
+        cancelled = true;
       }
+    }
+  }
 
-      while (true) {
-        try {
-          // Wait for main fragment interruption.
-          injector.injectInterruptiblePause(context.getExecutionControls(), 
"wait-for-fragment-interrupt", logger);
-
-          // If there is no pause inserted at site 
"wait-for-fragment-interrupt", release the latch.
-          injector.getLatch(context.getExecutionControls(), 
"partitioner-sender-latch").countDown();
-
-          latch.await();
-          break;
-        } catch (final InterruptedException e) {
-          // If the fragment state says we shouldn't continue, cancel or 
interrupt partitioner threads
-          if (!context.getExecutorState().shouldContinue()) {
-            logger.debug("Interrupting partioner threads. Fragment thread {}", 
tName);
-            for(Future<?> f : taskFutures) {
-              f.cancel(true);
-            }
+  private void startWait() {
+    if (enableParallelTaskExecution) {
+      stats.startWait();
+    }
+  }
 
-            break;
-          }
-        }
-      }
+  private void stopWait() {
+    if (enableParallelTaskExecution) {
+      stats.stopWait();
+    }
+  }
 
-      IOException excep = null;
-      for (final CustomRunnable runnable : runnables ) {
-        IOException myException = runnable.getException();
-        if ( myException != null ) {
-          if ( excep == null ) {
-            excep = myException;
-          } else {
-            excep.addSuppressed(myException);
-          }
+  private void processPartitionerTasks(List<PartitionerTask> partitionerTasks, 
ExecutionException executionException) throws ExecutionException {
+    long maxProcessTime = 0l;
+    for (PartitionerTask partitionerTask : partitionerTasks) {
+      ExecutionException e = partitionerTask.getException();
+      if (e != null) {
+        if (executionException == null) {
+          executionException = e;
+        } else {
+          executionException.getCause().addSuppressed(e.getCause());
         }
-        final OperatorStats localStats = runnable.getPart().getStats();
-        long currentProcessingNanos = localStats.getProcessingNanos();
+      }
+      if (executionException == null) {
+        final OperatorStats localStats = partitionerTask.getStats();
         // find out max Partitioner processing time
-        maxProcessTime = (currentProcessingNanos > maxProcessTime) ? 
currentProcessingNanos : maxProcessTime;
+        if (enableParallelTaskExecution) {
+          long currentProcessingNanos = localStats.getProcessingNanos();
+          maxProcessTime = (currentProcessingNanos > maxProcessTime) ? 
currentProcessingNanos : maxProcessTime;
+        } else {
+          maxProcessTime += localStats.getWaitNanos();
+        }
         stats.mergeMetrics(localStats);
       }
-      if ( excep != null ) {
-        throw excep;
-      }
-    } finally {
-      stats.stopWait();
-      // scale down main stats wait time based on calculated processing time
-      // since we did not wait for whole duration of above execution
+    }
+    if (executionException != null) {
+      throw executionException;
+    }
+    // scale down main stats wait time based on calculated processing time
+    // since we did not wait for whole duration of above execution
+    if (enableParallelTaskExecution) {
       stats.adjustWaitNanos(-maxProcessTime);
-
-      // Done with the latch, close it.
-      if (testCountDownLatch != null) {
-        testCountDownLatch.close();
-      }
+    } else {
+      stats.adjustWaitNanos(maxProcessTime);
     }
   }
 
@@ -237,7 +256,7 @@ public class PartitionerDecorator {
 
     private final RecordBatch incoming;
 
-    public PartitionBatchHandlingClass(RecordBatch incoming) {
+    PartitionBatchHandlingClass(RecordBatch incoming) {
       this.incoming = incoming;
     }
 
@@ -268,62 +287,116 @@ public class PartitionerDecorator {
   }
 
   /**
-   * Helper class to wrap Runnable with customized naming
-   * Exception handling
+   * Helper class to wrap Runnable with cancellation and waiting for 
completion support
    *
    */
-  private static class CustomRunnable implements Runnable {
+  private static class PartitionerTask implements Runnable {
+
+    private enum STATE {
+      NEW,
+      COMPLETING,
+      NORMAL,
+      EXCEPTIONAL,
+      CANCELLED,
+      INTERRUPTING,
+      INTERRUPTED
+    }
+
+    private final AtomicReference<STATE> state;
+    private final AtomicReference<Thread> runner;
+    private final PartitionerDecorator partitionerDecorator;
+    private final AtomicInteger count;
 
-    private final String parentThreadName;
-    private final CountDownLatch latch;
     private final GeneralExecuteIface iface;
-    private final Partitioner part;
+    private final Partitioner partitioner;
     private CountDownLatchInjection testCountDownLatch;
 
-    private volatile IOException exp;
+    private volatile ExecutionException exception;
 
-    public CustomRunnable(final String parentThreadName, final CountDownLatch 
latch, final GeneralExecuteIface iface,
-        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
-      this.parentThreadName = parentThreadName;
-      this.latch = latch;
+    public PartitionerTask(PartitionerDecorator partitionerDecorator, 
GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, 
CountDownLatchInjection testCountDownLatch) {
+      state = new AtomicReference<>(STATE.NEW);
+      runner = new AtomicReference<>();
+      this.partitionerDecorator = partitionerDecorator;
       this.iface = iface;
-      this.part = part;
+      this.partitioner = partitioner;
+      this.count = count;
       this.testCountDownLatch = testCountDownLatch;
     }
 
     @Override
     public void run() {
-      // Test only - Pause until interrupted by fragment thread
-      try {
-        testCountDownLatch.await();
-      } catch (final InterruptedException e) {
-        logger.debug("Test only: partitioner thread is interrupted in test 
countdown latch await()", e);
-      }
-
-      final Thread currThread = Thread.currentThread();
-      final String currThreadName = currThread.getName();
-      final OperatorStats localStats = part.getStats();
-      try {
-        final String newThreadName = parentThreadName + currThread.getId();
-        currThread.setName(newThreadName);
+      final Thread thread = Thread.currentThread();
+      if (runner.compareAndSet(null, thread)) {
+        final String name = thread.getName();
+        thread.setName(String.format("Partitioner-%s-%d", 
partitionerDecorator.thread.getName(), thread.getId()));
+        final OperatorStats localStats = partitioner.getStats();
         localStats.clear();
         localStats.startProcessing();
-        iface.execute(part);
-      } catch (IOException e) {
-        exp = e;
-      } finally {
-        localStats.stopProcessing();
-        currThread.setName(currThreadName);
-        latch.countDown();
+        ExecutionException executionException = null;
+        try {
+          // Test only - Pause until interrupted by fragment thread
+          testCountDownLatch.await();
+          if (state.get() == STATE.NEW) {
+            iface.execute(partitioner);
+          }
+        } catch (InterruptedException e) {
+          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
+            logger.warn("Partitioner Task interrupted during the run", e);
+          }
+        } catch (Throwable t) {
+          executionException = new ExecutionException(t);
+        } finally {
+          if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
+            if (executionException == null) {
+              localStats.stopProcessing();
+              state.lazySet(STATE.NORMAL);
+            } else {
+              exception = executionException;
+              state.lazySet(STATE.EXCEPTIONAL);
+            }
+          }
+          if (count.decrementAndGet() == 0) {
+            LockSupport.unpark(partitionerDecorator.thread);
+          }
+          thread.setName(name);
+          while (state.get() == STATE.INTERRUPTING) {
+            Thread.yield();
+          }
+          // Clear interrupt flag
+          Thread.interrupted();
+        }
+      }
+    }
+
+    void cancel(boolean mayInterruptIfRunning) {
+      Preconditions.checkState(Thread.currentThread() == 
partitionerDecorator.thread,
+          String.format("PartitionerTask can be cancelled only from the main 
%s thread", partitionerDecorator.thread.getName()));
+      if (runner.compareAndSet(null, partitionerDecorator.thread)) {
+        if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
+          ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
+        }
+        count.decrementAndGet();
+      } else {
+        if (mayInterruptIfRunning) {
+          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTING)) {
+            try {
+              runner.get().interrupt();
+            } finally {
+              state.lazySet(STATE.INTERRUPTED);
+            }
+          }
+        } else {
+          state.compareAndSet(STATE.NEW, STATE.CANCELLED);
+        }
       }
     }
 
-    public IOException getException() {
-      return this.exp;
+    public ExecutionException getException() {
+      return this.exception;
     }
 
-    public Partitioner getPart() {
-      return part;
+    public OperatorStats getStats() {
+      return partitioner.getStats();
     }
   }
- }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/931b43ec/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
index 957d79b..4103ee6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.testing;
 
+import org.apache.drill.common.AutoCloseables.Closeable;
+
 /**
  * This class is used internally for tracking injected countdown latches. 
These latches are specified via
  * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} 
session option.
@@ -25,7 +27,7 @@ package org.apache.drill.exec.testing;
  * with the expected number of countdown and awaits. The child threads count 
down on the same latch (same site class
  * and same descriptor), and once there are enough, the parent thread 
continues.
  */
-public interface CountDownLatchInjection {
+public interface CountDownLatchInjection extends Closeable {
 
   /**
    * Initializes the underlying latch

http://git-wip-us.apache.org/repos/asf/drill/blob/931b43ec/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
index c431fea..eaaa87d 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
@@ -28,6 +28,7 @@ import java.io.PrintWriter;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.PlanTestBase;
@@ -68,6 +69,8 @@ import 
org.apache.drill.exec.server.options.OptionValue.AccessibleScopes;
 import org.apache.drill.exec.server.options.OptionValue.OptionScope;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.test.OperatorFixture.MockExecutorState;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -221,6 +224,7 @@ public class TestPartitionSender extends PlanTestBase {
       FragmentContextImpl context = null;
       try {
         context = new FragmentContextImpl(drillbitContext, planFragment, null, 
registry);
+        context.setExecutorState(new MockExecutorState());
         final int majorFragmentId = 
planFragment.getHandle().getMajorFragmentId();
         final HashPartitionSender partSender = new 
HashPartitionSender(majorFragmentId, hashToRandomExchange, 
hashToRandomExchange.getExpression(), mfEndPoints);
         partionSenderRootExec = new MockPartitionSenderRootExec(context, 
incoming, partSender);
@@ -285,8 +289,8 @@ public class TestPartitionSender extends PlanTestBase {
         partionSenderRootExec.getStats().startProcessing();
         try {
           partDecor.executeMethodLogic(new InjectExceptionTest());
-          fail("Should throw IOException here");
-        } catch (IOException ioe) {
+          fail("executeMethodLogic should throw an exception.");
+        } catch (ExecutionException e) {
           final OperatorProfile.Builder oPBuilder = 
OperatorProfile.newBuilder();
           partionSenderRootExec.getStats().addAllMetrics(oPBuilder);
           final List<MetricValue> metrics = oPBuilder.getMetricList();
@@ -298,7 +302,8 @@ public class TestPartitionSender extends PlanTestBase {
               assertEquals(actualThreads, metric.getLongValue());
             }
           }
-          assertEquals(actualThreads-1, ioe.getSuppressed().length);
+          assertTrue(e.getCause() instanceof IOException);
+          assertEquals(actualThreads-1, e.getCause().getSuppressed().length);
         } finally {
           partionSenderRootExec.getStats().stopProcessing();
         }

Reply via email to