GitHub user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181915654
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
 ---
    @@ -262,68 +280,122 @@ public FlushBatchesHandlingClass(boolean 
isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, 
InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for 
completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
    -    private volatile IOException exp;
    +    private volatile ExecutionException exception;
     
    -    public CustomRunnable(final String parentThreadName, final 
CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection 
testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, 
GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, 
CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    -      try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test 
countdown latch await()", e);
    -      }
    -
    -      final Thread currThread = Thread.currentThread();
    -      final String currThreadName = currThread.getName();
    -      final OperatorStats localStats = part.getStats();
    -      try {
    -        final String newThreadName = parentThreadName + currThread.getId();
    -        currThread.setName(newThreadName);
    +      final Thread thread = Thread.currentThread();
    +      if (runner.compareAndSet(null, thread)) {
    +        final String name = thread.getName();
    +        thread.setName(String.format("Partitioner-%s-%d", 
partitionerDecorator.thread.getName(), thread.getId()));
    +        final OperatorStats localStats = partitioner.getStats();
             localStats.clear();
             localStats.startProcessing();
    -        iface.execute(part);
    -      } catch (IOException e) {
    -        exp = e;
    -      } finally {
    -        localStats.stopProcessing();
    -        currThread.setName(currThreadName);
    -        latch.countDown();
    +        ExecutionException executionException = null;
    +        try {
    +          // Test only - Pause until interrupted by fragment thread
    +          testCountDownLatch.await();
    +          if (state.get() == STATE.NEW) {
    +            iface.execute(partitioner);
    +          }
    +        } catch (InterruptedException e) {
    +          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
    +            logger.warn("Partitioner Task interrupted during the run", e);
    +          }
    +        } catch (Throwable t) {
    +          executionException = new ExecutionException(t);
    +        } finally {
    +          if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
    +            if (executionException == null) {
    +              localStats.stopProcessing();
    +              state.lazySet(STATE.NORMAL);
    +            } else {
    +              exception = executionException;
    +              state.lazySet(STATE.EXCEPTIONAL);
    +            }
    +          }
    +          if (count.decrementAndGet() == 0) {
    +            LockSupport.unpark(partitionerDecorator.thread);
    +          }
    +          thread.setName(name);
    +          while (state.get() == STATE.INTERRUPTING) {
    +            Thread.yield();
    +          }
    +          // Clear interrupt flag
    +          Thread.interrupted();
    +        }
    +      }
    +    }
    +
    +    void cancel(boolean mayInterruptIfRunning) {
    +      Preconditions.checkState(Thread.currentThread() == 
partitionerDecorator.thread,
    +          String.format("PartitionerTask can be cancelled only from the 
main %s thread", partitionerDecorator.thread.getName()));
    +      if (runner.compareAndSet(null, partitionerDecorator.thread)) {
    +        if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
    +          ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
    +        }
    +        count.decrementAndGet();
    --- End diff --
    
    The main PartitionerDecorator thread may never get unparked.
    
    1. Two PartitionerTasks are created.
    2. Before either PartitionerTask sets its runner, the main PartitionerDecorator thread enters await() and cancels each PartitionerTask.
    3. For each PartitionerTask, the cancel method executes:
       ```
       if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
         ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
       }
       count.decrementAndGet();
       ```
    4. The count reaches zero, but the main PartitionerDecorator thread still calls LockSupport.park() in await().
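    5. Unlike run(), which calls LockSupport.unpark() when count.decrementAndGet() returns zero, cancel() only decrements the count. As a result, unpark is never called when the count reaches zero, and the main PartitionerDecorator thread can remain parked.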


---

Reply via email to