Repository: phoenix
Updated Branches:
  refs/heads/master 5565aa38d -> e3a1c24d2


PHOENIX-1894 Iterators in BaseResultIterators#submitWork should be added to a thread-safe collection.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e3a1c24d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e3a1c24d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e3a1c24d

Branch: refs/heads/master
Commit: e3a1c24d2712f7976b3f3e9c003f1a27dfe62304
Parents: 5565aa3
Author: Samarth <[email protected]>
Authored: Mon Apr 20 16:31:21 2015 -0700
Committer: Samarth <[email protected]>
Committed: Mon Apr 20 16:31:21 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/phoenix/iterate/BaseResultIterators.java   | 6 ++++--
 .../java/org/apache/phoenix/iterate/ParallelIterators.java     | 3 ++-
 .../main/java/org/apache/phoenix/iterate/SerialIterators.java  | 3 ++-
 3 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e3a1c24d/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 8d602b5..6a3847b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -29,10 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -513,7 +515,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         // Capture all iterators so that if something goes wrong, we close them all
         // The iterators list is based on the submission of work, so it may not
         // contain them all (for example if work was rejected from the queue)
-        List<PeekingResultIterator> allIterators = Lists.newArrayListWithExpectedSize(this.splits.size());
+        Queue<PeekingResultIterator> allIterators = new ConcurrentLinkedQueue<>();
         List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans);
         final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
         allFutures.add(futures);
@@ -680,7 +682,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            List<PeekingResultIterator> allIterators, int estFlattenedSize);
+            Queue<PeekingResultIterator> allIterators, int estFlattenedSize);
     
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e3a1c24d/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index b74919b..97270ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_S
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -57,7 +58,7 @@ public class ParallelIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e3a1c24d/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index ded9344..6b3b5e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -60,7 +61,7 @@ public class SerialIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor

Reply via email to