PHOENIX-1429 Cancel queued threads when limit reached

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/54b443f5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/54b443f5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/54b443f5

Branch: refs/heads/3.0
Commit: 54b443f54743e9d6d93b5e29c2edb987f990f0a4
Parents: 42e0c34
Author: James Taylor <jtay...@salesforce.com>
Authored: Wed Nov 12 11:37:25 2014 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Wed Nov 12 22:38:10 2014 -0800

----------------------------------------------------------------------
 .../phoenix/iterate/BaseResultIterators.java    | 65 ++++++++++++++++----
 .../phoenix/iterate/ConcatResultIterator.java   | 38 ++++++++++--
 .../phoenix/iterate/LimitingResultIterator.java |  1 +
 .../iterate/MergeSortResultIterator.java        | 27 +++++++-
 .../apache/phoenix/iterate/ResultIterators.java |  3 +-
 .../phoenix/query/BaseQueryServicesImpl.java    |  6 +-
 .../phoenix/query/DelegateQueryServices.java    |  4 +-
 .../org/apache/phoenix/query/QueryServices.java |  4 +-
 .../iterate/AggregateResultScannerTest.java     |  4 ++
 .../iterate/ConcatResultIteratorTest.java       | 15 ++++-
 .../iterate/MergeSortResultIteratorTest.java    | 11 +++-
 11 files changed, 147 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index b10a4ab..763daf3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -94,6 +94,9 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
     private final PTableStats tableStats;
     private final byte[] physicalTableName;
     private final QueryPlan plan;
+    // TODO: too much nesting here - breakup into new classes.
+    private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> 
allFutures;
+
     
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new 
Function<HRegionLocation, KeyRange>() {
         @Override
@@ -122,8 +125,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         return true;
     }
     
-    public BaseResultIterators(QueryPlan plan, Integer perScanLimit)
-            throws SQLException {
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit) throws 
SQLException {
         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), 
plan.getOrderBy(), plan.getStatement().getHint());
         this.plan = plan;
         StatementContext context = plan.getContext();
@@ -178,6 +180,8 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             }
         }
         this.splits = ImmutableList.copyOf(splitRanges);
+        // If split detected, this will be more than one, but that's unlikely
+        this.allFutures = Lists.newArrayListWithExpectedSize(1);
     }
 
     private void doColumnProjectionOptimization(StatementContext context, Scan 
scan, PTable table, FilterableStatement statement) {
@@ -476,6 +480,8 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         int numSplits = size();
         List<PeekingResultIterator> iterators = new 
ArrayList<PeekingResultIterator>(numSplits);
         final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = 
Lists.newArrayListWithExpectedSize(numSplits);
+        allFutures.add(futures);
+        SQLException toThrow = null;
         // TODO: what purpose does this scanID serve?
         final UUID scanId = UUID.randomUUID();
         try {
@@ -507,6 +513,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                             addIterator(iterators, concatIterators);
                             concatIterators = Collections.emptyList();
                             submitWork(scanId, newNestedScans, newFutures, 
newNestedScans.size());
+                            allFutures.add(newFutures);
                             for 
(List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : 
reverseIfNecessary(newFutures, isReverse)) {
                                 for (Pair<Scan,Future<PeekingResultIterator>> 
newScanPair : reverseIfNecessary(newFuture, isReverse)) {
                                     // Immediate do a get (not catching 
exception again) and then add the iterators we
@@ -525,23 +532,59 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             success = true;
             return iterators;
         } catch (SQLException e) {
-            throw e;
+            toThrow = e;
         } catch (Exception e) {
-            throw ServerUtil.parseServerException(e);
+            toThrow = ServerUtil.parseServerException(e);
         } finally {
-            if (!success) {
-                SQLCloseables.closeAllQuietly(iterators);
-                // Don't call cancel on already started work, as it causes the 
HConnection
-                // to get into a funk. Instead, just cancel queued work.
-                for (List<Pair<Scan,Future<PeekingResultIterator>>> 
futureScans : futures) {
-                    for (Pair<Scan,Future<PeekingResultIterator>> futurePair : 
futureScans) {
-                        futurePair.getSecond().cancel(false);
+            try {
+                if (!success) {
+                    try {
+                        close();
+                    } catch (Exception e) {
+                        if (toThrow == null) {
+                            toThrow = ServerUtil.parseServerException(e);
+                        } else {
+                            
toThrow.setNextException(ServerUtil.parseServerException(e));
+                        }
+                    } finally {
+                        try {
+                            SQLCloseables.closeAll(iterators);
+                        } catch (Exception e) {
+                            if (toThrow == null) {
+                                toThrow = ServerUtil.parseServerException(e);
+                            } else {
+                                
toThrow.setNextException(ServerUtil.parseServerException(e));
+                            }
+                        }
                     }
                 }
+            } finally {
+                if (toThrow != null) {
+                    throw toThrow;
+                }
             }
         }
+        return null; // Not reachable
     }
     
+
+    @Override
+    public void close() throws SQLException {
+        // Don't call cancel on already started work, as it causes the 
HConnection
+        // to get into a funk. Instead, just cancel queued work.
+        boolean cancelledWork = false;
+        for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : 
allFutures) {
+            for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : 
futures) {
+                for (Pair<Scan,Future<PeekingResultIterator>> futurePair : 
futureScans) {
+                    cancelledWork |= futurePair.getSecond().cancel(false);
+                }
+            }
+        }
+        if (cancelledWork) {
+            context.getConnection().getQueryServices().getExecutor().purge();
+        }
+    }
+
     private void addIterator(List<PeekingResultIterator> parentIterators, 
List<PeekingResultIterator> childIterators) {
         if (!childIterators.isEmpty()) {
             
parentIterators.add(ConcatResultIterator.newIterator(childIterators));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
index 03f8785..fcc88aa 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
 
 
 /**
@@ -53,10 +54,33 @@ public class ConcatResultIterator implements 
PeekingResultIterator {
     
     @Override
     public void close() throws SQLException {
-        if (iterators != null) {
-            for (;index < iterators.size(); index++) {
-                PeekingResultIterator iterator = iterators.get(index);
-                iterator.close();
+        SQLException toThrow = null;
+        try {
+            if (resultIterators != null) {
+                resultIterators.close();
+            }
+        } catch (Exception e) {
+           toThrow = ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                if (iterators != null) {
+                    for (;index < iterators.size(); index++) {
+                        PeekingResultIterator iterator = iterators.get(index);
+                        try {
+                            iterator.close();
+                        } catch (Exception e) {
+                            if (toThrow == null) {
+                                toThrow = ServerUtil.parseServerException(e);
+                            } else {
+                                
toThrow.setNextException(ServerUtil.parseServerException(e));
+                            }
+                        }
+                    }
+                }
+            } finally {
+                if (toThrow != null) {
+                    throw toThrow;
+                }
             }
         }
     }
@@ -90,7 +114,11 @@ public class ConcatResultIterator implements 
PeekingResultIterator {
 
     @Override
     public Tuple next() throws SQLException {
-        return currentIterator().next();
+        Tuple next = currentIterator().next();
+        if (next == null) {
+            close(); // Close underlying ResultIterators to free resources 
sooner rather than later
+        }
+        return next;
     }
 
        @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
index f380cf5..e44db10 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
@@ -41,6 +41,7 @@ public class LimitingResultIterator extends 
DelegateResultIterator {
     @Override
     public Tuple next() throws SQLException {
         if (rowCount++ >= limit) {
+            close(); // Free resources early
             return null;
         }
         return super.next();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
index 2f5d941..9ef3e70 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
@@ -21,9 +21,9 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ServerUtil;
 
 
 /**
@@ -52,8 +52,29 @@ public abstract class MergeSortResultIterator implements 
PeekingResultIterator {
     
     @Override
     public void close() throws SQLException {
-        if (iterators != null) {
-            SQLCloseables.closeAll(iterators);
+        SQLException toThrow = null;
+        try {
+            if (resultIterators != null) {
+                resultIterators.close();
+            }
+        } catch (Exception e) {
+           toThrow = ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                if (iterators != null) {
+                    SQLCloseables.closeAll(iterators);
+                }
+            } catch (Exception e) {
+                if (toThrow == null) {
+                    toThrow = ServerUtil.parseServerException(e);
+                } else {
+                    
toThrow.setNextException(ServerUtil.parseServerException(e));
+                }
+            } finally {
+                if (toThrow != null) {
+                    throw toThrow;
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
index ef2b534..16f8b41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
@@ -22,8 +22,9 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.util.SQLCloseable;
 
-public interface ResultIterators {
+public interface ResultIterators extends SQLCloseable {
     public int size();
     public List<KeyRange> getSplits();
     public List<List<Scan>> getScans();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
index f116695..73cc3c2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.query;
 
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.phoenix.job.JobManager;
 import org.apache.phoenix.memory.GlobalMemoryManager;
@@ -35,7 +35,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
  * @since 0.1
  */
 public abstract class BaseQueryServicesImpl implements QueryServices {
-    private final ExecutorService executor;
+    private final ThreadPoolExecutor executor;
     private final MemoryManager memoryManager;
     private final ReadOnlyProps props;
     private final QueryOptimizer queryOptimizer;
@@ -53,7 +53,7 @@ public abstract class BaseQueryServicesImpl implements 
QueryServices {
     }
     
     @Override
-    public ExecutorService getExecutor() {
+    public ThreadPoolExecutor getExecutor() {
         return executor;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
index 9d9a513..e58aa5f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
@@ -18,7 +18,7 @@
 package org.apache.phoenix.query;
 
 import java.sql.SQLException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.optimize.QueryOptimizer;
@@ -46,7 +46,7 @@ public class DelegateQueryServices implements QueryServices {
     }
     
     @Override
-    public ExecutorService getExecutor() {
+    public ThreadPoolExecutor getExecutor() {
         return parent.getExecutor();
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 7ddebaf..31661a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.query;
 
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.http.annotation.Immutable;
 import org.apache.phoenix.iterate.SpoolTooBigToDiskException;
@@ -133,7 +133,7 @@ public interface QueryServices extends SQLCloseable {
     /**
      * Get executor service used for parallel scans
      */
-    public ExecutorService getExecutor();
+    public ThreadPoolExecutor getExecutor();
     /**
      * Get the memory manager used to track memory usage
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
index 246d02d..d759fca 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
@@ -161,6 +161,10 @@ public class AggregateResultScannerTest extends 
BaseConnectionlessQueryTest {
                        public List<List<Scan>> getScans() {
                                return Collections.emptyList();
                        }
+
+            @Override
+            public void close() throws SQLException {
+            }
             
         };
         ResultIterator scanner = new GroupedAggregatingResultIterator(new 
MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
index 02fdcea..cf71724 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
@@ -21,16 +21,19 @@ import static 
org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.AssertResults;
+import org.junit.Test;
 
 
 
@@ -78,6 +81,10 @@ public class ConcatResultIteratorTest {
                        public List<List<Scan>> getScans() {
                                return Collections.emptyList();
                        }
+
+            @Override
+            public void close() throws SQLException {
+            }
         };
 
         Tuple[] expectedResults = new Tuple[] {
@@ -137,6 +144,10 @@ public class ConcatResultIteratorTest {
                        public List<List<Scan>> getScans() {
                                return Collections.emptyList();
                        }
+
+            @Override
+            public void close() throws SQLException {
+            }
         };
         ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
         AssertResults.assertResults(scanner, expectedResults);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54b443f5/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
index 095027c..77e42b0 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
@@ -21,16 +21,19 @@ import static 
org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.AssertResults;
+import org.junit.Test;
 
 
 public class MergeSortResultIteratorTest {
@@ -83,6 +86,10 @@ public class MergeSortResultIteratorTest {
                        public List<List<Scan>> getScans() {
                                return Collections.emptyList();
                        }
+
+            @Override
+            public void close() throws SQLException {
+            }
         };
         ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
         AssertResults.assertResults(scanner, expectedResults);

Reply via email to