Github user ilooner commented on a diff in the pull request:
https://github.com/apache/drill/pull/1208#discussion_r181915654
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
---
@@ -262,68 +280,122 @@ public FlushBatchesHandlingClass(boolean
isLastBatch, boolean schemaChanged) {
}
@Override
- public void execute(Partitioner part) throws IOException {
+ public void execute(Partitioner part) throws IOException,
InterruptedException {
part.flushOutgoingBatches(isLastBatch, schemaChanged);
}
}
/**
- * Helper class to wrap Runnable with customized naming
- * Exception handling
+ * Helper class to wrap Runnable with cancellation and waiting for
completion support
*
*/
- private static class CustomRunnable implements Runnable {
+ private static class PartitionerTask implements Runnable {
+
+ private enum STATE {
+ NEW,
+ COMPLETING,
+ NORMAL,
+ EXCEPTIONAL,
+ CANCELLED,
+ INTERRUPTING,
+ INTERRUPTED
+ }
+
+ private final AtomicReference<STATE> state;
+ private final AtomicReference<Thread> runner;
+ private final PartitionerDecorator partitionerDecorator;
+ private final AtomicInteger count;
- private final String parentThreadName;
- private final CountDownLatch latch;
private final GeneralExecuteIface iface;
- private final Partitioner part;
+ private final Partitioner partitioner;
private CountDownLatchInjection testCountDownLatch;
- private volatile IOException exp;
+ private volatile ExecutionException exception;
- public CustomRunnable(final String parentThreadName, final
CountDownLatch latch, final GeneralExecuteIface iface,
- final Partitioner part, CountDownLatchInjection
testCountDownLatch) {
- this.parentThreadName = parentThreadName;
- this.latch = latch;
+ public PartitionerTask(PartitionerDecorator partitionerDecorator,
GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count,
CountDownLatchInjection testCountDownLatch) {
+ state = new AtomicReference<>(STATE.NEW);
+ runner = new AtomicReference<>();
+ this.partitionerDecorator = partitionerDecorator;
this.iface = iface;
- this.part = part;
+ this.partitioner = partitioner;
+ this.count = count;
this.testCountDownLatch = testCountDownLatch;
}
@Override
public void run() {
- // Test only - Pause until interrupted by fragment thread
- try {
- testCountDownLatch.await();
- } catch (final InterruptedException e) {
- logger.debug("Test only: partitioner thread is interrupted in test
countdown latch await()", e);
- }
-
- final Thread currThread = Thread.currentThread();
- final String currThreadName = currThread.getName();
- final OperatorStats localStats = part.getStats();
- try {
- final String newThreadName = parentThreadName + currThread.getId();
- currThread.setName(newThreadName);
+ final Thread thread = Thread.currentThread();
+ if (runner.compareAndSet(null, thread)) {
+ final String name = thread.getName();
+ thread.setName(String.format("Partitioner-%s-%d",
partitionerDecorator.thread.getName(), thread.getId()));
+ final OperatorStats localStats = partitioner.getStats();
localStats.clear();
localStats.startProcessing();
- iface.execute(part);
- } catch (IOException e) {
- exp = e;
- } finally {
- localStats.stopProcessing();
- currThread.setName(currThreadName);
- latch.countDown();
+ ExecutionException executionException = null;
+ try {
+ // Test only - Pause until interrupted by fragment thread
+ testCountDownLatch.await();
+ if (state.get() == STATE.NEW) {
+ iface.execute(partitioner);
+ }
+ } catch (InterruptedException e) {
+ if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
+ logger.warn("Partitioner Task interrupted during the run", e);
+ }
+ } catch (Throwable t) {
+ executionException = new ExecutionException(t);
+ } finally {
+ if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
+ if (executionException == null) {
+ localStats.stopProcessing();
+ state.lazySet(STATE.NORMAL);
+ } else {
+ exception = executionException;
+ state.lazySet(STATE.EXCEPTIONAL);
+ }
+ }
+ if (count.decrementAndGet() == 0) {
+ LockSupport.unpark(partitionerDecorator.thread);
+ }
+ thread.setName(name);
+ while (state.get() == STATE.INTERRUPTING) {
+ Thread.yield();
+ }
+ // Clear interrupt flag
+ Thread.interrupted();
+ }
+ }
+ }
+
+ void cancel(boolean mayInterruptIfRunning) {
+ Preconditions.checkState(Thread.currentThread() ==
partitionerDecorator.thread,
+ String.format("PartitionerTask can be cancelled only from the
main %s thread", partitionerDecorator.thread.getName()));
+ if (runner.compareAndSet(null, partitionerDecorator.thread)) {
+ if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
+ ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
+ }
+ count.decrementAndGet();
--- End diff --
The main PartitionerDecorator thread may not get unparked.
1. Two PartitionerTasks are created.
2. Before any of the PartitionerTasks set their runner, the main
PartitionerDecorator thread enters await() and cancels each PartitionerTask.
3. The cancel method executes the following for each PartitionerTask:
```
if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
}
count.decrementAndGet();
```
4. The count goes to zero, but the main PartitionerDecorator thread still
calls LockSupport.park() in await().
5. LockSupport.unpark() wasn't called when the count reached zero, so the
main PartitionerDecorator thread can remain parked.
---