Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181265596
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -262,68 +279,120 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
         private volatile IOException exp;
     
    -    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    -      try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
    -      }
    -
    -      final Thread currThread = Thread.currentThread();
    -      final String currThreadName = currThread.getName();
    -      final OperatorStats localStats = part.getStats();
    -      try {
    -        final String newThreadName = parentThreadName + currThread.getId();
    -        currThread.setName(newThreadName);
    +      final Thread thread = Thread.currentThread();
    +      if (state.get() == STATE.NEW && runner.compareAndSet(null, thread)) {
    --- End diff --
    
    I think there is a race condition here between `run()` and `cancel()`. Consider the following interleaving:
    
    1. A PartitionerTask starts executing; let's call it **Task A**.
    2. The PartitionerTask executes the state check `state.get() == STATE.NEW` and is then preempted before it reaches `runner.compareAndSet(null, thread)`.
    3. The main `PartitionerDecorator` thread executes `await(count, partitionerTasks)`.
    4. `context.getExecutorState().shouldContinue()` returns false, so the PartitionerTasks are cancelled.
    5. The cancel method is called for **Task A**.
    6. In the cancel method, `state.compareAndSet(STATE.NEW, mayInterruptIfRunning ? STATE.INTERRUPTING : STATE.CANCELLED)` returns true.
    7. `Thread thread = runner.get();` is executed, but it returns null because **Task A** has not set the runner yet.
    8. The else branch of the cancel method is executed and `((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);` is called.
    9. The remove method does not cancel **Task A** because it has already started executing, and the interrupt flag is not set, so it continues running.
    10. `count.decrementAndGet();` is executed, so the count can reach zero while **Task A** is still running.
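
    To make the interleaving concrete, here is a minimal sketch that forces it deterministically. It assumes a hypothetical, stripped-down stand-in for `PartitionerTask` (`CancelRaceDemo`, `Task`, `reproduce()` and the demo latches are illustration only, not Drill code) that keeps just `state`, `runner`, `count` and the cancel logic quoted in steps 6-10, with `count` starting at 1 for a single outstanding task. Latches pause Task A between the state check and the runner CAS; steps 3-4 are skipped by calling `cancel(true)` directly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class CancelRaceDemo {

  enum State { NEW, CANCELLED, INTERRUPTING }

  /** Hypothetical, stripped-down stand-in for PartitionerTask (not Drill code). */
  static final class Task implements Runnable {
    final AtomicReference<State> state = new AtomicReference<>(State.NEW);
    final AtomicReference<Thread> runner = new AtomicReference<>();
    final AtomicInteger count;
    final ThreadPoolExecutor executor;
    // Demo-only latches that force the interleaving from steps 1-10.
    final CountDownLatch passedStateCheck = new CountDownLatch(1);
    final CountDownLatch resume = new CountDownLatch(1);
    final CountDownLatch bodyStarted = new CountDownLatch(1);
    final CountDownLatch finishBody = new CountDownLatch(1);
    volatile boolean removed;

    Task(AtomicInteger count, ThreadPoolExecutor executor) {
      this.count = count;
      this.executor = executor;
    }

    @Override
    public void run() {
      Thread thread = Thread.currentThread();
      // Same condition as `state.get() == STATE.NEW && runner.compareAndSet(null, thread)`,
      // split so the demo can pause between the two halves (step 2).
      if (state.get() == State.NEW) {
        passedStateCheck.countDown();
        awaitQuietly(resume);
        if (runner.compareAndSet(null, thread)) {
          bodyStarted.countDown();
          awaitQuietly(finishBody); // stands in for the partitioning work
        }
      }
    }

    void cancel(boolean mayInterruptIfRunning) {
      if (state.compareAndSet(State.NEW,
          mayInterruptIfRunning ? State.INTERRUPTING : State.CANCELLED)) {
        Thread thread = runner.get(); // step 7: still null
        if (thread != null) {
          thread.interrupt();
        } else {
          removed = executor.remove(this); // steps 8-9: false, Task A already started
          count.decrementAndGet();         // step 10
        }
      }
    }
  }

  static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
  }

  /** Runs the interleaving and reports what the main thread observes while Task A runs. */
  static String reproduce() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    AtomicInteger count = new AtomicInteger(1); // assumption: one outstanding task
    Task taskA = new Task(count, executor);
    try {
      executor.execute(taskA);        // step 1
      taskA.passedStateCheck.await(); // step 2: Task A sits between check and CAS
      taskA.cancel(true);             // steps 5-10
      taskA.resume.countDown();       // Task A now claims the runner and keeps going
      taskA.bodyStarted.await();
      return "removed=" + taskA.removed + " count=" + count.get()
          + " taskStillRunning=" + (taskA.runner.get() != null);
    } finally {
      taskA.resume.countDown();       // no-op on the normal path
      taskA.finishBody.countDown();
      executor.shutdown();
      executor.awaitTermination(5, TimeUnit.SECONDS);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // prints: removed=false count=0 taskStillRunning=true
    System.out.println(reproduce());
  }
}
```

    In this sketch Task A is handed straight to a new worker thread, so it is never in the executor's work queue: `remove(this)` returns false, nothing stops Task A, and the count is already zero while Task A is still inside `run()`.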

