Repository: phoenix Updated Branches: refs/heads/4.2 e9a8f83c0 -> adc859f8f
PHOENIX-1448 Fix resource leak when work rejected by thread executor Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/adc859f8 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/adc859f8 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/adc859f8 Branch: refs/heads/4.2 Commit: adc859f8f1116ea634263c99dbf7fb30ff128fd9 Parents: e9a8f83 Author: James Taylor <[email protected]> Authored: Wed Nov 12 18:59:25 2014 -0800 Committer: James Taylor <[email protected]> Committed: Wed Nov 12 19:01:46 2014 -0800 ---------------------------------------------------------------------- .../phoenix/iterate/BaseResultIterators.java | 22 +++++++++++++------- .../phoenix/iterate/ParallelIterators.java | 6 ++++-- .../apache/phoenix/iterate/SerialIterators.java | 6 ++++-- 3 files changed, 22 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/adc859f8/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 7785c54..c873494 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -504,13 +504,17 @@ public abstract class BaseResultIterators extends ExplainTable implements Result boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL; final ConnectionQueryServices services = context.getConnection().getQueryServices(); ReadOnlyProps props = services.getProps(); - int numSplits = size(); - List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits); - final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits); + int numScans = size(); + // Capture all iterators so that if something goes wrong, we close them all + // The iterators list is based on the submission of work, so it may not + // contain them all (for example if work was rejected from the queue) + List<PeekingResultIterator> allIterators = Lists.newArrayListWithExpectedSize(this.splits.size()); + List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans); + final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans); allFutures.add(futures); SQLException toThrow = null; try { - submitWork(scans, futures, splits.size()); + submitWork(scans, futures, allIterators, splits.size()); int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS); boolean clearedCache = false; for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) { @@ -540,7 +544,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // as we need these to be in order addIterator(iterators, concatIterators); concatIterators = Collections.emptyList(); - submitWork(newNestedScans, newFutures, newNestedScans.size()); + submitWork(newNestedScans, newFutures, allIterators, newNestedScans.size()); allFutures.add(newFutures); for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) { for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) { @@ -576,7 +580,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } finally { try { - SQLCloseables.closeAll(iterators); + SQLCloseables.closeAll(allIterators); } catch (Exception e) { if (toThrow == null) { toThrow = ServerUtil.parseServerException(e); @@ -604,7 +608,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) { for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) { for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) { - if (futurePair != null) { // FIXME: null check should not be necessary + // When work is rejected, we may have null futurePair entries, because + // we randomize these and set them as they're submitted. + if (futurePair != null) { Future<PeekingResultIterator> future = futurePair.getSecond(); if (future != null) { cancelledWork |= future.cancel(false); @@ -648,7 +654,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result abstract protected String getName(); abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - int estFlattenedSize); + List<PeekingResultIterator> allIterators, int estFlattenedSize); @Override public int size() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/adc859f8/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index d16160c..62af19a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -57,7 +57,7 @@ public class ParallelIterators extends BaseResultIterators { @Override protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - int estFlattenedSize) { + final List<PeekingResultIterator> allIterators, int estFlattenedSize) { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor @@ -89,7 +89,9 @@ public class ParallelIterators extends BaseResultIterators { if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan))); } - return iteratorFactory.newIterator(context, scanner, scan); + PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan); + allIterators.add(iterator); + return iterator; } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/adc859f8/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index 502cdf8..4be7b56 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -61,7 +61,7 @@ public class SerialIterators extends BaseResultIterators { @Override protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - int estFlattenedSize) { + final List<PeekingResultIterator> allIterators, int estFlattenedSize) { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor @@ -88,7 +88,9 @@ public class SerialIterators extends BaseResultIterators { concatIterators.add(iteratorFactory.newIterator(context, scanner, scan)); } PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators); - return new LimitingPeekingResultIterator(concatIterator, limit); + PeekingResultIterator iterator = new LimitingPeekingResultIterator(concatIterator, limit); + allIterators.add(iterator); + return iterator; } /**
