Fix possible memory leak in PartitionSenderRootExec.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/62a73bcd Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/62a73bcd Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/62a73bcd Branch: refs/heads/master Commit: 62a73bcd82464b0a48a234e09040ed069033b848 Parents: aaf9fb8 Author: Jacques Nadeau <[email protected]> Authored: Thu May 14 21:55:44 2015 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu May 14 22:14:59 2015 -0700 ---------------------------------------------------------------------- .../PartitionSenderRootExec.java | 41 +++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/62a73bcd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 1872a51..31fc160 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -230,25 +230,36 @@ public class PartitionSenderRootExec extends BaseRootExec { final List<Partitioner> subPartitioners = createClassInstances(actualPartitions); int startIndex = 0; int endIndex = 0; - for (int i = 0; i < actualPartitions; i++) { - startIndex = endIndex; - endIndex = (i < actualPartitions - 1 ) ? 
startIndex + divisor : outGoingBatchCount; - if ( i < longTail ) { - endIndex++; + + boolean success = false; + try { + for (int i = 0; i < actualPartitions; i++) { + startIndex = endIndex; + endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount; + if (i < longTail) { + endIndex++; + } + final OperatorStats partitionStats = new OperatorStats(stats, true); + subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext, + startIndex, endIndex); + } + + synchronized (this) { + partitioner = new PartitionerDecorator(subPartitioners, stats, context); + for (int index = 0; index < terminations.size(); index++) { + partitioner.getOutgoingBatches(terminations.buffer[index]).terminate(); + } + terminations.clear(); } - final OperatorStats partitionStats = new OperatorStats(stats, true); - subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext, - startIndex, endIndex); - } - synchronized(this){ - partitioner = new PartitionerDecorator(subPartitioners, stats, context); - for (int index = 0; index < terminations.size(); index++) { - partitioner.getOutgoingBatches(terminations.buffer[index]).terminate(); + success = true; + } finally { + if (!success) { + for (Partitioner p : subPartitioners) { + p.clear(); + } } - terminations.clear(); } - } private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {
