This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 0a3ebc24d941f75841dcb2a6251e22d2a988c46c Author: weijie.tong <[email protected]> AuthorDate: Tue Oct 9 20:28:22 2018 +0800 DRILL-6731: use thread pool to run the runtime filter aggregating work closes #1459 --- .../org/apache/drill/exec/ops/FragmentContextImpl.java | 10 ++++++---- .../apache/drill/exec/work/filter/RuntimeFilterSink.java | 16 ++++++++-------- .../test/java/org/apache/drill/test/OperatorFixture.java | 3 ++- .../org/apache/drill/test/PhysicalOpUnitTestBase.java | 2 +- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java index 1f9d489..fcfdc8c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java @@ -209,7 +209,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor stats = new FragmentStats(allocator, fragment.getAssignment()); bufferManager = new BufferManagerImpl(this.allocator); constantValueHolderCache = Maps.newHashMap(); - this.runtimeFilterSink = new RuntimeFilterSink(this.allocator); + boolean enableRF = context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER); + if (enableRF) { + ExecutorService executorService = context.getExecutor(); + this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, executorService); + } } /** @@ -472,9 +476,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor for (OperatorContextImpl opContext : contexts) { suppressingClose(opContext); } - if (runtimeFilterSink != null) { - suppressingClose(runtimeFilterSink); - } + suppressingClose(runtimeFilterSink); suppressingClose(bufferManager); suppressingClose(allocator); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java index 754c68e..1468625 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java @@ -18,10 +18,11 @@ package org.apache.drill.exec.work.filter; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.rpc.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -63,18 +64,17 @@ public class RuntimeFilterSink implements AutoCloseable { private ReentrantLock aggregatedRFLock = new ReentrantLock(); - private Thread asyncAggregateThread; - private BufferAllocator bufferAllocator; + private Future future; + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); - public RuntimeFilterSink(BufferAllocator bufferAllocator) { + public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) { this.bufferAllocator = bufferAllocator; AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); - asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker); - asyncAggregateThread.start(); + future = executorService.submit(asyncAggregateWorker); } public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { @@ -158,7 +158,7 @@ public class RuntimeFilterSink implements AutoCloseable { @Override public void close() throws Exception { - asyncAggregateThread.interrupt(); + future.cancel(true); doCleanup(); } @@ -209,7 +209,7 @@ public class RuntimeFilterSink implements AutoCloseable { currentBookId.incrementAndGet(); } } catch 
(InterruptedException e) { - logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e); + logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName()); Thread.currentThread().interrupt(); } finally { doCleanup(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java index 81d0d1a..a1e7d0d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java @@ -81,6 +81,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Test fixture for operator and (especially) "sub-operator" tests. @@ -197,7 +198,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { this.controls = new ExecutionControls(options); compiler = new CodeCompiler(config, options); bufferManager = new BufferManagerImpl(allocator); - this.runtimeFilterSink = new RuntimeFilterSink(allocator); + this.runtimeFilterSink = new RuntimeFilterSink(allocator, Executors.newCachedThreadPool()); } private static FunctionImplementationRegistry newFunctionRegistry( diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java index 559f7f4..300e88b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java @@ -209,7 +209,7 @@ public class PhysicalOpUnitTestBase extends ExecTest { public MockExecutorFragmentContext(final FragmentContext fragmentContext) { super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(), fragmentContext.getScanExecutor(), 
fragmentContext.getScanDecodeExecutor()); - this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator()); + this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator(), Executors.newCachedThreadPool()); } @Override
