Repository: phoenix Updated Branches: refs/heads/3.0 4b511a0c4 -> 0ced64677
PHOENIX-1186 Pass scan for parallel chunk of work through to ParallelIteratorFactory Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0ced6467 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0ced6467 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0ced6467 Branch: refs/heads/3.0 Commit: 0ced6467755bea8fbba95d51e5bc229f5b3177dd Parents: 4b511a0 Author: James Taylor <jamestay...@apache.org> Authored: Tue Aug 19 16:19:07 2014 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Tue Aug 19 16:36:52 2014 -0700 ---------------------------------------------------------------------- .../MutatingParallelIteratorFactory.java | 3 ++- .../apache/phoenix/execute/AggregatePlan.java | 9 +++++---- .../phoenix/iterate/ChunkedResultIterator.java | 21 ++++++++------------ .../phoenix/iterate/ParallelIterators.java | 4 ++-- .../phoenix/iterate/SpoolingResultIterator.java | 3 ++- 5 files changed, 19 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/0ced6467/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java index fbfce29..df91b1d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java @@ -26,6 +26,7 @@ import java.sql.SQLException; import java.util.List; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; import org.apache.phoenix.iterate.PeekingResultIterator; @@ -58,7 +59,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato abstract protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException; @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator, Scan scan) throws SQLException { final PhoenixConnection connection = new PhoenixConnection(this.connection); MutationState state = mutate(context, iterator, connection); long totalRowCount = state.getUpdateCount(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/0ced6467/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 67c7bb7..d45e036 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -22,6 +22,7 @@ import java.sql.SQLException; import java.util.Collections; import java.util.List; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.RowProjector; @@ -94,7 +95,7 @@ public class AggregatePlan extends BasicQueryPlan { this.services = services; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { Expression expression = RowKeyExpression.INSTANCE; OrderByExpression orderByExpression = new OrderByExpression(expression, false, true); int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); @@ -111,9 +112,9 @@ public class AggregatePlan extends BasicQueryPlan { this.outerFactory = outerFactory; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException { - PeekingResultIterator iterator = innerFactory.newIterator(context, scanner); - return outerFactory.newIterator(context, iterator); + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { + PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan); + return outerFactory.newIterator(context, iterator, scan); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0ced6467/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index cfaca84..38e91bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -18,7 +18,6 @@ package org.apache.phoenix.iterate; -import java.io.IOException; import java.sql.SQLException; import java.util.List; @@ -26,11 +25,11 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ScanUtil; /** * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for @@ -58,9 +57,9 @@ public class ChunkedResultIterator implements PeekingResultIterator { } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { scanner.close(); //close the iterator since we don't need it anymore. - return new ChunkedResultIterator(delegateFactory, context, tableRef, + return new ChunkedResultIterator(delegateFactory, context, tableRef, scan, context.getConnection().getQueryServices().getProps().getLong( QueryServices.SCAN_RESULT_CHUNK_SIZE, QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE)); @@ -68,11 +67,11 @@ public class ChunkedResultIterator implements PeekingResultIterator { } public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory, - StatementContext context, TableRef tableRef, long chunkSize) { + StatementContext context, TableRef tableRef, Scan scan, long chunkSize) { this.delegateIteratorFactory = delegateIteratorFactory; this.context = context; this.tableRef = tableRef; - this.scan = context.getScan(); + this.scan = scan; this.chunkSize = chunkSize; } @@ -105,18 +104,14 @@ public class ChunkedResultIterator implements PeekingResultIterator { if (resultIterator == null) { singleChunkResultIterator = new SingleChunkResultIterator( new TableResultIterator(context, tableRef, scan), chunkSize); - resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan); } else if (resultIterator.peek() == null && !singleChunkResultIterator.isEndOfStreamReached()) { singleChunkResultIterator.close(); - try { - this.scan = new Scan(scan); - } catch (IOException e) { - throw new PhoenixIOException(e); - } + scan = ScanUtil.newScan(scan); scan.setStartRow(Bytes.add(singleChunkResultIterator.getLastKey(), new byte[]{0})); singleChunkResultIterator = new SingleChunkResultIterator( new TableResultIterator(context, tableRef, scan), chunkSize); - resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan); } return resultIterator; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0ced6467/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 989a7be..440b47c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -85,7 +85,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { private final ParallelIteratorFactory iteratorFactory; public static interface ParallelIteratorFactory { - PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException; + PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException; } private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min @@ -353,7 +353,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { if (logger.isDebugEnabled()) { logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan); } - return iteratorFactory.newIterator(scanContext, scanner); + return iteratorFactory.newIterator(scanContext, scanner, splitScan); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/0ced6467/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java index 49b1468..22cd049 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java @@ -29,6 +29,7 @@ import java.util.List; import org.apache.commons.io.output.DeferredFileOutputStream; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.StatementContext; @@ -63,7 +64,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { this.services = services; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { return new SpoolingResultIterator(scanner, services); }