Repository: drill Updated Branches: refs/heads/master 16ef62851 -> 7f575df33
DRILL-3093: Close raw batch buffers in data collectors Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7f575df3 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7f575df3 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7f575df3 Branch: refs/heads/master Commit: 7f575df33b6cf553c69de011ff46efa69ba0cca4 Parents: 16ef628 Author: Mehant Baid <[email protected]> Authored: Thu May 14 17:55:48 2015 -0700 Committer: Mehant Baid <[email protected]> Committed: Thu May 14 19:20:19 2015 -0700 ---------------------------------------------------------------------- .../org/apache/drill/common/AutoCloseables.java | 18 ++++++++++++++++++ .../impl/mergereceiver/MergingRecordBatch.java | 6 ------ .../unorderedreceiver/UnorderedReceiverBatch.java | 1 - .../exec/record/RawFragmentBatchProvider.java | 3 +-- .../exec/work/batch/AbstractDataCollector.java | 4 +++- .../drill/exec/work/batch/BaseRawBatchBuffer.java | 2 +- .../drill/exec/work/batch/DataCollector.java | 2 +- .../drill/exec/work/batch/IncomingBuffers.java | 7 +++---- .../exec/work/batch/SpoolingRawBatchBuffer.java | 5 +++-- 9 files changed, 30 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/common/src/main/java/org/apache/drill/common/AutoCloseables.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java index fa1eb92..c080c52 100644 --- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java +++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java @@ -42,4 +42,22 @@ public class AutoCloseables { logger.warn("Failure on close(): " + e); } } + + public static void close(AutoCloseable[] ac) throws Exception { + Exception topLevelException = null; + for (AutoCloseable closeable : ac) { + try { + closeable.close(); + } catch (Exception e) { + if (topLevelException == null) { + topLevelException = e; + } else { + topLevelException.addSuppressed(e); + } + } + } + if (topLevelException != null) { + throw topLevelException; + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 6da132b..baf9bda 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -738,12 +738,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } } } - if (fragProviders != null) { - for (final RawFragmentBatchProvider f : fragProviders) { - f.cleanup(); - } - } - super.close(); } http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 1498441..684f715 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -214,7 +214,6 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { @Override public void close() { batchLoader.clear(); - fragProvider.cleanup(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java index 030785c..14db502 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java @@ -21,9 +21,8 @@ import java.io.IOException; import org.apache.drill.exec.ops.FragmentContext; -public interface RawFragmentBatchProvider { +public interface RawFragmentBatchProvider extends AutoCloseable{ public RawFragmentBatch getNext() throws IOException, InterruptedException; public void kill(FragmentContext context); - public void cleanup(); } http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java index 407c547..d52cb5d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.Collector; @@ -129,7 +130,8 @@ public abstract class AbstractDataCollector implements DataCollector{ protected abstract RawBatchBuffer getBuffer(int minorFragmentId); @Override - public void close() { + public void close() throws Exception { + AutoCloseables.close(buffers); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java index 5192e46..11b6cc8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java @@ -110,7 +110,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer { // ## Add assertion that all acks have been sent. TODO @Override - public void cleanup() { + public void close() { if (!isTerminated() && context.shouldContinue()) { final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount); final IllegalStateException e = new IllegalStateException(msg); http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java index dc016be..de88d02 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java @@ -27,5 +27,5 @@ interface DataCollector extends AutoCloseable { public int getOppositeMajorFragmentId(); public RawBatchBuffer[] getBuffers(); public int getTotalIncomingFragments(); - public void close(); + public void close() throws Exception; } http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java index 1c8b066..b21c61d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.Collector; @@ -110,10 +111,8 @@ public class IncomingBuffers implements AutoCloseable { } @Override - public void close() { - for (DataCollector fragment : fragCounts.values()) { - fragment.close(); - } + public void close() throws Exception { + AutoCloseables.close(fragCounts.values().toArray(new AutoCloseable[0])); } } http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java index 1634982..cfe5b6b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java @@ -240,7 +240,8 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB logger.debug("Got batch. Current buffer size: {}", bufferQueue.size()); } - public void cleanup() { + @Override + public void close() { if (spooler != null) { spooler.terminate(); while (spooler.isAlive()) { @@ -270,7 +271,7 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB logger.warn("Failed to delete temporary files", e); } } - super.cleanup(); + super.close(); } private class Spooler extends Thread {
