PHOENIX-3941 Filter regions to scan for local indexes based on data table 
leading pk filter conditions


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8c1746c2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8c1746c2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8c1746c2

Branch: refs/heads/5.x-HBase-2.0
Commit: 8c1746c211edc5df62f3d3c62b797a9ba4e97e24
Parents: 05959b1
Author: James Taylor <jtay...@salesforce.com>
Authored: Wed Feb 7 23:02:44 2018 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Fri Feb 9 12:30:56 2018 -0800

----------------------------------------------------------------------
 .../apache/phoenix/compile/DeleteCompiler.java  |   2 +-
 .../org/apache/phoenix/compile/ExplainPlan.java |  10 +
 .../apache/phoenix/compile/JoinCompiler.java    |  10 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |   2 +-
 .../apache/phoenix/compile/QueryCompiler.java   |  20 +-
 .../org/apache/phoenix/compile/ScanRanges.java  |  12 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   4 +-
 .../apache/phoenix/execute/AggregatePlan.java   |  12 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |   4 +-
 .../execute/LiteralResultIterationPlan.java     |   2 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  18 +-
 .../phoenix/iterate/BaseResultIterators.java    | 226 ++++++++++++++++++-
 .../apache/phoenix/iterate/ExplainTable.java    |   3 +-
 .../phoenix/iterate/ParallelIterators.java      |   8 +-
 .../apache/phoenix/iterate/SerialIterators.java |   4 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   2 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |   4 +-
 .../query/ConnectionlessQueryServicesImpl.java  |   8 +-
 .../phoenix/compile/QueryCompilerTest.java      | 226 +++++++++++++++++++
 .../apache/phoenix/query/KeyRangeClipTest.java  | 155 +++++++++++++
 .../query/ParallelIteratorsSplitTest.java       |   2 +-
 21 files changed, 674 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 6383ed0..b77fcbe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -578,7 +578,7 @@ public class DeleteCompiler {
             }
             final RowProjector projector = projectorToBe;
             final QueryPlan aggPlan = new AggregatePlan(context, select, 
dataPlan.getTableRef(), projector, null, null,
-                    OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, 
null);
+                    OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, 
null, dataPlan);
             return new ServerSelectDeleteMutationPlan(dataPlan, connection, 
aggPlan, projector, maxSize, maxSizeBytes);
         } else {
             final DeletingParallelIteratorFactory parallelIteratorFactory = 
parallelIteratorFactoryToBe;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
index 2bc7809..ef34daa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
@@ -34,4 +34,14 @@ public class ExplainPlan {
     public List<String> getPlanSteps() {
         return planSteps;
     }
+    
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        for (String step : planSteps) {
+            buf.append(step);
+            buf.append('\n');
+        }
+        return buf.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 887e2d2..f9d8711 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -17,8 +17,8 @@
  */
 package org.apache.phoenix.compile;
 
-import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static 
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -80,8 +80,6 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ProjectedColumn;
@@ -1175,7 +1173,7 @@ public class JoinCompiler {
         }
         JoinTable join = compile(statement, select, resolver);
         if (groupByTableRef != null || orderByTableRef != null) {
-            QueryCompiler compiler = new QueryCompiler(statement, select, 
resolver, false);
+            QueryCompiler compiler = new QueryCompiler(statement, select, 
resolver, false, null);
             List<Object> binds = statement.getParameters();
             StatementContext ctx = new StatementContext(statement, resolver, 
new Scan(), new SequenceManager(statement));
             QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, 
false, false, null);
@@ -1197,6 +1195,10 @@ public class JoinCompiler {
             List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? 
select.getGroupBy() : null;
             List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? 
select.getOrderBy() : null;
             SelectStatement stmt = 
getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), 
table.getTableSamplingRate(), tableRef, join.getColumnRefs(), 
table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), 
select.hasSequence(), select.getUdfParseNodes());
+            // TODO: As port of PHOENIX-4585, we need to make sure this plan 
has a pointer to the data plan
+            // when an index is used instead of the data table, and that this 
method returns that
+            // state for downstream processing.
+            // TODO: It seems inefficient to be recompiling the statement 
again and again inside of this optimize call
             QueryPlan plan = 
statement.getConnection().getQueryServices().getOptimizer().optimize(statement, 
stmt);
             if (!plan.getTableRef().equals(tableRef)) {
                 replacement.put(tableRef, plan.getTableRef());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index e5ed6a5..709534e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -283,7 +283,7 @@ public class PostDDLCompiler {
                                 continue;
                             }
                             QueryPlan plan = new AggregatePlan(context, 
select, tableRef, projector, null, null,
-                                    OrderBy.EMPTY_ORDER_BY, null, 
GroupBy.EMPTY_GROUP_BY, null);
+                                    OrderBy.EMPTY_ORDER_BY, null, 
GroupBy.EMPTY_GROUP_BY, null, null);
                             try {
                                 ResultIterator iterator = plan.iterator();
                                 try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index af2254b..e527ea3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -104,16 +104,13 @@ public class QueryCompiler {
     private final boolean projectTuples;
     private final boolean useSortMergeJoin;
     private final boolean noChildParentJoinOptimization;
+    private final QueryPlan dataPlan;
 
-    public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver) throws SQLException {
-        this(statement, select, resolver, Collections.<PDatum>emptyList(), 
null, new SequenceManager(statement), true);
+    public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws 
SQLException {
+        this(statement, select, resolver, Collections.<PDatum>emptyList(), 
null, new SequenceManager(statement), projectTuples, dataPlan);
     }
 
-    public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, boolean projectTuples) throws SQLException {
-        this(statement, select, resolver, Collections.<PDatum>emptyList(), 
null, new SequenceManager(statement), projectTuples);
-    }
-
-    public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, List<? extends PDatum> targetColumns, 
ParallelIteratorFactory parallelIteratorFactory, SequenceManager 
sequenceManager, boolean projectTuples) throws SQLException {
+    public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, List<? extends PDatum> targetColumns, 
ParallelIteratorFactory parallelIteratorFactory, SequenceManager 
sequenceManager, boolean projectTuples, QueryPlan dataPlan) throws SQLException 
{
         this.statement = statement;
         this.select = select;
         this.resolver = resolver;
@@ -133,10 +130,11 @@ public class QueryCompiler {
 
         scan.setCaching(statement.getFetchSize());
         this.originalScan = ScanUtil.newScan(scan);
+        this.dataPlan = dataPlan;
     }
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, List<? extends PDatum> targetColumns, 
ParallelIteratorFactory parallelIteratorFactory, SequenceManager 
sequenceManager) throws SQLException {
-        this(statement, select, resolver, targetColumns, 
parallelIteratorFactory, sequenceManager, true);
+        this(statement, select, resolver, targetColumns, 
parallelIteratorFactory, sequenceManager, true, null);
     }
 
     /**
@@ -495,7 +493,7 @@ public class QueryCompiler {
         }
         int maxRows = this.statement.getMaxRows();
         this.statement.setMaxRows(pushDownMaxRows ? maxRows : 0); // overwrite 
maxRows to avoid its impact on inner queries.
-        QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, 
false).compile();
+        QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, 
false, dataPlan).compile();
         plan = 
statement.getConnection().getQueryServices().getOptimizer().optimize(statement, 
plan);
         this.statement.setMaxRows(maxRows); // restore maxRows.
         return plan;
@@ -586,9 +584,9 @@ public class QueryCompiler {
                             parallelIteratorFactory)
                     : (select.isAggregate() || select.isDistinct()
                             ? new AggregatePlan(context, select, tableRef, 
projector, limit, offset, orderBy,
-                                    parallelIteratorFactory, groupBy, having)
+                                    parallelIteratorFactory, groupBy, having, 
dataPlan)
                             : new ScanPlan(context, select, tableRef, 
projector, limit, offset, orderBy,
-                                    parallelIteratorFactory, allowPageFilter));
+                                    parallelIteratorFactory, allowPageFilter, 
dataPlan));
         }
         if (!subqueries.isEmpty()) {
             int count = subqueries.size();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 18e575c..8c71248 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -147,8 +147,10 @@ public class ScanRanges {
             scanRange = KeyRange.getKeyRange(minKey, maxKey);
         }
         if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
-            minMaxRange = 
ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new 
ImmutableBytesWritable());
-            scanRange = scanRange.intersect(minMaxRange);
+            // Intersect using modified min/max range, but keep original range 
to ensure it
+            // can still be decomposed into it's parts
+            KeyRange inclusiveExclusiveMinMaxRange = 
ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new 
ImmutableBytesWritable());
+            scanRange = scanRange.intersect(inclusiveExclusiveMinMaxRange);
         }
         
         if (scanRange == KeyRange.EMPTY_RANGE) {
@@ -573,7 +575,7 @@ public class ScanRanges {
     }
 
     public int getBoundPkColumnCount() {
-        return this.useSkipScanFilter ? ScanUtil.getRowKeyPosition(slotSpan, 
ranges.size()) : Math.max(getBoundPkSpan(ranges, slotSpan), 
getBoundMinMaxSlotCount());
+        return Math.max(getBoundPkSpan(ranges, slotSpan), 
getBoundMinMaxSlotCount());
     }
 
     private int getBoundMinMaxSlotCount() {
@@ -625,6 +627,10 @@ public class ScanRanges {
     public int[] getSlotSpans() {
         return slotSpan;
     }
+    
+    public KeyRange getScanRange() {
+        return scanRange;
+    }
 
     public boolean hasEqualityConstraint(int pkPosition) {
         int pkOffset = 0;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index a705edf..0ac1c7b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -547,7 +547,7 @@ public class UpsertCompiler {
             select = SelectStatement.create(select, hint);
             // Pass scan through if same table in upsert and select so that 
projection is computed correctly
             // Use optimizer to choose the best plan
-            QueryCompiler compiler = new QueryCompiler(statement, select, 
selectResolver, targetColumns, parallelIteratorFactoryToBe, new 
SequenceManager(statement), false);
+            QueryCompiler compiler = new QueryCompiler(statement, select, 
selectResolver, targetColumns, parallelIteratorFactoryToBe, new 
SequenceManager(statement), false, null);
             queryPlanToBe = compiler.compile();
             // This is post-fix: if the tableRef is a projected table, this 
means there are post-processing
             // steps and parallelIteratorFactory did not take effect.
@@ -695,7 +695,7 @@ public class UpsertCompiler {
                     
scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, 
UngroupedAggregateRegionObserver.serialize(projectedExpressions));
                     
                     // Ignore order by - it has no impact
-                    final QueryPlan aggPlan = new AggregatePlan(context, 
select, statementContext.getCurrentTable(), aggProjector, null,null, 
OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+                    final QueryPlan aggPlan = new AggregatePlan(context, 
select, statementContext.getCurrentTable(), aggProjector, null,null, 
OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, originalQueryPlan);
                     return new ServerUpsertSelectMutationPlan(queryPlan, 
tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, 
maxSize, maxSizeBytes);
                 }
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 4c29abe..37e0c5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -88,17 +88,17 @@ public class AggregatePlan extends BaseQueryPlan {
 
     public AggregatePlan(StatementContext context, FilterableStatement 
statement, TableRef table,
             RowProjector projector, Integer limit, Integer offset, OrderBy 
orderBy,
-            ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, 
Expression having) throws SQLException {
+            ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, 
Expression having, QueryPlan dataPlan) throws SQLException {
         this(context, statement, table, projector, limit, offset, orderBy, 
parallelIteratorFactory, groupBy, having,
-                null);
+                null, dataPlan);
     }
 
     private AggregatePlan(StatementContext context, FilterableStatement 
statement, TableRef table,
             RowProjector projector, Integer limit, Integer offset, OrderBy 
orderBy,
             ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, 
Expression having,
-            Expression dynamicFilter) throws SQLException {
+            Expression dynamicFilter, QueryPlan dataPlan) throws SQLException {
         super(context, statement, table, projector, 
context.getBindManager().getParameterMetaData(), limit, offset,
-                orderBy, groupBy, parallelIteratorFactory, dynamicFilter);
+                orderBy, groupBy, parallelIteratorFactory, dynamicFilter, 
dataPlan);
         this.having = having;
         this.aggregators = context.getAggregationManager().getAggregators();
         boolean hasSerialHint = 
statement.getHint().hasHint(HintNode.Hint.SERIAL);
@@ -223,8 +223,8 @@ public class AggregatePlan extends BaseQueryPlan {
             }
         }
         BaseResultIterators iterators = isSerial
-                ? new SerialIterators(this, null, null, 
wrapParallelIteratorFactory(), scanGrouper, scan, caches)
-                : new ParallelIterators(this, null, 
wrapParallelIteratorFactory(), scan, false, caches);
+                ? new SerialIterators(this, null, null, 
wrapParallelIteratorFactory(), scanGrouper, scan, caches, dataPlan)
+                : new ParallelIterators(this, null, 
wrapParallelIteratorFactory(), scan, false, caches, dataPlan);
         estimatedRows = iterators.getEstimatedRowCount();
         estimatedSize = iterators.getEstimatedByteCount();
         estimateInfoTimestamp = iterators.getEstimateInfoTimestamp();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index c1ddd44..59a417b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -112,6 +112,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
      * immediately before creating the ResultIterator.
      */
     protected final Expression dynamicFilter;
+    protected final QueryPlan dataPlan;
     protected Long estimatedRows;
     protected Long estimatedSize;
     protected Long estimateInfoTimestamp;
@@ -122,7 +123,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
             StatementContext context, FilterableStatement statement, TableRef 
table,
             RowProjector projection, ParameterMetaData paramMetaData, Integer 
limit, Integer offset, OrderBy orderBy,
             GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory,
-            Expression dynamicFilter) {
+            Expression dynamicFilter, QueryPlan dataPlan) {
         this.context = context;
         this.statement = statement;
         this.tableRef = table;
@@ -135,6 +136,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         this.groupBy = groupBy;
         this.parallelIteratorFactory = parallelIteratorFactory;
         this.dynamicFilter = dynamicFilter;
+        this.dataPlan = dataPlan;
     }
 
        @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 86f59c5..4470947 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -55,7 +55,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan 
{
     public LiteralResultIterationPlan(Iterable<Tuple> tuples, StatementContext 
context, 
             FilterableStatement statement, TableRef tableRef, RowProjector 
projection, 
             Integer limit, Integer offset, OrderBy orderBy, 
ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
-        super(context, statement, tableRef, projection, 
context.getBindManager().getParameterMetaData(), limit, offset, orderBy, 
GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null);
+        super(context, statement, tableRef, projection, 
context.getBindManager().getParameterMetaData(), limit, offset, orderBy, 
GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null, null);
         this.tuples = tuples;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 1e1cb0d..af25bff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -91,14 +92,17 @@ public class ScanPlan extends BaseQueryPlan {
     private Long serialBytesEstimate;
     private Long serialEstimateInfoTs;
 
-    public ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy 
orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean 
allowPageFilter) throws SQLException {
-        this(context, statement, table, projector, limit, offset, orderBy, 
parallelIteratorFactory, allowPageFilter, null);
+    public ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit,
+            Integer offset, OrderBy orderBy, ParallelIteratorFactory 
parallelIteratorFactory, boolean allowPageFilter, 
+            QueryPlan dataPlan) throws SQLException {
+        this(context, statement, table, projector, limit, offset, orderBy, 
parallelIteratorFactory, allowPageFilter, null, dataPlan);
     }
     
-    private ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy 
orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean 
allowPageFilter, Expression dynamicFilter) throws SQLException {
+    private ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, Integer offset,
+            OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, 
boolean allowPageFilter, Expression dynamicFilter, QueryPlan dataPlan) throws 
SQLException {
         super(context, statement, table, projector, 
context.getBindManager().getParameterMetaData(), limit,offset, orderBy, 
GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
-                        buildResultIteratorFactory(context, statement, table, 
orderBy, limit, offset, allowPageFilter), dynamicFilter);
+                        buildResultIteratorFactory(context, statement, table, 
orderBy, limit, offset, allowPageFilter), dynamicFilter, dataPlan);
         this.allowPageFilter = allowPageFilter;
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
         if (isOrdered) { // TopN
@@ -235,11 +239,11 @@ public class ScanPlan extends BaseQueryPlan {
                         && isDataToScanWithinThreshold; 
         BaseResultIterators iterators;
         if (isOffsetOnServer) {
-            iterators = new SerialIterators(this, perScanLimit, offset, 
parallelIteratorFactory, scanGrouper, scan, caches);
+            iterators = new SerialIterators(this, perScanLimit, offset, 
parallelIteratorFactory, scanGrouper, scan, caches, dataPlan);
         } else if (isSerial) {
-            iterators = new SerialIterators(this, perScanLimit, null, 
parallelIteratorFactory, scanGrouper, scan, caches);
+            iterators = new SerialIterators(this, perScanLimit, null, 
parallelIteratorFactory, scanGrouper, scan, caches, dataPlan);
         } else {
-            iterators = new ParallelIterators(this, perScanLimit, 
parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches);
+            iterators = new ParallelIterators(this, perScanLimit, 
parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches, 
dataPlan);
         }
         estimatedRows = iterators.getEstimatedRowCount();
         estimatedSize = iterators.getEstimatedByteCount();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 05d3d9e..84fc06a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -37,6 +37,7 @@ import java.io.DataInputStream;
 import java.io.EOFException;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.Iterator;
@@ -93,6 +94,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
@@ -101,9 +103,11 @@ import 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
@@ -157,6 +161,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
     private Scan scan;
     private final boolean useStatsForParallelization;
     protected Map<ImmutableBytesPtr,ServerCache> caches;
+    private final QueryPlan dataPlan;
     
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new 
Function<HRegionLocation, KeyRange>() {
         @Override
@@ -473,13 +478,14 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         }
     }
     
-    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset, ParallelScanGrouper scanGrouper, Scan scan, 
Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset, ParallelScanGrouper scanGrouper, Scan scan, 
Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan dataPlan) throws 
SQLException {
         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), 
plan.getOrderBy(),
                 plan.getStatement().getHint(), 
QueryUtil.getOffsetLimit(plan.getLimit(), plan.getOffset()), offset);
         this.plan = plan;
         this.scan = scan;
         this.caches = caches;
         this.scanGrouper = scanGrouper;
+        this.dataPlan = dataPlan;
         StatementContext context = plan.getContext();
         // Clone MutationState as the one on the connection will change if 
auto commit is on
         // yet we need the original one with the original transaction from 
TableResultIterator.
@@ -681,6 +687,173 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         private long rowsEstimate;
     }
 
+    private int computeColumnsInCommon() {
+        PTable dataTable;
+        if ((dataTable=dataPlan.getTableRef().getTable()).getBucketNum() != 
null) { // unable to compute prefix range for salted data table
+            return 0;
+        }
+
+        PTable table = getTable();
+        int nColumnsOffset = dataTable.isMultiTenant() ? 1 :0;
+        int nColumnsInCommon = nColumnsOffset;
+        List<PColumn> dataPKColumns = dataTable.getPKColumns();
+        List<PColumn> indexPKColumns = table.getPKColumns();
+        int nIndexPKColumns = indexPKColumns.size();
+        int nDataPKColumns = dataPKColumns.size();
+        // Skip INDEX_ID and tenant ID columns
+        for (int i = 1 + nColumnsInCommon; i < nIndexPKColumns; i++) {
+            PColumn indexColumn = indexPKColumns.get(i);
+            String indexColumnName = indexColumn.getName().getString();
+            String cf = IndexUtil.getDataColumnFamilyName(indexColumnName);
+            if (cf.length() != 0) {
+                break;
+            }
+            if (i > nDataPKColumns) {
+                break;
+            }
+            PColumn dataColumn = dataPKColumns.get(i-1);
+            String dataColumnName = dataColumn.getName().getString();
+            // Ensure both name and type are the same. Because of the 
restrictions we have
+            // on PK column types (namely that you can only have a fixed width 
nullable
+            // column as your last column), the type check is more of a sanity 
check
+            // since it wouldn't make sense to have an index with every column 
in common.
+            if (indexColumn.getDataType() == dataColumn.getDataType() 
+                    && 
dataColumnName.equals(IndexUtil.getDataColumnName(indexColumnName))) {
+                nColumnsInCommon++;
+                continue;
+            }
+            break;
+        }
+        return nColumnsInCommon;
+    }
+     
+    // public for testing
+    public static ScanRanges computePrefixScanRanges(ScanRanges 
dataScanRanges, int nColumnsInCommon) {
+        if (nColumnsInCommon == 0) {
+            return ScanRanges.EVERYTHING;
+        }
+        
+        int offset = 0;
+        List<List<KeyRange>> cnf = 
Lists.newArrayListWithExpectedSize(nColumnsInCommon);
+        int[] slotSpan = new int[nColumnsInCommon];
+        boolean useSkipScan = false;
+        boolean hasRange = false;
+        List<List<KeyRange>> rangesList = dataScanRanges.getRanges();
+        int rangesListSize = rangesList.size();
+        while (offset < nColumnsInCommon && offset < rangesListSize) {
+            List<KeyRange> ranges = rangesList.get(offset);
+            // We use a skip scan if we have multiple ranges or if
+            // we have a non single key range before the last range.
+            useSkipScan |= ranges.size() > 1 || hasRange;
+            cnf.add(ranges);
+            int rangeSpan = 1 + dataScanRanges.getSlotSpans()[offset];
+            if (offset + rangeSpan > nColumnsInCommon) {
+                rangeSpan = nColumnsInCommon - offset;
+                // trim range to only be rangeSpan in length
+                ranges = 
Lists.newArrayListWithExpectedSize(cnf.get(cnf.size()-1).size());
+                for (KeyRange range : cnf.get(cnf.size()-1)) {
+                    range = clipRange(dataScanRanges.getSchema(), offset, 
rangeSpan, range);
+                    // trim range to be only rangeSpan in length
+                    ranges.add(range);
+                }
+                cnf.set(cnf.size()-1, ranges);
+            }
+            for (KeyRange range : ranges) {
+                if (!range.isSingleKey()) {
+                    hasRange = true;
+                }
+            }
+            slotSpan[offset] = rangeSpan - 1;
+            offset = offset + rangeSpan;
+        }
+        useSkipScan &= dataScanRanges.useSkipScanFilter();
+        KeyRange minMaxRange = 
+                clipRange(dataScanRanges.getSchema(), 0, nColumnsInCommon, 
dataScanRanges.getMinMaxRange());
+        slotSpan = slotSpan.length == cnf.size() ? slotSpan : 
Arrays.copyOf(slotSpan, cnf.size());
+        ScanRanges commonScanRanges = 
ScanRanges.create(dataScanRanges.getSchema(), cnf, slotSpan, minMaxRange, null, 
useSkipScan, -1);
+        return commonScanRanges;
+    }
+        
+    /**
+     * Truncates range to be a max of rangeSpan fields
+     * @param schema row key schema
+     * @param fieldIndex starting index of field with in the row key schema
+     * @param rangeSpan maximum field length
+     * @return the same range if unchanged and otherwise a new range
+     */
+    public static KeyRange clipRange(RowKeySchema schema, int fieldIndex, int 
rangeSpan, KeyRange range) {
+        if (range == KeyRange.EVERYTHING_RANGE) {
+            return range;
+        }
+        if (range == KeyRange.EMPTY_RANGE) {
+            return range;
+        }
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        boolean newRange = false;
+        boolean lowerUnbound = range.lowerUnbound();
+        boolean lowerInclusive = range.isLowerInclusive();
+        byte[] lowerRange = range.getLowerRange();
+        if (!lowerUnbound && lowerRange.length > 0) {
+            if (clipKeyRangeBytes(schema, fieldIndex, rangeSpan, lowerRange, 
ptr, true)) {
+                // Make lower range inclusive since we're decreasing the range 
by chopping the last part off
+                lowerInclusive = true;
+                lowerRange = ptr.copyBytes();
+                newRange = true;
+            }
+        }
+        boolean upperUnbound = range.upperUnbound();
+        boolean upperInclusive = range.isUpperInclusive();
+        byte[] upperRange = range.getUpperRange();
+        if (!upperUnbound && upperRange.length > 0) {
+            if (clipKeyRangeBytes(schema, fieldIndex, rangeSpan, upperRange, 
ptr, false)) {
+                // Make lower range inclusive since we're decreasing the range 
by chopping the last part off
+                upperInclusive = true;
+                upperRange = ptr.copyBytes();
+                newRange = true;
+            }
+        }
+        
+        return newRange ? KeyRange.getKeyRange(lowerRange, lowerInclusive, 
upperRange, upperInclusive) : range;
+    }
+
+    private static boolean clipKeyRangeBytes(RowKeySchema schema, int 
fieldIndex, int rangeSpan, byte[] rowKey, ImmutableBytesWritable ptr, boolean 
trimTrailingNulls) {
+        int position = 0;
+        int maxOffset = schema.iterator(rowKey, ptr);
+        byte[] newRowKey = new byte[rowKey.length];
+        int offset = 0;
+        int trailingNullsToTrim = 0;
+        do {
+            if (schema.next(ptr, fieldIndex, maxOffset) == null) {
+                break;
+            }
+            System.arraycopy(ptr.get(), ptr.getOffset(), newRowKey, offset, 
ptr.getLength());
+            offset += ptr.getLength();
+            Field field =  schema.getField(fieldIndex);
+            if (field.getDataType().isFixedWidth()) {
+                trailingNullsToTrim = 0;
+            } else {
+                boolean isNull = ptr.getLength() == 0;
+                byte sepByte = SchemaUtil.getSeparatorByte(true, isNull, 
field);
+                newRowKey[offset++] = sepByte;
+                if (isNull) {
+                    if (trimTrailingNulls) {
+                        trailingNullsToTrim++;
+                    } else {
+                        trailingNullsToTrim = 0;
+                    }
+                } else {
+                    // So that last zero separator byte is always trimmed
+                    trailingNullsToTrim = 1;
+                }
+            }
+            fieldIndex++;
+        } while (++position < rangeSpan);
+        // remove trailing nulls
+        ptr.set(newRowKey, 0, offset - trailingNullsToTrim);
+        // return true if we've clipped the rowKey
+        return maxOffset != offset;
+    }
+    
     /**
      * Compute the list of parallel scans to run for a given query. The inner 
scans
      * may be concatenated together directly, while the other ones may need to 
be
@@ -702,26 +875,43 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         // case we generate an empty guide post with the byte estimate being 
set as guide post
         // width. 
         boolean emptyGuidePost = gps.isEmptyGuidePost();
+        byte[] startRegionBoundaryKey = startKey;
+        byte[] stopRegionBoundaryKey = stopKey;
+        int columnsInCommon = 0;
+        ScanRanges prefixScanRanges = ScanRanges.EVERYTHING;
         boolean traverseAllRegions = isSalted || isLocalIndex;
-        if (!traverseAllRegions) {
+        if (isLocalIndex) {
+            // TODO: when implementing PHOENIX-4585, we should change this to 
an assert
+            // as we should always have a data plan when a local index is 
being used.
+            if (dataPlan != null && 
dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity 
check
+                prefixScanRanges = 
computePrefixScanRanges(dataPlan.getContext().getScanRanges(), 
columnsInCommon=computeColumnsInCommon());
+                KeyRange prefixRange = prefixScanRanges.getScanRange();
+                if (!prefixRange.lowerUnbound()) {
+                    startRegionBoundaryKey = prefixRange.getLowerRange();
+                }
+                if (!prefixRange.upperUnbound()) {
+                    stopRegionBoundaryKey = prefixRange.getUpperRange();
+                }
+            }
+        } else if (!traverseAllRegions) {
             byte[] scanStartRow = scan.getStartRow();
             if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, 
startKey) > 0) {
-                startKey = scanStartRow;
+                startRegionBoundaryKey = startKey = scanStartRow;
             }
             byte[] scanStopRow = scan.getStopRow();
             if (stopKey.length == 0
                     || (scanStopRow.length != 0 && 
Bytes.compareTo(scanStopRow, stopKey) < 0)) {
-                stopKey = scanStopRow;
+                stopRegionBoundaryKey = stopKey = scanStopRow;
             }
         }
         
         int regionIndex = 0;
         int stopIndex = regionBoundaries.size();
-        if (startKey.length > 0) {
-            regionIndex = getIndexContainingInclusive(regionBoundaries, 
startKey);
+        if (startRegionBoundaryKey.length > 0) {
+            regionIndex = getIndexContainingInclusive(regionBoundaries, 
startRegionBoundaryKey);
         }
-        if (stopKey.length > 0) {
-            stopIndex = Math.min(stopIndex, regionIndex + 
getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), 
stopKey));
+        if (stopRegionBoundaryKey.length > 0) {
+            stopIndex = Math.min(stopIndex, regionIndex + 
getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), 
stopRegionBoundaryKey));
             if (isLocalIndex) {
                 stopKey = 
regionLocations.get(stopIndex).getRegion().getEndKey();
             }
@@ -771,15 +961,29 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                 HRegionLocation regionLocation = 
regionLocations.get(regionIndex);
                 RegionInfo regionInfo = regionLocation.getRegion();
                 byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
-                byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
+                byte[] endKey;
                 if (regionIndex == stopIndex) {
                     endKey = stopKey;
                 } else {
                     endKey = regionBoundaries.get(regionIndex);
                 }
                 if (isLocalIndex) {
-                    endRegionKey = regionInfo.getEndKey();
-                    keyOffset = 
ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
+                    // Only attempt further pruning if the prefix range is 
using
+                    // a skip scan since we've already pruned the range of 
regions
+                    // based on the start/stop key.
+                    if (columnsInCommon > 0 && 
prefixScanRanges.useSkipScanFilter()) {
+                        byte[] regionStartKey = regionInfo.getStartKey();
+                        ImmutableBytesWritable ptr = context.getTempPtr();
+                        clipKeyRangeBytes(prefixScanRanges.getSchema(), 0, 
columnsInCommon, regionStartKey, ptr, false);
+                        regionStartKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                        // Prune this region if there's no intersection
+                        if (!prefixScanRanges.intersectRegion(regionStartKey, 
regionInfo.getEndKey(), false)) {
+                            currentKeyBytes = endKey;
+                            regionIndex++;
+                            continue;
+                        }
+                    }
+                    keyOffset = 
ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey());
                 }
                 byte[] initialKeyBytes = currentKeyBytes;
                 while (intersectWithGuidePosts && (endKey.length == 0 || 
currentGuidePost.compareTo(endKey) <= 0)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 06ac3c0..265e213 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -265,7 +265,8 @@ public abstract class ExplainTable {
         if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
             RowKeySchema schema = tableRef.getTable().getRowKeySchema();
             if (!minMaxRange.isUnbound(bound)) {
-                minMaxIterator = new RowKeyValueIterator(schema, 
minMaxRange.getRange(bound));
+                // Use scan ranges from ScanRanges since it will have been 
intersected with minMaxRange
+                minMaxIterator = new RowKeyValueIterator(schema, 
scanRanges.getScanRange().getRange(bound));
             }
         }
         boolean isLocalIndex = ScanUtil.isLocalIndex(context.getScan());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 3c11f4a..3a4b084 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -57,16 +57,16 @@ public class ParallelIterators extends BaseResultIterators {
     private final ParallelIteratorFactory iteratorFactory;
     private final boolean initFirstScanOnly;
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan 
scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr,ServerCache> caches)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan 
scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr,ServerCache> caches, 
QueryPlan dataPlan)
             throws SQLException {
-        super(plan, perScanLimit, null, scanGrouper, scan,caches);
+        super(plan, perScanLimit, null, scanGrouper, scan,caches, dataPlan);
         this.iteratorFactory = iteratorFactory;
         this.initFirstScanOnly = initFirstScanOnly;
     }   
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, Scan scan, boolean 
initOneScanPerRegion, Map<ImmutableBytesPtr,ServerCache> caches)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, Scan scan, boolean 
initOneScanPerRegion, Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan 
dataPlan)
             throws SQLException {
-        this(plan, perScanLimit, iteratorFactory, 
DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches);
+        this(plan, perScanLimit, iteratorFactory, 
DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches, 
dataPlan);
     }  
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 26d1ed1..f94a7c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -62,9 +62,9 @@ public class SerialIterators extends BaseResultIterators {
     private final Integer offset;
     
     public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset,
-            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper 
scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches)
+            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper 
scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan 
dataPlan)
             throws SQLException {
-        super(plan, perScanLimit, offset, scanGrouper, scan, caches);
+        super(plan, perScanLimit, offset, scanGrouper, scan, caches, dataPlan);
         this.offset = offset;
         // must be a offset or a limit specified or a SERIAL hint
         Preconditions.checkArgument(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 0a90d3c..2274eba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -473,7 +473,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                 select = StatementNormalizer.normalize(transformedSelect, 
resolver);
             }
 
-            QueryPlan plan = new QueryCompiler(stmt, select, resolver, 
Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new 
SequenceManager(stmt), true).compile();
+            QueryPlan plan = new QueryCompiler(stmt, select, resolver, 
Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new 
SequenceManager(stmt), true, null).compile();
             
plan.getContext().getSequenceManager().validateSequences(seqAction);
             return plan;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index b3df50b..d30013b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -232,7 +232,7 @@ public class QueryOptimizer {
             try {
                // translate nodes that match expressions that are indexed to 
the associated column parse node
                 indexSelect = ParseNodeRewriter.rewrite(indexSelect, new  
IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), 
indexSelect.getUdfParseNodes()));
-                QueryCompiler compiler = new QueryCompiler(statement, 
indexSelect, resolver, targetColumns, parallelIteratorFactory, 
dataPlan.getContext().getSequenceManager(), isProjected);
+                QueryCompiler compiler = new QueryCompiler(statement, 
indexSelect, resolver, targetColumns, parallelIteratorFactory, 
dataPlan.getContext().getSequenceManager(), isProjected, dataPlan);
                 
                 QueryPlan plan = compiler.compile();
                 // If query doesn't have where clause and some of columns to 
project are missing
@@ -301,7 +301,7 @@ public class QueryOptimizer {
                         query = SubqueryRewriter.transform(query, 
queryResolver, statement.getConnection());
                         queryResolver = 
FromCompiler.getResolverForQuery(query, statement.getConnection());
                         query = StatementNormalizer.normalize(query, 
queryResolver);
-                        QueryPlan plan = new QueryCompiler(statement, query, 
queryResolver, targetColumns, parallelIteratorFactory, 
dataPlan.getContext().getSequenceManager(), isProjected).compile();
+                        QueryPlan plan = new QueryCompiler(statement, query, 
queryResolver, targetColumns, parallelIteratorFactory, 
dataPlan.getContext().getSequenceManager(), isProjected, dataPlan).compile();
                         return plan;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 142b80e..b2b8efb 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -82,6 +82,7 @@ import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -240,7 +241,12 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     public MetaDataMutationResult createTable(List<Mutation> tableMetaData, 
byte[] physicalName, PTableType tableType,
             Map<String, Object> tableProps, List<Pair<byte[], Map<String, 
Object>>> families, byte[][] splits,
             boolean isNamespaceMapped, boolean allocateIndexId) throws 
SQLException {
-        if (splits != null) {
+        if (tableType == PTableType.INDEX && 
IndexUtil.isLocalIndexFamily(Bytes.toString(families.iterator().next().getFirst())))
 {
+            Object dataTableName = 
tableProps.get(PhoenixDatabaseMetaData.DATA_TABLE_NAME);
+            List<HRegionLocation> regionLocations = 
tableSplits.get(dataTableName);
+            byte[] tableName = getTableName(tableMetaData, physicalName);
+            tableSplits.put(Bytes.toString(tableName), regionLocations);
+        } else if (splits != null) {
             byte[] tableName = getTableName(tableMetaData, physicalName);
             tableSplits.put(Bytes.toString(tableName), 
generateRegionLocations(tableName, splits));
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index e11962e..4baead4 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -4161,4 +4161,230 @@ public class QueryCompilerTest extends 
BaseConnectionlessQueryTest {
             assertEquals(e.getErrorCode(), 
SQLExceptionCode.CONNECTION_CLOSED.getErrorCode());
         }
     }
+    
+    @Test
+    public void testSingleColLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" + 
+                    "    A CHAR(1) NOT NULL,\n" + 
+                    "    B CHAR(1) NOT NULL,\n" + 
+                    "    C CHAR(1) NOT NULL,\n" + 
+                    "    CONSTRAINT PK PRIMARY KEY (\n" + 
+                    "        A,\n" + 
+                    "        B,\n" + 
+                    "        C\n" + 
+                    "    )\n" + 
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,C)");
+            String query = "SELECT * FROM T WHERE A = 'B' and C='C'";
+            PhoenixStatement statement = 
conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", 
plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(1, outerScans.size());
+            List<Scan> innerScans = outerScans.get(0);
+            assertEquals(1, innerScans.size());
+            Scan scan = innerScans.get(0);
+            assertEquals("A", Bytes.toString(scan.getStartRow()).trim());
+            assertEquals("C", Bytes.toString(scan.getStopRow()).trim());
+        }
+    }
+
+    @Test
+    public void testMultiColLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" + 
+                    "    A CHAR(1) NOT NULL,\n" + 
+                    "    B CHAR(1) NOT NULL,\n" + 
+                    "    C CHAR(1) NOT NULL,\n" + 
+                    "    D CHAR(1) NOT NULL,\n" + 
+                    "    CONSTRAINT PK PRIMARY KEY (\n" + 
+                    "        A,\n" + 
+                    "        B,\n" + 
+                    "        C,\n" + 
+                    "        D\n" + 
+                    "    )\n" + 
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON 
T(A,B,D)");
+            String query = "SELECT * FROM T WHERE A = 'C' and B = 'X' and 
D='C'";
+            PhoenixStatement statement = 
conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", 
plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(1, outerScans.size());
+            List<Scan> innerScans = outerScans.get(0);
+            assertEquals(1, innerScans.size());
+            Scan scan = innerScans.get(0);
+            assertEquals("C", Bytes.toString(scan.getStartRow()).trim());
+            assertEquals("E", Bytes.toString(scan.getStopRow()).trim());
+        }
+    }
+
+    @Test
+    public void testSkipScanLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" + 
+                    "    A CHAR(1) NOT NULL,\n" + 
+                    "    B CHAR(1) NOT NULL,\n" + 
+                    "    C CHAR(1) NOT NULL,\n" + 
+                    "    D CHAR(1) NOT NULL,\n" + 
+                    "    CONSTRAINT PK PRIMARY KEY (\n" + 
+                    "        A,\n" + 
+                    "        B,\n" + 
+                    "        C,\n" + 
+                    "        D\n" + 
+                    "    )\n" + 
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON 
T(A,B,D)");
+            String query = "SELECT * FROM T WHERE A IN ('A','G') and B = 'A' 
and D = 'D'";
+            PhoenixStatement statement = 
conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", 
plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(2, outerScans.size());
+            List<Scan> innerScans1 = outerScans.get(0);
+            assertEquals(1, innerScans1.size());
+            Scan scan1 = innerScans1.get(0);
+            assertEquals("A", Bytes.toString(scan1.getStartRow()).trim());
+            assertEquals("C", Bytes.toString(scan1.getStopRow()).trim());
+            List<Scan> innerScans2 = outerScans.get(1);
+            assertEquals(1, innerScans2.size());
+            Scan scan2 = innerScans2.get(0);
+            assertEquals("G", Bytes.toString(scan2.getStartRow()).trim());
+            assertEquals("I", Bytes.toString(scan2.getStopRow()).trim());
+        }
+    }
+
+    @Test
+    public void testRVCLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" + 
+                    "    A CHAR(1) NOT NULL,\n" + 
+                    "    B CHAR(1) NOT NULL,\n" + 
+                    "    C CHAR(1) NOT NULL,\n" + 
+                    "    D CHAR(1) NOT NULL,\n" + 
+                    "    CONSTRAINT PK PRIMARY KEY (\n" + 
+                    "        A,\n" + 
+                    "        B,\n" + 
+                    "        C,\n" + 
+                    "        D\n" + 
+                    "    )\n" + 
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON 
T(A,B,D)");
+            String query = "SELECT * FROM T WHERE A='I' and (B,D) IN 
(('A','D'),('B','I'))";
+            PhoenixStatement statement = 
conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", 
plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(1, outerScans.size());
+            List<Scan> innerScans = outerScans.get(0);
+            assertEquals(1, innerScans.size());
+            Scan scan = innerScans.get(0);
+            assertEquals("I", Bytes.toString(scan.getStartRow()).trim());
+            assertEquals(0, scan.getStopRow().length);
+        }
+    }
+
+    @Test
+    public void testRVCLocalIndexPruning2() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" + 
+                    "    A CHAR(1) NOT NULL,\n" + 
+                    "    B VARCHAR,\n" + 
+                    "    C VARCHAR,\n" + 
+                    "    D VARCHAR,\n" + 
+                    "    E VARCHAR,\n" + 
+                    "    F VARCHAR,\n" + 
+                    "    G VARCHAR,\n" + 
+                    "    CONSTRAINT PK PRIMARY KEY (\n" + 
+                    "        A,\n" + 
+                    "        B,\n" + 
+                    "        C,\n" + 
+                    "        D,\n" + 
+                    "        E,\n" + 
+                    "        F,\n" + 
+                    "        G\n" + 
+                    "    )\n" + 
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON 
T(A,B,C,F,G)");
+            String query = "SELECT * FROM T WHERE (A,B,C,D) IN 
(('I','D','F','X'),('I','I','G','Y')) and F='X' and G='Y'";
+            PhoenixStatement statement = 
conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", 
plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(1, outerScans.size());
+            List<Scan> innerScans = outerScans.get(0);
+            assertEquals(1, innerScans.size());
+            Scan scan = innerScans.get(0);
+            assertEquals("I", Bytes.toString(scan.getStartRow()).trim());
+            assertEquals(0, scan.getStopRow().length);
+        }
+    }
+
+    @Test
+    public void testMinMaxRangeLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" + 
+                    "    A CHAR(1) NOT NULL,\n" + 
+                    "    B CHAR(1) NOT NULL,\n" + 
+                    "    C CHAR(1) NOT NULL,\n" + 
+                    "    D CHAR(1) NOT NULL,\n" + 
+                    "    CONSTRAINT PK PRIMARY KEY (\n" + 
+                    "        A,\n" + 
+                    "        B,\n" + 
+                    "        C,\n" + 
+                    "        D\n" + 
+                    "    )\n" + 
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON 
T(A,B,D)");
+            String query = "SELECT * FROM T WHERE A = 'C' and (A,B,D) > 
('C','B','X') and D='C'";
+            PhoenixStatement statement = 
conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", 
plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(1, outerScans.size());
+            List<Scan> innerScans = outerScans.get(0);
+            assertEquals(1, innerScans.size());
+            Scan scan = innerScans.get(0);
+            assertEquals("C", Bytes.toString(scan.getStartRow()).trim());
+            assertEquals("E", Bytes.toString(scan.getStopRow()).trim());
+        }
+    }
+
+    @Test
+    public void testNoLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" + 
+                    "    A CHAR(1) NOT NULL,\n" + 
+                    "    B CHAR(1) NOT NULL,\n" + 
+                    "    C CHAR(1) NOT NULL,\n" + 
+                    "    CONSTRAINT PK PRIMARY KEY (\n" + 
+                    "        A,\n" + 
+                    "        B,\n" + 
+                    "        C\n" + 
+                    "    )\n" + 
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(C)");
+            String query = "SELECT * FROM T WHERE C='C'";
+            PhoenixStatement statement = 
conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", 
plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(6, outerScans.size());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
new file mode 100644
index 0000000..abc435a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import static org.apache.phoenix.query.KeyRange.UNBOUND;
+import static 
org.apache.phoenix.query.QueryConstants.DESC_SEPARATOR_BYTE_ARRAY;
+import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY;
+import static org.junit.Assert.assertEquals;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.iterate.BaseResultIterators;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Test for intersect method in {@link SkipScanFilter}
+ */
+@RunWith(Parameterized.class)
+public class KeyRangeClipTest extends BaseConnectionlessQueryTest {
+    private final RowKeySchema schema;
+    private final KeyRange input;
+    private final KeyRange expectedOutput;
+    private final int clipTo;
+
+    private static byte[] getRange(PhoenixConnection pconn, List<Object> 
startValues) throws SQLException {
+        byte[] lowerRange;
+        if (startValues == null) {
+            lowerRange = KeyRange.UNBOUND;
+        } else {
+            String upsertValues = StringUtils.repeat("?,", 
startValues.size()).substring(0,startValues.size() * 2 - 1);
+            String upsertStmt = "UPSERT INTO T VALUES(" + upsertValues + ")";
+            PreparedStatement stmt = pconn.prepareStatement(upsertStmt);
+            for (int i = 0; i < startValues.size(); i++) {
+                stmt.setObject(i+1, startValues.get(i));
+            }
+            stmt.execute();
+            Cell startCell = 
PhoenixRuntime.getUncommittedDataIterator(pconn).next().getSecond().get(0);
+            lowerRange = CellUtil.cloneRow(startCell);
+            pconn.rollback();
+        }
+        return lowerRange;
+    }
+    
+    public KeyRangeClipTest(String tableDef, List<Object> startValues, 
List<Object> endValues, int clipTo, KeyRange expectedOutput) throws 
SQLException {
+        PhoenixConnection pconn = 
DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class);
+        pconn.createStatement().execute("CREATE TABLE T(" + tableDef+ ")");
+        PTable table = pconn.getMetaDataCache().getTableRef(new 
PTableKey(null,"T")).getTable();
+        this.schema = table.getRowKeySchema();
+        byte[] lowerRange = getRange(pconn, startValues);
+        byte[] upperRange = getRange(pconn, endValues);
+        this.input = KeyRange.getKeyRange(lowerRange, upperRange);
+        this.expectedOutput = expectedOutput;
+        this.clipTo = clipTo;
+    }
+
+    @After
+    public void cleanup() throws SQLException {
+        PhoenixConnection pconn = 
DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class);
+        pconn.createStatement().execute("DROP TABLE T");
+    }
+    
+    @Test
+    public void test() {
+        ScanRanges scanRanges = ScanRanges.create(schema, 
Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(input)),
 new int[] {schema.getFieldCount()-1}, KeyRange.EVERYTHING_RANGE, null, false, 
-1);
+        ScanRanges clippedRange = 
BaseResultIterators.computePrefixScanRanges(scanRanges, clipTo);
+        assertEquals(expectedOutput, clippedRange.getScanRange());
+    }
+
+    @Parameters(name="KeyRangeClipTest_{0}")
+    public static Collection<Object> data() {
+        List<Object> testCases = Lists.newArrayList();
+        testCases.add(Lists.newArrayList( // [XY - *]
+                "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK 
PRIMARY KEY (A,B,C)",
+                Lists.newArrayList("XY",null,"Z"), null, 2,
+                KeyRange.getKeyRange(Bytes.toBytes("XY"), true, UNBOUND, 
false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK 
PRIMARY KEY (A,B,C)",
+                null, Lists.newArrayList("XY",null,"Z"), 2,
+                KeyRange.getKeyRange(
+                        ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips 
null values for unbound lower
+                        
ByteUtil.nextKey(ByteUtil.concat(Bytes.toBytes("XY"),SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY)),
 false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, D VARCHAR, 
CONSTRAINT PK PRIMARY KEY (A,B,C,D)",
+                Lists.newArrayList("XY",null,null,"Z"), null, 3,
+                KeyRange.getKeyRange(Bytes.toBytes("XY"), true, UNBOUND, 
false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, D VARCHAR, 
CONSTRAINT PK PRIMARY KEY (A,B,C,D)",
+                null, Lists.newArrayList("XY",null,null,"Z"), 3,
+                KeyRange.getKeyRange(
+                        ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips 
null values for unbound lower
+                        
ByteUtil.nextKey(ByteUtil.concat(Bytes.toBytes("XY"),SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY)),
 false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A CHAR(1) NOT NULL, B CHAR(1) NOT NULL, C CHAR(1) NOT NULL, 
CONSTRAINT PK PRIMARY KEY (A,B,C)",
+                Lists.newArrayList("A","B","C"), 
Lists.newArrayList("C","D","E"), 2,
+                KeyRange.getKeyRange(Bytes.toBytes("AB"), true, 
ByteUtil.nextKey(Bytes.toBytes("CD")), false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B VARCHAR, C SMALLINT NOT NULL, D 
VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C,D)",
+                Lists.<Object>newArrayList("XY",null,1,"Z"), null, 3,
+                KeyRange.getKeyRange(ByteUtil.concat(Bytes.toBytes("XY"), 
SEPARATOR_BYTE_ARRAY, SEPARATOR_BYTE_ARRAY, PSmallint.INSTANCE.toBytes(1)), 
true, UNBOUND, false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B BIGINT NOT NULL, C VARCHAR, CONSTRAINT 
PK PRIMARY KEY (A,B DESC,C)",
+                Lists.<Object>newArrayList("XYZ",1,"Z"), null, 2,
+                KeyRange.getKeyRange(ByteUtil.concat(Bytes.toBytes("XYZ"), 
SEPARATOR_BYTE_ARRAY, PLong.INSTANCE.toBytes(1, SortOrder.DESC)), true, 
UNBOUND, false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK 
PRIMARY KEY (A DESC,B,C)",
+                null, Lists.newArrayList("XY",null,"Z"), 3,
+                KeyRange.getKeyRange(
+                        ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips 
null values for unbound lower
+                        
(ByteUtil.concat(PVarchar.INSTANCE.toBytes("XY",SortOrder.DESC),DESC_SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,Bytes.toBytes("Z"))),
 false)).toArray());
+        return testCases;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c1746c2/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 935d8cb..cb34d2b 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -487,7 +487,7 @@ public class ParallelIteratorsSplitTest extends 
BaseConnectionlessQueryTest {
                 return null;
             }
             
-        }, null, new 
SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()),
 context.getScan(), false, null);
+        }, null, new 
SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()),
 context.getScan(), false, null, null);
         List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;
     }

Reply via email to