Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 a0cab3bda -> d1bba5a28
PHOENIX-1894 Iterators in BaseResultIterators#submitWork should be added to a thread safe collection. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d1bba5a2 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d1bba5a2 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d1bba5a2 Branch: refs/heads/4.x-HBase-0.98 Commit: d1bba5a287841e339f400b909590b5ba9828fae4 Parents: a0cab3b Author: Samarth <[email protected]> Authored: Mon Apr 20 16:29:09 2015 -0700 Committer: Samarth <[email protected]> Committed: Mon Apr 20 16:29:09 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/phoenix/iterate/BaseResultIterators.java | 6 ++++-- .../java/org/apache/phoenix/iterate/ParallelIterators.java | 3 ++- .../main/java/org/apache/phoenix/iterate/SerialIterators.java | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1bba5a2/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 8d602b5..6a3847b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -29,10 +29,12 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableSet; +import java.util.Queue; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -513,7 +515,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // Capture all iterators so that if something goes wrong, we close them all // The iterators list is based on the submission of work, so it may not // contain them all (for example if work was rejected from the queue) - List<PeekingResultIterator> allIterators = Lists.newArrayListWithExpectedSize(this.splits.size()); + Queue<PeekingResultIterator> allIterators = new ConcurrentLinkedQueue<>(); List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans); final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans); allFutures.add(futures); @@ -680,7 +682,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result abstract protected String getName(); abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - List<PeekingResultIterator> allIterators, int estFlattenedSize); + Queue<PeekingResultIterator> allIterators, int estFlattenedSize); @Override public int size() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1bba5a2/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index b74919b..97270ef 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -22,6 +22,7 @@ import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_S import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -57,7 +58,7 @@ public class ParallelIterators extends BaseResultIterators { @Override protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - final List<PeekingResultIterator> allIterators, int estFlattenedSize) { + final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1bba5a2/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index ded9344..6b3b5e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -20,6 +20,7 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -60,7 +61,7 @@ public class SerialIterators extends BaseResultIterators { @Override protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - final List<PeekingResultIterator> allIterators, int estFlattenedSize) { + final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor
