This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit b557b796dc1ca7796b0db956e39df1d52f212f12 Author: Sorabh Hamirwasia <[email protected]> AuthorDate: Wed Jan 23 17:38:44 2019 -0800 DRILL-7000: Queries failing with 'Failed to aggregate or route the RFW' do not complete closes #1621 --- .../apache/drill/exec/work/filter/RuntimeFilterSink.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java index f69a44e..c0eceae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java @@ -99,6 +99,10 @@ public class RuntimeFilterSink implements Closeable joinMjId2Stopwatch.put(joinMjId, stopwatch); } synchronized (rfQueue) { + if (!running.get()) { + runtimeFilterWritable.close(); + return; + } rfQueue.add(runtimeFilterWritable); rfQueue.notify(); } @@ -246,14 +250,22 @@ public class RuntimeFilterSink implements Closeable aggregate(toAggregate); } catch (Exception ex) { logger.error("Failed to aggregate or route the RFW", ex); + + // Set running to false and cleanup pending RFW in queue. This will make sure producer + // thread is also indicated to stop and queue is cleaned up properly in failure cases + synchronized (rfQueue) { + running.set(false); + } + cleanupQueue(); throw new DrillRuntimeException(ex); } finally { - if (toAggregate != null) { toAggregate.close(); - } } } + cleanupQueue(); + } + private void cleanupQueue() { if (!running.get()) { RuntimeFilterWritable toClose; while ((toClose = rfQueue.poll()) != null) {
