Github user ilooner commented on a diff in the pull request:
https://github.com/apache/drill/pull/1208#discussion_r181851002
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
---
@@ -262,68 +280,124 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
}
@Override
- public void execute(Partitioner part) throws IOException {
+ public void execute(Partitioner part) throws IOException, InterruptedException {
part.flushOutgoingBatches(isLastBatch, schemaChanged);
}
}
/**
- * Helper class to wrap Runnable with customized naming
- * Exception handling
+ * Helper class to wrap Runnable with cancellation and waiting for completion support
*
*/
- private static class CustomRunnable implements Runnable {
+ private static class PartitionerTask implements Runnable {
+
+ private enum STATE {
+ NEW,
+ COMPLETING,
+ NORMAL,
+ EXCEPTIONAL,
+ CANCELLED,
+ INTERRUPTING,
+ INTERRUPTED
+ }
+
+ private final AtomicReference<STATE> state;
+ private final AtomicReference<Thread> runner;
+ private final PartitionerDecorator partitionerDecorator;
+ private final AtomicInteger count;
- private final String parentThreadName;
- private final CountDownLatch latch;
private final GeneralExecuteIface iface;
- private final Partitioner part;
+ private final Partitioner partitioner;
private CountDownLatchInjection testCountDownLatch;
- private volatile IOException exp;
+ private volatile ExecutionException exception;
- public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
- final Partitioner part, CountDownLatchInjection testCountDownLatch) {
- this.parentThreadName = parentThreadName;
- this.latch = latch;
+ public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
+ state = new AtomicReference<>(STATE.NEW);
+ runner = new AtomicReference<>();
+ this.partitionerDecorator = partitionerDecorator;
this.iface = iface;
- this.part = part;
+ this.partitioner = partitioner;
+ this.count = count;
this.testCountDownLatch = testCountDownLatch;
}
@Override
public void run() {
- // Test only - Pause until interrupted by fragment thread
+ final Thread thread = Thread.currentThread();
+ Preconditions.checkState(runner.compareAndSet(null, thread),
+ "PartitionerTask can be executed only once.");
+ if (state.get() == STATE.NEW) {
+ final String name = thread.getName();
+ thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
+ final OperatorStats localStats = partitioner.getStats();
+ localStats.clear();
+ localStats.startProcessing();
+ ExecutionException executionException = null;
+ try {
+ // Test only - Pause until interrupted by fragment thread
+ testCountDownLatch.await();
+ iface.execute(partitioner);
+ } catch (InterruptedException e) {
+ if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
+ logger.warn("Partitioner Task interrupted during the run", e);
+ }
+ } catch (Throwable t) {
+ executionException = new ExecutionException(t);
+ }
+ if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
+ if (executionException == null) {
+ localStats.stopProcessing();
+ state.lazySet(STATE.NORMAL);
+ } else {
+ exception = executionException;
+ state.lazySet(STATE.EXCEPTIONAL);
+ }
+ }
+ if (count.decrementAndGet() == 0) {
+ LockSupport.unpark(partitionerDecorator.thread);
+ }
+ thread.setName(name);
+ }
+ runner.set(null);
+ while (state.get() == STATE.INTERRUPTING) {
+ Thread.yield();
+ }
+ // Clear interrupt flag
try {
- testCountDownLatch.await();
- } catch (final InterruptedException e) {
- logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
+ Thread.sleep(0);
--- End diff --
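Could we use Thread.interrupted() instead of Thread.sleep(0) here? Its Javadoc
(quoted below) states that it clears the current thread's interrupted status,
which makes it a good alternative for clearing the interrupt flag. It also
avoids the unnecessary yield that Thread.sleep(0) incurs on some JVM
implementations.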
```
/**
* Tests whether the current thread has been interrupted. The
* <i>interrupted status</i> of the thread is cleared by this method. In
* other words, if this method were to be called twice in succession, the
* second call would return false (unless the current thread were
* interrupted again, after the first call had cleared its interrupted
* status and before the second call had examined it).
*
* <p>A thread interruption ignored because a thread was not alive
* at the time of the interrupt will be reflected by this method
* returning false.
*
* @return <code>true</code> if the current thread has been interrupted;
* <code>false</code> otherwise.
* @see #isInterrupted()
* @revised 6.0
*/
```
---