Github user ilooner commented on a diff in the pull request:
https://github.com/apache/drill/pull/1208#discussion_r181265596
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
---
@@ -262,68 +279,120 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
}
@Override
- public void execute(Partitioner part) throws IOException {
+ public void execute(Partitioner part) throws IOException, InterruptedException {
part.flushOutgoingBatches(isLastBatch, schemaChanged);
}
}
/**
- * Helper class to wrap Runnable with customized naming
- * Exception handling
+ * Helper class to wrap Runnable with cancellation and waiting for completion support
*
*/
- private static class CustomRunnable implements Runnable {
+ private static class PartitionerTask implements Runnable {
+
+ private enum STATE {
+ NEW,
+ COMPLETING,
+ NORMAL,
+ EXCEPTIONAL,
+ CANCELLED,
+ INTERRUPTING,
+ INTERRUPTED
+ }
+
+ private final AtomicReference<STATE> state;
+ private final AtomicReference<Thread> runner;
+ private final PartitionerDecorator partitionerDecorator;
+ private final AtomicInteger count;
- private final String parentThreadName;
- private final CountDownLatch latch;
private final GeneralExecuteIface iface;
- private final Partitioner part;
+ private final Partitioner partitioner;
private CountDownLatchInjection testCountDownLatch;
private volatile IOException exp;
- public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
- final Partitioner part, CountDownLatchInjection testCountDownLatch) {
- this.parentThreadName = parentThreadName;
- this.latch = latch;
+ public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
+ state = new AtomicReference<>(STATE.NEW);
+ runner = new AtomicReference<>();
+ this.partitionerDecorator = partitionerDecorator;
this.iface = iface;
- this.part = part;
+ this.partitioner = partitioner;
+ this.count = count;
this.testCountDownLatch = testCountDownLatch;
}
@Override
public void run() {
- // Test only - Pause until interrupted by fragment thread
- try {
- testCountDownLatch.await();
- } catch (final InterruptedException e) {
- logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
- }
-
- final Thread currThread = Thread.currentThread();
- final String currThreadName = currThread.getName();
- final OperatorStats localStats = part.getStats();
- try {
- final String newThreadName = parentThreadName + currThread.getId();
- currThread.setName(newThreadName);
+ final Thread thread = Thread.currentThread();
+ if (state.get() == STATE.NEW && runner.compareAndSet(null, thread)) {
--- End diff --
I think there is a race condition here. Consider the following case:
1. A PartitionerTask starts executing; let's call it **Task A**.
2. **Task A** evaluates the state check `state.get() == STATE.NEW`, which passes, and is then descheduled before it reaches `runner.compareAndSet(null, thread)`.
3. The main PartitionerDecorator thread executes `await(count, partitionerTasks)`.
4. `context.getExecutorState().shouldContinue()` returns false, so the PartitionerTasks are cancelled.
5. The cancel method is called for **Task A**.
6. In the cancel method, `state.compareAndSet(STATE.NEW, mayInterruptIfRunning ? STATE.INTERRUPTING : STATE.CANCELLED)` returns true.
7. `Thread thread = runner.get();` is executed, but it returns null because **Task A** has not set the runner yet.
8. The else branch in the cancel method is executed and `((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);` is called.
9. The remove method does not cancel **Task A** because it has already started executing, and no interrupt is delivered, so it continues running.
10. `count.decrementAndGet();` is executed, so the count will be zero while **Task A** is still running.
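
The interleaving in the steps above can be forced deterministically in a small standalone sketch. This is not the code from the PR: `CancelRaceSketch`, `Task`, the two latches, and `demonstrate()` are hypothetical names, and the `cancel` body is only reconstructed from the steps as described here, so treat it as an assumption about the shape of the real method. The latches stand in for the unlucky thread schedule.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for PartitionerTask; not the PR's code.
class CancelRaceSketch {
  enum State { NEW, CANCELLED, INTERRUPTING, INTERRUPTED }

  static final class Task implements Runnable {
    final AtomicReference<State> state = new AtomicReference<>(State.NEW);
    final AtomicReference<Thread> runner = new AtomicReference<>();
    final AtomicInteger count;
    // Test hooks (assumptions) that pin the thread schedule to steps 2 to 10.
    final CountDownLatch stateChecked = new CountDownLatch(1);
    final CountDownLatch cancelFinished = new CountDownLatch(1);
    volatile boolean ranAfterCancel;

    Task(AtomicInteger count) {
      this.count = count;
    }

    @Override
    public void run() {
      if (state.get() != State.NEW) {
        return;
      }
      // Step 2: the state check passed; pause before the runner is published.
      stateChecked.countDown();
      awaitQuietly(cancelFinished);
      if (runner.compareAndSet(null, Thread.currentThread())) {
        // The task body would run here even though cancel() already succeeded.
        ranAfterCancel = state.get() != State.NEW;
      }
    }

    void cancel(boolean mayInterruptIfRunning) {
      State target = mayInterruptIfRunning ? State.INTERRUPTING : State.CANCELLED;
      if (!state.compareAndSet(State.NEW, target)) {
        return;
      }
      Thread thread = runner.get(); // Step 7: still null.
      if (thread != null) {
        thread.interrupt();
        state.set(State.INTERRUPTED);
      }
      // Steps 8 and 9: removing the task from the executor queue would be a
      // no-op here, because the task was already dequeued and started.
      count.decrementAndGet(); // Step 10.
    }
  }

  static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Returns true when the waiter saw count == 0 and the task still proceeded.
  static boolean demonstrate() {
    AtomicInteger count = new AtomicInteger(1);
    Task task = new Task(count);
    Thread worker = new Thread(task, "task-a");
    worker.start();
    awaitQuietly(task.stateChecked); // Task A is now between check and set.
    task.cancel(true);
    int countSeenByWaiter = count.get();
    task.cancelFinished.countDown(); // Let Task A resume.
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return countSeenByWaiter == 0 && task.ranAfterCancel;
  }

  public static void main(String[] args) {
    System.out.println("task ran after cancel completed: " + demonstrate());
  }
}
```

In this sketch the underlying problem is that the state check and the runner assignment are two separate steps, so a cancel can slip in between them. Whether the real `cancel` and `await` behave the same way depends on code outside this diff hunk.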
---