Github user ilooner commented on a diff in the pull request:
https://github.com/apache/drill/pull/1208#discussion_r181260542
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
---
@@ -262,68 +279,120 @@ public FlushBatchesHandlingClass(boolean
isLastBatch, boolean schemaChanged) {
}
@Override
- public void execute(Partitioner part) throws IOException {
+ public void execute(Partitioner part) throws IOException,
InterruptedException {
part.flushOutgoingBatches(isLastBatch, schemaChanged);
}
}
/**
- * Helper class to wrap Runnable with customized naming
- * Exception handling
+ * Helper class to wrap Runnable with cancellation and waiting for
completion support
*
*/
- private static class CustomRunnable implements Runnable {
+ private static class PartitionerTask implements Runnable {
+
+ private enum STATE {
+ NEW,
+ COMPLETING,
+ NORMAL,
+ EXCEPTIONAL,
+ CANCELLED,
+ INTERRUPTING,
+ INTERRUPTED
+ }
+
+ private final AtomicReference<STATE> state;
+ private final AtomicReference<Thread> runner;
+ private final PartitionerDecorator partitionerDecorator;
+ private final AtomicInteger count;
- private final String parentThreadName;
- private final CountDownLatch latch;
private final GeneralExecuteIface iface;
- private final Partitioner part;
+ private final Partitioner partitioner;
private CountDownLatchInjection testCountDownLatch;
private volatile IOException exp;
- public CustomRunnable(final String parentThreadName, final
CountDownLatch latch, final GeneralExecuteIface iface,
- final Partitioner part, CountDownLatchInjection
testCountDownLatch) {
- this.parentThreadName = parentThreadName;
- this.latch = latch;
+ public PartitionerTask(PartitionerDecorator partitionerDecorator,
GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count,
CountDownLatchInjection testCountDownLatch) {
+ state = new AtomicReference<>(STATE.NEW);
+ runner = new AtomicReference<>();
+ this.partitionerDecorator = partitionerDecorator;
this.iface = iface;
- this.part = part;
+ this.partitioner = partitioner;
+ this.count = count;
this.testCountDownLatch = testCountDownLatch;
}
@Override
public void run() {
- // Test only - Pause until interrupted by fragment thread
- try {
- testCountDownLatch.await();
- } catch (final InterruptedException e) {
- logger.debug("Test only: partitioner thread is interrupted in test
countdown latch await()", e);
- }
-
- final Thread currThread = Thread.currentThread();
- final String currThreadName = currThread.getName();
- final OperatorStats localStats = part.getStats();
- try {
- final String newThreadName = parentThreadName + currThread.getId();
- currThread.setName(newThreadName);
+ final Thread thread = Thread.currentThread();
+ if (state.get() == STATE.NEW && runner.compareAndSet(null, thread)) {
--- End diff --
Why is this check necessary?
---