PHOENIX-1429 Cancel queued threads when limit reached
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/54b443f5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/54b443f5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/54b443f5 Branch: refs/heads/3.0 Commit: 54b443f54743e9d6d93b5e29c2edb987f990f0a4 Parents: 42e0c34 Author: James Taylor <jtay...@salesforce.com> Authored: Wed Nov 12 11:37:25 2014 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed Nov 12 22:38:10 2014 -0800 ---------------------------------------------------------------------- .../phoenix/iterate/BaseResultIterators.java | 65 ++++++++++++++++---- .../phoenix/iterate/ConcatResultIterator.java | 38 ++++++++++-- .../phoenix/iterate/LimitingResultIterator.java | 1 + .../iterate/MergeSortResultIterator.java | 27 +++++++- .../apache/phoenix/iterate/ResultIterators.java | 3 +- .../phoenix/query/BaseQueryServicesImpl.java | 6 +- .../phoenix/query/DelegateQueryServices.java | 4 +- .../org/apache/phoenix/query/QueryServices.java | 4 +- .../iterate/AggregateResultScannerTest.java | 4 ++ .../iterate/ConcatResultIteratorTest.java | 15 ++++- .../iterate/MergeSortResultIteratorTest.java | 11 +++- 11 files changed, 147 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index b10a4ab..763daf3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -94,6 +94,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private final PTableStats tableStats; private final byte[] physicalTableName; private final QueryPlan plan; + // TODO: too much nesting here - breakup into new classes. + private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures; + static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() { @Override @@ -122,8 +125,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return true; } - public BaseResultIterators(QueryPlan plan, Integer perScanLimit) - throws SQLException { + public BaseResultIterators(QueryPlan plan, Integer perScanLimit) throws SQLException { super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint()); this.plan = plan; StatementContext context = plan.getContext(); @@ -178,6 +180,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } this.splits = ImmutableList.copyOf(splitRanges); + // If split detected, this will be more than one, but that's unlikely + this.allFutures = Lists.newArrayListWithExpectedSize(1); } private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) { @@ -476,6 +480,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result int numSplits = size(); List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits); final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits); + allFutures.add(futures); + SQLException toThrow = null; // TODO: what purpose does this scanID serve? final UUID scanId = UUID.randomUUID(); try { @@ -507,6 +513,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result addIterator(iterators, concatIterators); concatIterators = Collections.emptyList(); submitWork(scanId, newNestedScans, newFutures, newNestedScans.size()); + allFutures.add(newFutures); for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) { for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) { // Immediate do a get (not catching exception again) and then add the iterators we @@ -525,23 +532,59 @@ public abstract class BaseResultIterators extends ExplainTable implements Result success = true; return iterators; } catch (SQLException e) { - throw e; + toThrow = e; } catch (Exception e) { - throw ServerUtil.parseServerException(e); + toThrow = ServerUtil.parseServerException(e); } finally { - if (!success) { - SQLCloseables.closeAllQuietly(iterators); - // Don't call cancel on already started work, as it causes the HConnection - // to get into a funk. Instead, just cancel queued work. - for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) { - for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) { - futurePair.getSecond().cancel(false); + try { + if (!success) { + try { + close(); + } catch (Exception e) { + if (toThrow == null) { + toThrow = ServerUtil.parseServerException(e); + } else { + toThrow.setNextException(ServerUtil.parseServerException(e)); + } + } finally { + try { + SQLCloseables.closeAll(iterators); + } catch (Exception e) { + if (toThrow == null) { + toThrow = ServerUtil.parseServerException(e); + } else { + toThrow.setNextException(ServerUtil.parseServerException(e)); + } + } } } + } finally { + if (toThrow != null) { + throw toThrow; + } } } + return null; // Not reachable } + + @Override + public void close() throws SQLException { + // Don't call cancel on already started work, as it causes the HConnection + // to get into a funk. Instead, just cancel queued work. + boolean cancelledWork = false; + for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) { + for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) { + for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) { + cancelledWork |= futurePair.getSecond().cancel(false); + } + } + } + if (cancelledWork) { + context.getConnection().getQueryServices().getExecutor().purge(); + } + } + private void addIterator(List<PeekingResultIterator> parentIterators, List<PeekingResultIterator> childIterators) { if (!childIterators.isEmpty()) { parentIterators.add(ConcatResultIterator.newIterator(childIterators)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java index 03f8785..fcc88aa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java @@ -21,6 +21,7 @@ import java.sql.SQLException; import java.util.List; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ServerUtil; /** @@ -53,10 +54,33 @@ public class ConcatResultIterator implements PeekingResultIterator { @Override public void close() throws SQLException { - if (iterators != null) { - for (;index < iterators.size(); index++) { - PeekingResultIterator iterator = iterators.get(index); - iterator.close(); + SQLException toThrow = null; + try { + if (resultIterators != null) { + resultIterators.close(); + } + } catch (Exception e) { + toThrow = ServerUtil.parseServerException(e); + } finally { + try { + if (iterators != null) { + for (;index < iterators.size(); index++) { + PeekingResultIterator iterator = iterators.get(index); + try { + iterator.close(); + } catch (Exception e) { + if (toThrow == null) { + toThrow = ServerUtil.parseServerException(e); + } else { + toThrow.setNextException(ServerUtil.parseServerException(e)); + } + } + } + } + } finally { + if (toThrow != null) { + throw toThrow; + } } } } @@ -90,7 +114,11 @@ public class ConcatResultIterator implements PeekingResultIterator { @Override public Tuple next() throws SQLException { - return currentIterator().next(); + Tuple next = currentIterator().next(); + if (next == null) { + close(); // Close underlying ResultIterators to free resources sooner rather than later + } + return next; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java index f380cf5..e44db10 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java @@ -41,6 +41,7 @@ public class LimitingResultIterator extends DelegateResultIterator { @Override public Tuple next() throws SQLException { if (rowCount++ >= limit) { + close(); // Free resources early return null; } return super.next(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java index 2f5d941..9ef3e70 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java @@ -21,9 +21,9 @@ import java.sql.SQLException; import java.util.List; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.SQLCloseables; +import org.apache.phoenix.util.ServerUtil; /** @@ -52,8 +52,29 @@ public abstract class MergeSortResultIterator implements PeekingResultIterator { @Override public void close() throws SQLException { - if (iterators != null) { - SQLCloseables.closeAll(iterators); + SQLException toThrow = null; + try { + if (resultIterators != null) { + resultIterators.close(); + } + } catch (Exception e) { + toThrow = ServerUtil.parseServerException(e); + } finally { + try { + if (iterators != null) { + SQLCloseables.closeAll(iterators); + } + } catch (Exception e) { + if (toThrow == null) { + toThrow = ServerUtil.parseServerException(e); + } else { + toThrow.setNextException(ServerUtil.parseServerException(e)); + } + } finally { + if (toThrow != null) { + throw toThrow; + } + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java index ef2b534..16f8b41 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java @@ -22,8 +22,9 @@ import java.util.List; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.util.SQLCloseable; -public interface ResultIterators { +public interface ResultIterators extends SQLCloseable { public int size(); public List<KeyRange> getSplits(); public List<List<Scan>> getScans(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java index f116695..73cc3c2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java @@ -17,7 +17,7 @@ */ package org.apache.phoenix.query; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.phoenix.job.JobManager; import org.apache.phoenix.memory.GlobalMemoryManager; @@ -35,7 +35,7 @@ import org.apache.phoenix.util.ReadOnlyProps; * @since 0.1 */ public abstract class BaseQueryServicesImpl implements QueryServices { - private final ExecutorService executor; + private final ThreadPoolExecutor executor; private final MemoryManager memoryManager; private final ReadOnlyProps props; private final QueryOptimizer queryOptimizer; @@ -53,7 +53,7 @@ public abstract class BaseQueryServicesImpl implements QueryServices { } @Override - public ExecutorService getExecutor() { + public ThreadPoolExecutor getExecutor() { return executor; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java index 9d9a513..e58aa5f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java @@ -18,7 +18,7 @@ package org.apache.phoenix.query; import java.sql.SQLException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.optimize.QueryOptimizer; @@ -46,7 +46,7 @@ public class DelegateQueryServices implements QueryServices { } @Override - public ExecutorService getExecutor() { + public ThreadPoolExecutor getExecutor() { return parent.getExecutor(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 7ddebaf..31661a0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -17,7 +17,7 @@ */ package org.apache.phoenix.query; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.http.annotation.Immutable; import org.apache.phoenix.iterate.SpoolTooBigToDiskException; @@ -133,7 +133,7 @@ public interface QueryServices extends SQLCloseable { /** * Get executor service used for parallel scans */ - public ExecutorService getExecutor(); + public ThreadPoolExecutor getExecutor(); /** * Get the memory manager used to track memory usage */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java index 246d02d..d759fca 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java @@ -161,6 +161,10 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest { public List<List<Scan>> getScans() { return Collections.emptyList(); } + + @Override + public void close() throws SQLException { + } }; ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java index 02fdcea..cf71724 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java @@ -21,16 +21,19 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.AssertResults; +import org.junit.Test; @@ -78,6 +81,10 @@ public class ConcatResultIteratorTest { public List<List<Scan>> getScans() { return Collections.emptyList(); } + + @Override + public void close() throws SQLException { + } }; Tuple[] expectedResults = new Tuple[] { @@ -137,6 +144,10 @@ public class ConcatResultIteratorTest { public List<List<Scan>> getScans() { return Collections.emptyList(); } + + @Override + public void close() throws SQLException { + } }; ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators); AssertResults.assertResults(scanner, expectedResults); http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java index 095027c..77e42b0 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java @@ -21,16 +21,19 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.AssertResults; +import org.junit.Test; public class MergeSortResultIteratorTest { @@ -83,6 +86,10 @@ public class MergeSortResultIteratorTest { public List<List<Scan>> getScans() { return Collections.emptyList(); } + + @Override + public void close() throws SQLException { + } }; ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators); AssertResults.assertResults(scanner, expectedResults);