http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index a7b5687..877c939 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -177,7 +177,7 @@ public class PhoenixTxIndexMutationGenerator {
             
             // Project empty key value column
             scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), 
emptyKeyValueQualifier);
-            ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, 
KeyRange.EVERYTHING_RANGE, null, true, -1);
+            ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, null, true, 
-1);
             scanRanges.initializeScan(scan);
             Table txTable = 
indexMetaData.getTransactionContext().getTransactionalTable(htable, 
isImmutable);
             // For rollback, we need to see all versions, including

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java
index b45706a..bc9fa9f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java
@@ -30,7 +30,15 @@ import org.apache.phoenix.expression.Expression;
  * @since 0.1
  */
 public abstract class FunctionExpression extends BaseCompoundExpression {
-    public enum OrderPreserving {NO, YES_IF_LAST, YES};
+    public enum OrderPreserving {NO, YES_IF_LAST, YES;
+
+    public OrderPreserving combine(OrderPreserving that) {
+        if (that == null) {
+            return this;
+        }
+        return OrderPreserving.values()[Math.min(this.ordinal(), 
that.ordinal())];
+    }};
+    
     public FunctionExpression() {
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/expression/function/InvertFunction.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/InvertFunction.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/InvertFunction.java
index 3615cbe..8ef5914 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/InvertFunction.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/InvertFunction.java
@@ -96,7 +96,24 @@ public class InvertFunction extends ScalarFunction {
             @Override
             public KeyRange getKeyRange(CompareOp op, Expression rhs) {
                 KeyRange range = childPart.getKeyRange(op, rhs);
-                return range.invert();
+                byte[] lower = range.getLowerRange();
+                if (!range.lowerUnbound()) {
+                    lower = SortOrder.invert(lower, 0, lower.length);
+                }
+                byte[] upper;
+                if (range.isSingleKey()) {
+                    upper = lower;
+                } else {
+                    upper = range.getUpperRange();
+                    if (!range.upperUnbound()) {
+                        upper = SortOrder.invert(upper, 0, upper.length);
+                    }
+                }
+                range = KeyRange.getKeyRange(lower, range.isLowerInclusive(), 
upper, range.isUpperInclusive());
+                if (getColumn().getSortOrder() == SortOrder.DESC) {
+                    range = range.invert();
+                }
+                return range;
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PrefixFunction.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PrefixFunction.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PrefixFunction.java
index cb98e28..ff3e74d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PrefixFunction.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PrefixFunction.java
@@ -110,7 +110,11 @@ abstract public class PrefixFunction extends 
ScalarFunction {
                         lowerRange[lowerRange.length-1] = 
QueryConstants.SEPARATOR_BYTE;
                     }
                 }
-                return KeyRange.getKeyRange(lowerRange, lowerInclusive, 
upperRange, false);
+                KeyRange range = KeyRange.getKeyRange(lowerRange, 
lowerInclusive, upperRange, false);
+                if (column.getSortOrder() == SortOrder.DESC) {
+                    range = range.invert();
+                }
+                return range;
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RTrimFunction.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RTrimFunction.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RTrimFunction.java
index 5713713..81e4f9e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RTrimFunction.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RTrimFunction.java
@@ -168,7 +168,11 @@ public class RTrimFunction extends ScalarFunction {
                         upperRange = type.pad(upperRange, length, 
SortOrder.ASC);
                     }
                 }
-                return KeyRange.getKeyRange(lowerRange, lowerInclusive, 
upperRange, upperInclusive);
+                KeyRange range = KeyRange.getKeyRange(lowerRange, 
lowerInclusive, upperRange, upperInclusive);
+                if (getColumn().getSortOrder() == SortOrder.DESC) {
+                    range = range.invert();
+                }
+                return range;
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDateExpression.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDateExpression.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDateExpression.java
index d747771..988b027 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDateExpression.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDateExpression.java
@@ -32,6 +32,9 @@ import org.apache.phoenix.compile.KeyPart;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.FunctionParseNode.FunctionClassType;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
@@ -39,13 +42,10 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDataType.PDataCodec;
+import org.apache.phoenix.schema.types.PDate;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.schema.types.PDate;
-import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
-import org.apache.phoenix.parse.FunctionParseNode.Argument;
-import org.apache.phoenix.parse.FunctionParseNode.FunctionClassType;
 
 import com.google.common.collect.Lists;
 
@@ -261,6 +261,7 @@ public class RoundDateExpression extends ScalarFunction {
                 int offset = ByteUtil.isInclusive(op) ? 1 : 0;
                 long value = codec.decodeLong(key, 0, SortOrder.getDefault());
                 byte[] nextKey = new byte[type.getByteSize()];
+                KeyRange range;
                 switch (op) {
                 case EQUAL:
                     // If the value isn't evenly divisible by the div amount, 
then it
@@ -272,18 +273,25 @@ public class RoundDateExpression extends ScalarFunction {
                         return KeyRange.EMPTY_RANGE;
                     }
                     codec.encodeLong(value + divBy, nextKey, 0);
-                    return type.getKeyRange(key, true, nextKey, false);
+                    range = type.getKeyRange(key, true, nextKey, false);
+                    break;
                 case GREATER:
                 case GREATER_OR_EQUAL:
                     codec.encodeLong((value + divBy - offset)/divBy*divBy, 
nextKey, 0);
-                    return type.getKeyRange(nextKey, true, KeyRange.UNBOUND, 
false);
+                    range = type.getKeyRange(nextKey, true, KeyRange.UNBOUND, 
false);
+                    break;
                 case LESS:
                 case LESS_OR_EQUAL:
                     codec.encodeLong((value + divBy - (1 
-offset))/divBy*divBy, nextKey, 0);
-                    return type.getKeyRange(KeyRange.UNBOUND, false, nextKey, 
false);
+                    range = type.getKeyRange(KeyRange.UNBOUND, false, nextKey, 
false);
+                    break;
                 default:
                     return childPart.getKeyRange(op, rhs);
                 }
+                if (getColumn().getSortOrder() == SortOrder.DESC) {
+                    range = range.invert();
+                }
+                return range;
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java
index ab525f1..5a5bcfe 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDecimal;
@@ -238,7 +239,11 @@ public class RoundDecimalExpression extends ScalarFunction 
{
                         throw new AssertionError("Invalid CompareOp: " + op);
                 }
 
-                return KeyRange.getKeyRange(lowerRange, lowerInclusive, 
upperRange, upperInclusive);
+                KeyRange range = KeyRange.getKeyRange(lowerRange, 
lowerInclusive, upperRange, upperInclusive);
+                if (getColumn().getSortOrder() == SortOrder.DESC) {
+                    range = range.invert();
+                }
+                return range;
             }
             
             /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index aa9a9f5..d890383 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -774,10 +774,8 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             offset = offset + rangeSpan;
         }
         useSkipScan &= dataScanRanges.useSkipScanFilter();
-        KeyRange minMaxRange = 
-                clipRange(dataScanRanges.getSchema(), 0, nColumnsInCommon, 
dataScanRanges.getMinMaxRange());
         slotSpan = slotSpan.length == cnf.size() ? slotSpan : 
Arrays.copyOf(slotSpan, cnf.size());
-        ScanRanges commonScanRanges = 
ScanRanges.create(dataScanRanges.getSchema(), cnf, slotSpan, minMaxRange, null, 
useSkipScan, -1);
+        ScanRanges commonScanRanges = 
ScanRanges.create(dataScanRanges.getSchema(), cnf, slotSpan, null, useSkipScan, 
-1);
         return commonScanRanges;
     }
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 265e213..1a22f60 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -258,17 +258,7 @@ public abstract class ExplainTable {
     
     private void appendScanRow(StringBuilder buf, Bound bound) {
         ScanRanges scanRanges = context.getScanRanges();
-        // TODO: review this and potentially intersect the scan ranges
-        // with the minMaxRange in ScanRanges to prevent having to do all this.
-        KeyRange minMaxRange = scanRanges.getMinMaxRange();
         Iterator<byte[]> minMaxIterator = Collections.emptyIterator();
-        if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
-            RowKeySchema schema = tableRef.getTable().getRowKeySchema();
-            if (!minMaxRange.isUnbound(bound)) {
-                // Use scan ranges from ScanRanges since it will have been 
intersected with minMaxRange
-                minMaxIterator = new RowKeyValueIterator(schema, 
scanRanges.getScanRange().getRange(bound));
-            }
-        }
         boolean isLocalIndex = ScanUtil.isLocalIndex(context.getScan());
         boolean forceSkipScan = this.hint.hasHint(Hint.SKIP_SCAN);
         int nRanges = forceSkipScan ? scanRanges.getRanges().size() : 
scanRanges.getBoundSlotCount();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
index 2159084..7d09adb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
@@ -52,12 +52,13 @@ public class KeyRange implements Writable {
     public enum Bound { LOWER, UPPER };
     private static final byte[] DEGENERATE_KEY = new byte[] {1};
     public static final byte[] UNBOUND = new byte[0];
+    public static final byte[] NULL_BOUND = new byte[0];
     /**
      * KeyRange for variable length null values. Since we need to represent 
this using an empty byte array (which
      * is what we use for upper/lower bound), we create this range using the 
private constructor rather than
      * going through the static creation method (where this would not be 
possible).
      */
-    public static final KeyRange IS_NULL_RANGE = new 
KeyRange(ByteUtil.EMPTY_BYTE_ARRAY, true, ByteUtil.EMPTY_BYTE_ARRAY, true);
+    public static final KeyRange IS_NULL_RANGE = new KeyRange(NULL_BOUND, 
true, NULL_BOUND, true);
     /**
      * KeyRange for non null variable length values. Since we need to 
represent this using an empty byte array (which
      * is what we use for upper/lower bound), we create this range using the 
private constructor rather than going
@@ -131,7 +132,7 @@ public class KeyRange implements Writable {
             // than an unbound RANGE.
             return lowerInclusive && upperInclusive ? IS_NULL_RANGE : 
EVERYTHING_RANGE;
         }
-        if (lowerRange.length != 0 && upperRange.length != 0) {
+        if ( ( lowerRange.length != 0 || lowerRange == NULL_BOUND ) && ( 
upperRange.length != 0 || upperRange == NULL_BOUND ) ) {
             int cmp = Bytes.compareTo(lowerRange, upperRange);
             if (cmp > 0 || (cmp == 0 && !(lowerInclusive && upperInclusive))) {
                 return EMPTY_RANGE;
@@ -148,12 +149,12 @@ public class KeyRange implements Writable {
         }
         boolean unboundLower = false;
         boolean unboundUpper = false;
-        if (lowerRange.length == 0) {
+        if (lowerRange.length == 0 && lowerRange != NULL_BOUND) {
             lowerRange = UNBOUND;
             lowerInclusive = false;
             unboundLower = true;
         }
-        if (upperRange.length == 0) {
+        if (upperRange.length == 0 && upperRange != NULL_BOUND) {
             upperRange = UNBOUND;
             upperInclusive = false;
             unboundUpper = true;
@@ -575,20 +576,25 @@ public class KeyRange implements Writable {
     }
     
     public KeyRange invert() {
-        byte[] lower = this.getLowerRange();
+        // these special ranges do not get inverted because we
+        // represent NULL in the same way for ASC and DESC.
+        if (this == IS_NOT_NULL_RANGE || this == IS_NULL_RANGE) {
+            return this;
+        }
+        byte[] lowerBound = this.getLowerRange();
         if (!this.lowerUnbound()) {
-            lower = SortOrder.invert(lower, 0, lower.length);
+            lowerBound = SortOrder.invert(lowerBound, 0, lowerBound.length);
         }
-        byte[] upper;
+        byte[] upperBound;
         if (this.isSingleKey()) {
-            upper = lower;
+            upperBound = lowerBound;
         } else {
-            upper = this.getUpperRange();
+            upperBound = this.getUpperRange();
             if (!this.upperUnbound()) {
-                upper = SortOrder.invert(upper, 0, upper.length);
+                upperBound = SortOrder.invert(upperBound, 0, 
upperBound.length);
             }
         }
-        return KeyRange.getKeyRange(lower, this.isLowerInclusive(), upper, 
this.isUpperInclusive());
+        return KeyRange.getKeyRange(upperBound, this.isUpperInclusive(), 
lowerBound, this.isLowerInclusive());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java
index 1a44ce1..3210516 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java
@@ -21,7 +21,9 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 
@@ -355,4 +357,80 @@ public class RowKeySchema extends ValueSchema {
         ptr.set(ptr.get(), initialOffset, finalLength);
         return position + i - (Boolean.FALSE.equals(hasValue) ? 1 : 0);
     }
+
+    public int computeMaxSpan(int pkPos, KeyRange result, 
ImmutableBytesWritable ptr) {
+        int maxOffset = iterator(result.getLowerRange(), ptr);
+        int lowerSpan = 0;
+        int i = pkPos;
+        while (this.next(ptr, i++, maxOffset) != null) {
+            lowerSpan++;
+        }
+        int upperSpan = 0;
+        i = pkPos;
+        maxOffset = iterator(result.getUpperRange(), ptr);
+        while (this.next(ptr, i++, maxOffset) != null) {
+            upperSpan++;
+        }
+        return Math.max(Math.max(lowerSpan, upperSpan), 1);
+    }
+
+    public int computeMinSpan(int pkPos, KeyRange keyRange, 
ImmutableBytesWritable ptr) {
+        if (keyRange == KeyRange.EVERYTHING_RANGE) {
+            return 0;
+        }
+        int lowerSpan = Integer.MAX_VALUE;
+        byte[] range = keyRange.getLowerRange();
+        if (range != KeyRange.UNBOUND) {
+            lowerSpan = 0;
+            int maxOffset = iterator(range, ptr);
+            int i = pkPos;
+            while (this.next(ptr, i++, maxOffset) != null) {
+                lowerSpan++;
+            }
+        }
+        int upperSpan = Integer.MAX_VALUE;
+        range = keyRange.getUpperRange();
+        if (range != KeyRange.UNBOUND) {
+            upperSpan = 0;
+            int maxOffset = iterator(range, ptr);
+            int i = pkPos;
+            while (this.next(ptr, i++, maxOffset) != null) {
+                upperSpan++;
+            }
+        }
+        return Math.min(lowerSpan, upperSpan);
+    }
+
+    /**
+     * Clip the left hand portion of the keyRange up to the spansToClip. If 
keyRange is shorter in
+     * spans than spansToClip, the portion of the range that exists will be 
returned.
+     * @param pkPos the leading pk position of the keyRange.
+     * @param keyRange the key range to clip
+     * @param spansToClip the number of spans to clip
+     * @param ptr an ImmutableBytesWritable to use for temporary storage.
+     * @return the clipped portion of the keyRange
+     */
+    public KeyRange clipLeft(int pkPos, KeyRange keyRange, int spansToClip, 
ImmutableBytesWritable ptr) {
+        if (spansToClip < 0) {
+            throw new IllegalArgumentException("Cannot specify a negative 
spansToClip (" + spansToClip + ")");
+        }
+        if (spansToClip == 0) {
+            return keyRange;
+        }
+        byte[] lowerRange = keyRange.getLowerRange();
+        if (lowerRange != KeyRange.UNBOUND) {
+            ptr.set(lowerRange);
+            this.position(ptr, pkPos, pkPos+spansToClip-1);
+            ptr.set(lowerRange, 0, ptr.getOffset() + ptr.getLength());
+            lowerRange = ByteUtil.copyKeyBytesIfNecessary(ptr);
+        }
+        byte[] upperRange = keyRange.getUpperRange();
+        if (upperRange != KeyRange.UNBOUND) {
+            ptr.set(upperRange);
+            this.position(ptr, pkPos, pkPos+spansToClip-1);
+            ptr.set(upperRange, 0, ptr.getOffset() + ptr.getLength());
+            upperRange = ByteUtil.copyKeyBytesIfNecessary(ptr);
+        }
+        return KeyRange.getKeyRange(lowerRange, true, upperRange, true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 110afc2..0ff17d3 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -4500,7 +4500,7 @@ public class QueryCompilerTest extends 
BaseConnectionlessQueryTest {
                     "    )\n" + 
                     ") SPLIT ON ('A','C','E','G','I')");
             conn.createStatement().execute("CREATE LOCAL INDEX IDX ON 
T(A,B,D)");
-            String query = "SELECT * FROM T WHERE A = 'C' and (A,B,D) > 
('C','B','X') and D='C'";
+            String query = "SELECT * FROM T WHERE A = 'C' and (A,B,D) > 
('C','B','X') and B < 'Z' and D='C'";
             PhoenixStatement statement = 
conn.createStatement().unwrap(PhoenixStatement.class);
             QueryPlan plan = statement.optimizeQuery(query);
             assertEquals("IDX", 
plan.getContext().getCurrentTable().getTable().getName().getString());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
index 56fd178..4d5a424 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
@@ -45,9 +45,9 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.DeleteStatement;
 import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
@@ -742,9 +742,8 @@ public class QueryOptimizerTest extends 
BaseConnectionlessQueryTest {
         assertEquals(4 + offset, 
plan.getContext().getScanRanges().getBoundPkColumnCount());
         plan = stmt.compileQuery("select * from my_table_mt_view where pkcol1 
= 'a' and pkcol2 = 'b' and pkcol3 = 'c' and (pkcol1, pkcol2) < ('z', 'z')");
         assertEquals(4 + offset, 
plan.getContext().getScanRanges().getBoundPkColumnCount());
-        // TODO: in theory pkcol2 and pkcol3 could be bound, but we don't have 
the logic for that yet
         plan = stmt.compileQuery("select * from my_table_mt_view where 
(pkcol2, pkcol3) > ('0', '0') and pkcol1 = '000000000000000'");
-        assertEquals(2 + offset, 
plan.getContext().getScanRanges().getBoundPkColumnCount());
+        assertEquals(4 + offset, 
plan.getContext().getScanRanges().getBoundPkColumnCount());
     }
 
     private void assertPlanDetails(PreparedStatement stmt, String 
expectedPkCols, String expectedPkColsDataTypes, boolean expectedHasOrderBy, int 
expectedLimit) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/test/java/org/apache/phoenix/compile/TenantSpecificViewIndexCompileTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/TenantSpecificViewIndexCompileTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/TenantSpecificViewIndexCompileTest.java
index d249a66..3ab6a19 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/TenantSpecificViewIndexCompileTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/TenantSpecificViewIndexCompileTest.java
@@ -166,14 +166,14 @@ public class TenantSpecificViewIndexCompileTest extends 
BaseConnectionlessQueryT
         assertOrderByHasBeenOptimizedOut(conn, sql);
 
         // Query with predicate ordered by full row key
-        sql = "SELECT * FROM v1 WHERE k3 < TO_DATE('" + createStaticDate() + 
"') ORDER BY k3 DESC";
-        expectedExplainOutput = "CLIENT PARALLEL 1-WAY RANGE SCAN OVER T 
['tenant123456789','xyz','abcde',~'2015-01-01 07:59:59.999'] - 
['tenant123456789','xyz','abcde',*]";
+        sql = "SELECT * FROM v1 WHERE k3 <= TO_DATE('" + createStaticDate() + 
"') ORDER BY k3 DESC";
+        expectedExplainOutput = "CLIENT PARALLEL 1-WAY RANGE SCAN OVER T 
['tenant123456789','xyz','abcde',~'2015-01-01 08:00:00.000'] - 
['tenant123456789','xyz','abcde',*]";
         assertExplainPlanIsCorrect(conn, sql, expectedExplainOutput);
         assertOrderByHasBeenOptimizedOut(conn, sql);
 
         // Query with predicate ordered by full row key with date in reverse 
order
-        sql = "SELECT * FROM v1 WHERE k3 < TO_DATE('" + createStaticDate() + 
"') ORDER BY k3";
-        expectedExplainOutput = "CLIENT PARALLEL 1-WAY REVERSE RANGE SCAN OVER 
T ['tenant123456789','xyz','abcde',~'2015-01-01 07:59:59.999'] - 
['tenant123456789','xyz','abcde',*]";
+        sql = "SELECT * FROM v1 WHERE k3 <= TO_DATE('" + createStaticDate() + 
"') ORDER BY k3";
+        expectedExplainOutput = "CLIENT PARALLEL 1-WAY REVERSE RANGE SCAN OVER 
T ['tenant123456789','xyz','abcde',~'2015-01-01 08:00:00.000'] - 
['tenant123456789','xyz','abcde',*]";
         assertExplainPlanIsCorrect(conn, sql, expectedExplainOutput);
         assertOrderByHasBeenOptimizedOut(conn, sql);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
index 4b21a89..e5555d6 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.compile;
 
+import static org.apache.phoenix.query.KeyRange.EVERYTHING_RANGE;
+import static org.apache.phoenix.query.KeyRange.getKeyRange;
 import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY;
 import static org.apache.phoenix.util.TestUtil.BINARY_NAME;
 import static org.apache.phoenix.util.TestUtil.BTABLE_NAME;
@@ -47,6 +49,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -54,7 +57,12 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.WhereOptimizer.KeyExpressionVisitor.KeySlots;
+import 
org.apache.phoenix.compile.WhereOptimizer.KeyExpressionVisitor.SingleKeySlot;
+import 
org.apache.phoenix.compile.WhereOptimizer.KeyExpressionVisitor.SlotsIterator;
+import 
org.apache.phoenix.compile.WhereOptimizer.KeyExpressionVisitor.TrailingRangeIterator;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.filter.BooleanExpressionFilter;
 import org.apache.phoenix.filter.RowKeyComparisonFilter;
 import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.SingleKeyValueComparisonFilter;
@@ -65,6 +73,7 @@ import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDate;
@@ -81,7 +90,6 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.phoenix.schema.ColumnRef;
 import org.junit.Test;
 
 public class WhereOptimizerTest extends BaseConnectionlessQueryTest {
@@ -109,6 +117,106 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
     }
   
     @Test
+    public void testTrailingRangesIterator() throws Exception {
+        KeyRange[] all = new KeyRange[] 
{EVERYTHING_RANGE,EVERYTHING_RANGE,EVERYTHING_RANGE,EVERYTHING_RANGE,EVERYTHING_RANGE,
 EVERYTHING_RANGE};
+        List<KeyRange[]> singleAll = Collections.singletonList(all);
+        KeyRange[] r1 = new KeyRange[] {
+                EVERYTHING_RANGE,
+                EVERYTHING_RANGE,
+                EVERYTHING_RANGE,
+                getKeyRange(Bytes.toBytes("A")),
+                EVERYTHING_RANGE, EVERYTHING_RANGE};
+        KeyRange[] r2 = new KeyRange[] {
+                EVERYTHING_RANGE,
+                EVERYTHING_RANGE,
+                EVERYTHING_RANGE,
+                getKeyRange(Bytes.toBytes("B")),
+                EVERYTHING_RANGE, EVERYTHING_RANGE};
+        KeyRange[] r3 = new KeyRange[] {
+                EVERYTHING_RANGE,
+                EVERYTHING_RANGE,
+                EVERYTHING_RANGE,
+                getKeyRange(Bytes.toBytes("C")),
+                EVERYTHING_RANGE, EVERYTHING_RANGE};
+        KeyRange[] r4 = new KeyRange[] {
+                EVERYTHING_RANGE,
+                EVERYTHING_RANGE,
+                EVERYTHING_RANGE,
+                getKeyRange(Bytes.toBytes("D")),
+                EVERYTHING_RANGE, EVERYTHING_RANGE};
+        KeyRange[] r5 = new KeyRange[] {
+                EVERYTHING_RANGE,
+                EVERYTHING_RANGE,
+                EVERYTHING_RANGE,
+                getKeyRange(Bytes.toBytes("A"),true,Bytes.toBytes("D"),true),
+                EVERYTHING_RANGE, EVERYTHING_RANGE};
+        int initPkPos = 1;
+        int pkPos = 3;
+        List<List<List<KeyRange[]>>> slotsTrailingRangesList = 
Lists.<List<List<KeyRange[]>>>newArrayList(
+                
Lists.<List<KeyRange[]>>newArrayList(Lists.<KeyRange[]>newArrayList(r5)),
+                Lists.<List<KeyRange[]>>newArrayList(
+                        Lists.<KeyRange[]>newArrayList(r1, r2),
+                        Lists.<KeyRange[]>newArrayList(r3, r4)
+                        ),
+                Lists.<List<KeyRange[]>>newArrayList(),
+                Lists.<List<KeyRange[]>>newArrayList(singleAll)
+                );
+        List<KeyRange> results = Lists.<KeyRange>newArrayList();
+        List<KeyRange> expectedResults = 
Lists.newArrayList(getKeyRange(Bytes.toBytes("A")),getKeyRange(Bytes.toBytes("B")),getKeyRange(Bytes.toBytes("C")),getKeyRange(Bytes.toBytes("D")));
+        TrailingRangeIterator iterator = new TrailingRangeIterator(initPkPos, 
pkPos, slotsTrailingRangesList);
+        while (iterator.hasNext()) {
+            do {
+                do {
+                    KeyRange range = iterator.getRange();
+                    results.add(range);
+                } while (iterator.nextTrailingRange());
+            } while (iterator.nextRange());
+        }
+        assertEquals(expectedResults, results);
+    }
+    
+    @Test
+    public void testSlotsIterator() throws Exception {
+        List<KeySlots> keySlotsList = Lists.newArrayList();
+        keySlotsList.add(new SingleKeySlot(null, 0, 
+                Lists.<KeyRange>newArrayList(
+                        KeyRange.getKeyRange(Bytes.toBytes("A")),
+                        KeyRange.getKeyRange(Bytes.toBytes("B"))
+                                )));
+        keySlotsList.add(new SingleKeySlot(null, 1, 
+                Lists.<KeyRange>newArrayList(
+                        KeyRange.getKeyRange(Bytes.toBytes("C"))
+                                )));
+        keySlotsList.add(new SingleKeySlot(null, 0, 
+                Lists.<KeyRange>newArrayList(
+                        KeyRange.getKeyRange(Bytes.toBytes("D")),
+                        KeyRange.getKeyRange(Bytes.toBytes("E"))
+                                )));
+        keySlotsList.add(new SingleKeySlot(null, 1, 
+                Lists.<KeyRange>newArrayList()));
+        SlotsIterator iterator = new SlotsIterator(keySlotsList, 0);
+        String[][] expectedResults = {
+                {"A",null,"D",null},
+                {"B",null, "D", null},
+                {"A",null,"E",null},
+                {"B",null,"E",null},
+                };
+        int j = 0;
+        while (iterator.next()) {
+            int i;
+            for (i = 0; i < keySlotsList.size(); i++) {
+                KeyRange range = iterator.getRange(i);
+                String result = range == null ? null : 
Bytes.toString(range.getLowerRange());
+                String expectedResult = expectedResults[j][i];
+                assertEquals(expectedResult,result);
+            }
+            assertEquals(i,expectedResults[j].length);
+            j++;
+        }
+        assertEquals(j, expectedResults.length);
+    }
+    
+    @Test
     public void testMathFunc() throws SQLException {
         Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES));
         conn.createStatement().execute("create table test (id integer primary 
key)");
@@ -1354,20 +1462,54 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
     
     @Test
     public void testRVCExpressionThroughOr() throws SQLException {
-        String tenantId = "000000000000001";
-        String entityId = "002333333333331";
+        String tenantId =  "000000000000001";
+        String entityId =  "002333333333331";
         String entityId1 = "002333333333330";
         String entityId2 = "002333333333332";
         String query = "select * from atable where (organization_id,entity_id) 
>= (?,?) and organization_id = ? and  (entity_id = ? or entity_id = ?)";
         List<Object> binds = Arrays.<Object>asList(tenantId, entityId, 
tenantId, entityId1, entityId2);
         StatementContext context = compileStatement(query, binds);
         Scan scan = context.getScan();
+        byte[] expectedStartRow = 
ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), 
PVarchar.INSTANCE.toBytes(entityId1));
+        byte[] expectedStopRow = 
ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), 
PVarchar.INSTANCE.toBytes(entityId2), QueryConstants.SEPARATOR_BYTE_ARRAY);
+        assertArrayEquals(expectedStartRow, scan.getStartRow());
+        assertArrayEquals(expectedStopRow, scan.getStopRow());
         Filter filter = scan.getFilter();
-        assertNull(filter);
-        byte[] expectedStartRow = 
ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), 
PVarchar.INSTANCE.toBytes(entityId2));
-        byte[] expectedStopRow = 
ByteUtil.concat(ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), 
PVarchar.INSTANCE.toBytes(entityId2)), QueryConstants.SEPARATOR_BYTE_ARRAY);
+        assertTrue(filter instanceof SkipScanFilter);
+        SkipScanFilter skipScanFilter = (SkipScanFilter)filter;
+        List<List<KeyRange>> skipScanRanges = Arrays.asList(
+                
Arrays.asList(KeyRange.getKeyRange(ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId),
 PVarchar.INSTANCE.toBytes(entityId1))),
+                              
KeyRange.getKeyRange(ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), 
PVarchar.INSTANCE.toBytes(entityId2)))));
+        assertEquals(skipScanRanges, skipScanFilter.getSlots());
+    }
+    
+    @Test
+    public void testNotRepresentableBySkipScan() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES));
+        String tableName = generateUniqueName();
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(a 
INTEGER NOT NULL, b INTEGER NOT NULL, CONSTRAINT pk PRIMARY KEY (a,b))");
+        String query = "SELECT * FROM " + tableName + 
+                " WHERE (a,b) >= (1,5) and (a,b) < (3,8) and (a = 1 or a = 3) 
and ((b >= 6 and b < 9) or (b > 3 and b <= 5))";
+        StatementContext context = compileStatement(query);
+        Scan scan = context.getScan();
+        byte[] expectedStartRow = 
ByteUtil.concat(PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(4));
+        byte[] expectedStopRow = ByteUtil.concat(PInteger.INSTANCE.toBytes(3), 
PInteger.INSTANCE.toBytes(9));
         assertArrayEquals(expectedStartRow, scan.getStartRow());
         assertArrayEquals(expectedStopRow, scan.getStopRow());
+        Filter filter = scan.getFilter();
+        assertTrue(filter instanceof FilterList);
+        FilterList filterList = (FilterList)filter;
+        // We can form a skip scan, but it's not exact, so we need the boolean 
expression filter
+        // as well.
+        assertTrue(filterList.getFilters().get(0) instanceof SkipScanFilter);
+        assertTrue(filterList.getFilters().get(1) instanceof 
BooleanExpressionFilter);
+        SkipScanFilter skipScanFilter = 
(SkipScanFilter)filterList.getFilters().get(0);
+        List<List<KeyRange>> skipScanRanges = Arrays.asList(
+                
Arrays.asList(KeyRange.getKeyRange(PInteger.INSTANCE.toBytes(1)),
+                              
KeyRange.getKeyRange(PInteger.INSTANCE.toBytes(3))),
+                
Arrays.asList(KeyRange.getKeyRange(PInteger.INSTANCE.toBytes(4), true, 
PInteger.INSTANCE.toBytes(5), true),
+                              
KeyRange.getKeyRange(PInteger.INSTANCE.toBytes(6), true, 
PInteger.INSTANCE.toBytes(9), false)));
+        assertEquals(skipScanRanges, skipScanFilter.getSlots());
     }
     
     /**
@@ -1685,9 +1827,6 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
     
     @Test
     public void testQueryMoreRVC() throws SQLException {
-        String tenantId = "000000000000001";
-        String parentId = "000000000000008";
-        
         String ddl = "CREATE TABLE rvcTestIdx "
                 + " (\n" + 
                 "    pk1 VARCHAR NOT NULL,\n" + 
@@ -1792,9 +1931,17 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
         StatementContext context = compileStatement(query, binds);
         Scan scan = context.getScan();
         Filter filter = scan.getFilter();
-        assertTrue(filter instanceof RowKeyComparisonFilter);
+        assertTrue(filter instanceof SkipScanFilter);
         assertArrayEquals(HConstants.EMPTY_START_ROW, scan.getStartRow());
         assertArrayEquals(HConstants.EMPTY_END_ROW, scan.getStopRow());
+        SkipScanFilter skipScanFilter = (SkipScanFilter)filter;
+        List<List<KeyRange>> keyRanges = skipScanFilter.getSlots();
+        assertEquals(1, keyRanges.size());
+        assertEquals(2, keyRanges.get(0).size());
+        KeyRange range1 = keyRanges.get(0).get(0);
+        KeyRange range2 = keyRanges.get(0).get(1);
+        assertEquals(KeyRange.getKeyRange(KeyRange.UNBOUND, false, 
Bytes.toBytes(secondTenantId), true), range1);
+        
assertEquals(KeyRange.getKeyRange(ByteUtil.concat(Bytes.toBytes(firstTenantId), 
Bytes.toBytes(firstParentId)), true, KeyRange.UNBOUND, true), range2);
     }
 
     @Test
@@ -2045,19 +2192,19 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
         String query;
         StatementContext context;        
         Connection conn = DriverManager.getConnection(getUrl());
-        
+
         ddl = "create table t (a integer not null, b integer not null, c 
integer constraint pk primary key (a,b))";
         conn.createStatement().execute(ddl);
         
-        query = "select c from t where (a,b) in ( (1,2) , (1,3) ) and b = 4";
-        context = compileStatement(query, Collections.<Object>emptyList());
-        assertDegenerate(context.getScan());
-        
         query = "select c from t where a in (1,2) and b = 3 and (a,b) in ( 
(1,2) , (1,3))";
         context = compileStatement(query, Collections.<Object>emptyList());
         assertArrayEquals(ByteUtil.concat(PInteger.INSTANCE.toBytes(1), 
PInteger.INSTANCE.toBytes(3)), context.getScan().getStartRow());
         assertArrayEquals(ByteUtil.concat(PInteger.INSTANCE.toBytes(1), 
ByteUtil.nextKey(PInteger.INSTANCE.toBytes(3))), 
context.getScan().getStopRow());
 
+        query = "select c from t where (a,b) in ( (1,2) , (1,3) ) and b = 4";
+        context = compileStatement(query, Collections.<Object>emptyList());
+        assertDegenerate(context.getScan());
+        
         query = "select c from t where a = 1 and b = 3 and (a,b) in ( (1,2) , 
(1,3))";
         context = compileStatement(query, Collections.<Object>emptyList());
         assertArrayEquals(ByteUtil.concat(PInteger.INSTANCE.toBytes(1), 
PInteger.INSTANCE.toBytes(3)), context.getScan().getStartRow());
@@ -2069,8 +2216,10 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
         
         query = "select c from t1 where d = 'a' and e = 'foo' and a in (1,2) 
and b = 3 and (a,b) in ( (1,2) , (1,3))";
         context = compileStatement(query, Collections.<Object>emptyList());
-        assertArrayEquals(ByteUtil.concat(PVarchar.INSTANCE.toBytes("a"), 
QueryConstants.SEPARATOR_BYTE_ARRAY, PChar.INSTANCE.toBytes("foo"), 
PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(3)), 
context.getScan().getStartRow());
-        assertArrayEquals(ByteUtil.concat(PVarchar.INSTANCE.toBytes("a"), 
QueryConstants.SEPARATOR_BYTE_ARRAY, PChar.INSTANCE.toBytes("foo"), 
PInteger.INSTANCE.toBytes(1), ByteUtil.nextKey(PInteger.INSTANCE.toBytes(3))), 
context.getScan().getStopRow());
+        Scan scan = context.getScan();
+        assertArrayEquals(ByteUtil.concat(PVarchar.INSTANCE.toBytes("a"), 
QueryConstants.SEPARATOR_BYTE_ARRAY, PChar.INSTANCE.toBytes("foo"), 
PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(3)), 
scan.getStartRow());
+        assertArrayEquals(ByteUtil.concat(PVarchar.INSTANCE.toBytes("a"), 
QueryConstants.SEPARATOR_BYTE_ARRAY, PChar.INSTANCE.toBytes("foo"), 
PInteger.INSTANCE.toBytes(1), ByteUtil.nextKey(PInteger.INSTANCE.toBytes(3))), 
scan.getStopRow());
+        
         conn.close();
     }
 
@@ -2092,7 +2241,7 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
     }
     
     @Test
-    public void testRVCWithLeadingPKEq() throws SQLException {
+    public void testPartialRVCWithLeadingPKEq() throws SQLException {
         String tenantId = "o1";
         Connection conn = DriverManager.getConnection(getUrl());
         conn.createStatement().execute("CREATE TABLE COMMUNITIES.TEST (\n" + 
@@ -2111,17 +2260,42 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
                 "AND (score, entity_id) > (2.0, '04')\n" + 
                 "ORDER BY score, entity_id";
         Scan scan = compileStatement(query).getScan();
-        assertNotNull(scan.getFilter());
+        assertNull(scan.getFilter());
 
-        // See PHOENIX-3384: Optimize RVC expressions for non leading row key 
columns.
-        // FIXME: We should be able to optimize this better, taking into 
account the
-        // (score, entity_id) > (2.0, '04') to form more of the start/stop row.
-        assertArrayEquals(PVarchar.INSTANCE.toBytes(tenantId), 
scan.getStartRow());
+        byte[] startRow = 
ByteUtil.nextKey(ByteUtil.concat(PChar.INSTANCE.toBytes(tenantId), 
PDouble.INSTANCE.toBytes(2.0), PChar.INSTANCE.toBytes("04")));
+        assertArrayEquals(startRow, scan.getStartRow());
         
assertArrayEquals(ByteUtil.nextKey(PVarchar.INSTANCE.toBytes(tenantId)), 
scan.getStopRow());
     }
 
     @Test
-    public void testRVCWithCompDescRowKey() throws SQLException {
+    public void testPartialRVCWithLeadingPKEqDesc() throws SQLException {
+        String tenantId = "o1";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE COMMUNITIES.TEST (\n" + 
+                "    ORGANIZATION_ID CHAR(2) NOT NULL,\n" + 
+                "    SCORE DOUBLE NOT NULL,\n" + 
+                "    ENTITY_ID CHAR(2) NOT NULL\n" + 
+                "    CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" + 
+                "        ORGANIZATION_ID,\n" + 
+                "        SCORE DESC,\n" + 
+                "        ENTITY_ID DESC\n" + 
+                "    )\n" + 
+                ") VERSIONS=1, MULTI_TENANT=TRUE");
+        String query = "SELECT entity_id, score\n" + 
+                "FROM communities.test\n" + 
+                "WHERE organization_id = '" + tenantId + "'\n" + 
+                "AND (score, entity_id) < (2.0, '04')\n" + 
+                "ORDER BY score DESC, entity_id DESC";
+        Scan scan = compileStatement(query).getScan();
+        assertNull(scan.getFilter());
+
+        byte[] startRow = 
ByteUtil.nextKey(ByteUtil.concat(PChar.INSTANCE.toBytes(tenantId), 
PDouble.INSTANCE.toBytes(2.0, SortOrder.DESC), PChar.INSTANCE.toBytes("04", 
SortOrder.DESC)));
+        assertArrayEquals(startRow, scan.getStartRow());
+        
assertArrayEquals(ByteUtil.nextKey(PVarchar.INSTANCE.toBytes(tenantId)), 
scan.getStopRow());
+    }
+
+    @Test
+    public void testFullRVCWithLeadingPKEqDesc() throws SQLException {
         String tenantId = "o1";
         Connection conn = DriverManager.getConnection(getUrl());
         conn.createStatement().execute("CREATE TABLE COMMUNITIES.TEST (\n" + 
@@ -2142,15 +2316,120 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
         Scan scan = compileStatement(query).getScan();
         assertNull(scan.getFilter());
 
-        // FIXME See PHOENIX-3383: Comparison between descending row keys used 
in RVC is reverse
-        // This should set the startRow, but instead it's setting the stopRow
-        byte[] startRow = PChar.INSTANCE.toBytes(tenantId);
+        // TODO: end to end test that confirms this start row is accurate
+        byte[] startRow = ByteUtil.concat(PChar.INSTANCE.toBytes(tenantId), 
PDouble.INSTANCE.toBytes(2.0, SortOrder.DESC), 
ByteUtil.nextKey(PChar.INSTANCE.toBytes("04", SortOrder.DESC)));
         assertArrayEquals(startRow, scan.getStartRow());
-        byte[] stopRow = ByteUtil.concat(PChar.INSTANCE.toBytes(tenantId), 
PDouble.INSTANCE.toBytes(2.0, SortOrder.DESC), PChar.INSTANCE.toBytes("04", 
SortOrder.DESC));
-        assertArrayEquals(stopRow, scan.getStopRow());
+        
assertArrayEquals(ByteUtil.nextKey(PVarchar.INSTANCE.toBytes(tenantId)), 
scan.getStopRow());
+    }
+    
+    @Test
+    public void testTrimTrailing() throws Exception {
+        try (Connection conn= DriverManager.getConnection(getUrl())) {
+            String sql="CREATE TABLE T("+
+                    "A CHAR(1) NOT NULL,"+
+                    "B CHAR(1) NOT NULL,"+
+                    "C CHAR(1) NOT NULL,"+
+                    "D CHAR(1) NOT NULL,"+
+                    "DATA INTEGER, "+
+                    "CONSTRAINT TEST_PK PRIMARY KEY (A,B,C,D))";
+            conn.createStatement().execute(sql);
+
+            // Will cause trailing part of RVC to (A,B,C) to be trimmed 
allowing us to perform a skip scan
+            sql="select * from T where (A,B,C) >= ('A','A','A') and (A,B,C) < 
('D','D','D') and (B,C) > ('E','E')";
+            QueryPlan queryPlan = TestUtil.getOptimizeQueryPlan(conn, sql);
+            Scan scan = queryPlan.getContext().getScan();
+            assertTrue(scan.getFilter() instanceof SkipScanFilter);
+            List<List<KeyRange>> rowKeyRanges = 
((SkipScanFilter)(scan.getFilter())).getSlots();
+            assertEquals(
+                    Arrays.asList(
+                        Arrays.asList(
+                                
KeyRange.getKeyRange(PChar.INSTANCE.toBytes("A"), true, 
PChar.INSTANCE.toBytes("D"), false)
+                                ),
+                        Arrays.asList(
+                                
KeyRange.getKeyRange(PChar.INSTANCE.toBytes("EE"), false, KeyRange.UNBOUND, 
false)
+                                )
+                            ),
+                     rowKeyRanges
+                    );
+            assertArrayEquals(scan.getStartRow(), 
PChar.INSTANCE.toBytes("AEF"));
+            assertArrayEquals(scan.getStopRow(), PChar.INSTANCE.toBytes("D"));
+            sql="select * from T where (A,B,C) > ('A','A','A') and (A,B,C) <= 
('D','D','D') and (B,C) >= ('E','E')";
+            queryPlan = TestUtil.getOptimizeQueryPlan(conn, sql);
+            scan = queryPlan.getContext().getScan();
+            assertTrue(scan.getFilter() instanceof SkipScanFilter);
+            rowKeyRanges = ((SkipScanFilter)(scan.getFilter())).getSlots();
+            assertEquals(
+                    Arrays.asList(
+                        Arrays.asList(
+                                
KeyRange.getKeyRange(PChar.INSTANCE.toBytes("A"), false, 
PChar.INSTANCE.toBytes("D"), true)
+                                ),
+                        Arrays.asList(
+                                
KeyRange.getKeyRange(PChar.INSTANCE.toBytes("EE"), true, KeyRange.UNBOUND, 
false)
+                                )
+                            ),
+                     rowKeyRanges
+                    );
+            assertArrayEquals(PChar.INSTANCE.toBytes("BEE"), 
scan.getStartRow());
+            assertArrayEquals(PChar.INSTANCE.toBytes("E"), scan.getStopRow());
+        }
+    }
+
+    @Test
+    public void testMultiSlotTrailingIntersect() throws Exception {
+        try (Connection conn= DriverManager.getConnection(getUrl())) {
+            String sql="CREATE TABLE T("+
+                    "A CHAR(1) NOT NULL,"+
+                    "B CHAR(1) NOT NULL,"+
+                    "C CHAR(1) NOT NULL,"+
+                    "D CHAR(1) NOT NULL,"+
+                    "DATA INTEGER, "+
+                    "CONSTRAINT TEST_PK PRIMARY KEY (A,B,C,D))";
+            conn.createStatement().execute(sql);
+
+            sql = "select * from t where (a,b) in 
(('A','B'),('B','A'),('B','B'),('A','A')) and (a,b,c) in ( ('A','B','C') , 
('A','C','D'), ('B','B','E'))";
+            QueryPlan queryPlan = TestUtil.getOptimizeQueryPlan(conn, sql);
+            Scan scan = queryPlan.getContext().getScan();
+            assertTrue(scan.getFilter() instanceof SkipScanFilter);
+            List<List<KeyRange>> rowKeyRanges = 
((SkipScanFilter)(scan.getFilter())).getSlots();
+            assertEquals(
+                    Arrays.asList(
+                        Arrays.asList(
+                                
KeyRange.POINT.apply(PChar.INSTANCE.toBytes("ABC")),
+                                
KeyRange.POINT.apply(PChar.INSTANCE.toBytes("BBE"))
+                                )
+                            ),
+                     rowKeyRanges
+                    );
+            assertArrayEquals(scan.getStartRow(), 
PChar.INSTANCE.toBytes("ABC"));
+            assertArrayEquals(scan.getStopRow(), 
PChar.INSTANCE.toBytes("BBF"));
+        }
     }
 
     @Test
+    public void testEqualityAndGreaterThanRVC() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" + 
+                    "    A CHAR(1) NOT NULL,\n" + 
+                    "    B CHAR(1) NOT NULL,\n" + 
+                    "    C CHAR(1) NOT NULL,\n" + 
+                    "    D CHAR(1) NOT NULL,\n" + 
+                    "    CONSTRAINT PK PRIMARY KEY (\n" + 
+                    "        A,\n" + 
+                    "        B,\n" + 
+                    "        C,\n" + 
+                    "        D\n" + 
+                    "    )\n" + 
+                    ")");
+            String query = "SELECT * FROM T WHERE A = 'C' and (A,B,C) > 
('C','B','X') and C='C'";
+            QueryPlan queryPlan = TestUtil.getOptimizeQueryPlan(conn, query);
+            Scan scan = queryPlan.getContext().getScan();
+            assertArrayEquals(ByteUtil.concat(PChar.INSTANCE.toBytes("C"), 
PChar.INSTANCE.toBytes("C"), PChar.INSTANCE.toBytes("C")), scan.getStartRow());
+            assertArrayEquals(PChar.INSTANCE.toBytes("D"), scan.getStopRow());
+        }
+    }
+    
+    @Test
     public void testOrExpressionNonLeadingPKPushToScanBug4602() throws 
Exception {
         Connection conn = null;
         try {
@@ -2163,7 +2442,7 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
                     "DATA INTEGER, "+
                     "CONSTRAINT TEST_PK PRIMARY KEY (PK1,PK2,PK3))";
             conn.createStatement().execute(sql);
-
+            
             //case 1: pk1 is equal,pk2 is multiRange
             sql="select * from "+testTableName+" t where (t.pk1 = 2) and 
((t.pk2 >= 4 and t.pk2 <6) or (t.pk2 >= 8 and t.pk2 <9))";
             QueryPlan queryPlan = TestUtil.getOptimizeQueryPlan(conn, sql);
@@ -2302,8 +2581,8 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
             sql ="select * from "+testTableName+" t where (t.pk1 >=2 and 
t.pk1<5) or (t.pk2 >=7 or t.pk2 <9)";
             queryPlan= TestUtil.getOptimizeQueryPlan(conn, sql);
 
-            Expression pk2Expression =  new ColumnRef(queryPlan.getTableRef(), 
queryPlan.getTableRef().getTable().getColumnForColumnName("PK2").getPosition()).newColumnExpression();
             scan = queryPlan.getContext().getScan();
+            Expression pk2Expression =  new ColumnRef(queryPlan.getTableRef(), 
queryPlan.getTableRef().getTable().getColumnForColumnName("PK2").getPosition()).newColumnExpression();
             assertTrue(scan.getFilter() instanceof RowKeyComparisonFilter);
             assertEquals(
                       TestUtil.rowKeyFilter(
@@ -2372,6 +2651,24 @@ public class WhereOptimizerTest extends 
BaseConnectionlessQueryTest {
                      scan.getFilter());
             assertArrayEquals(scan.getStartRow(), HConstants.EMPTY_START_ROW);
             assertArrayEquals(scan.getStopRow(), HConstants.EMPTY_END_ROW);
+            
+            //case 11: pk1 and pk2, but pk1 has a or allRange and force skip 
scan
+            sql="select /*+ SKIP_SCAN */ * from "+testTableName+" t where 
((t.pk1 >=2 and t.pk1<5) or (t.pk1 >=7 or t.pk1 <9)) and ((t.pk2 >= 4 and t.pk2 
<6) or (t.pk2 >= 8 and t.pk2 <9))";
+            queryPlan = TestUtil.getOptimizeQueryPlan(conn, sql);
+            scan = queryPlan.getContext().getScan();
+            assertTrue(scan.getFilter() instanceof SkipScanFilter);
+            rowKeyRanges = ((SkipScanFilter)(scan.getFilter())).getSlots();
+            assertEquals(
+                    Arrays.asList(
+                            Arrays.asList(KeyRange.EVERYTHING_RANGE),
+                            Arrays.asList(
+                                    
KeyRange.getKeyRange(PInteger.INSTANCE.toBytes(4), true, 
PInteger.INSTANCE.toBytes(6), false),
+                                    
KeyRange.getKeyRange(PInteger.INSTANCE.toBytes(8), true, 
PInteger.INSTANCE.toBytes(9), false)
+                                    )
+                            ),
+                    rowKeyRanges);
+            assertArrayEquals(scan.getStartRow(), HConstants.EMPTY_START_ROW);
+            assertArrayEquals(scan.getStopRow(), HConstants.EMPTY_END_ROW);
         }
         finally {
             if(conn!=null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/test/java/org/apache/phoenix/expression/RoundFloorCeilExpressionsTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/expression/RoundFloorCeilExpressionsTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/expression/RoundFloorCeilExpressionsTest.java
index 89058ba..3562099 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/expression/RoundFloorCeilExpressionsTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/expression/RoundFloorCeilExpressionsTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.expression;
 
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -24,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.math.BigDecimal;
 import java.sql.Date;
+import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -42,14 +44,20 @@ import 
org.apache.phoenix.expression.function.RoundDecimalExpression;
 import org.apache.phoenix.expression.function.ScalarFunction;
 import org.apache.phoenix.expression.function.TimeUnit;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDate;
 import org.apache.phoenix.schema.types.PDecimal;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 
 /**
@@ -60,7 +68,7 @@ import org.junit.Test;
  *
  * @since 3.0.0
  */
-public class RoundFloorCeilExpressionsTest {
+public class RoundFloorCeilExpressionsTest extends BaseConnectionlessQueryTest 
{
 
     // Decimal Expression Tests
 
@@ -165,37 +173,40 @@ public class RoundFloorCeilExpressionsTest {
 
     @Test
     public void testRoundDecimalExpressionKeyRangeSimple() throws Exception {
+        KeyPart baseKeyPart = getDecimalKeyPart();
         ScalarFunction roundDecimalExpression = 
(ScalarFunction)RoundDecimalExpression.create(DUMMY_DECIMAL, 3);
 
         byte[] upperBound = PDecimal.INSTANCE.toBytes(new 
BigDecimal("1.2385"));
         byte[] lowerBound = PDecimal.INSTANCE.toBytes(new 
BigDecimal("1.2375"));
         KeyRange expectedKeyRange = KeyRange.getKeyRange(lowerBound, 
upperBound);
 
-        KeyPart keyPart = roundDecimalExpression.newKeyPart(null);
+        KeyPart keyPart = roundDecimalExpression.newKeyPart(baseKeyPart);
         assertEquals(expectedKeyRange, keyPart.getKeyRange(CompareOp.EQUAL, 
LiteralExpression.newConstant(new BigDecimal("1.238"), PDecimal.INSTANCE)));
     }
 
     @Test
     public void testFloorDecimalExpressionKeyRangeSimple() throws Exception {
+        KeyPart baseKeyPart = getDecimalKeyPart();
         ScalarFunction floorDecimalExpression = 
(ScalarFunction)FloorDecimalExpression.create(DUMMY_DECIMAL, 3);
 
         byte[] upperBound = PDecimal.INSTANCE.toBytes(new BigDecimal("1.239"));
         byte[] lowerBound = PDecimal.INSTANCE.toBytes(new BigDecimal("1.238"));
         KeyRange expectedKeyRange = KeyRange.getKeyRange(lowerBound, true, 
upperBound, false);
 
-        KeyPart keyPart = floorDecimalExpression.newKeyPart(null);
+        KeyPart keyPart = floorDecimalExpression.newKeyPart(baseKeyPart);
         assertEquals(expectedKeyRange, keyPart.getKeyRange(CompareOp.EQUAL, 
LiteralExpression.newConstant(new BigDecimal("1.238"), PDecimal.INSTANCE)));
     }
 
     @Test
     public void testCeilDecimalExpressionKeyRangeSimple() throws Exception {
+        KeyPart baseKeyPart = getDecimalKeyPart();
         ScalarFunction ceilDecimalExpression = 
(ScalarFunction)CeilDecimalExpression.create(DUMMY_DECIMAL, 3);
 
         byte[] upperBound = PDecimal.INSTANCE.toBytes(new BigDecimal("1.238"));
         byte[] lowerBound = PDecimal.INSTANCE.toBytes(new BigDecimal("1.237"));
         KeyRange expectedKeyRange = KeyRange.getKeyRange(lowerBound, false, 
upperBound, true);
 
-        KeyPart keyPart = ceilDecimalExpression.newKeyPart(null);
+        KeyPart keyPart = ceilDecimalExpression.newKeyPart(baseKeyPart);
         assertEquals(expectedKeyRange, keyPart.getKeyRange(CompareOp.EQUAL, 
LiteralExpression.newConstant(new BigDecimal("1.238"), PDecimal.INSTANCE)));
     }
 
@@ -203,27 +214,61 @@ public class RoundFloorCeilExpressionsTest {
 
     @Test
     public void testRoundDecimalExpressionKeyRangeCoverage() throws Exception {
+        KeyPart baseKeyPart = getDecimalKeyPart();
         for(int scale : SCALES) {
             ScalarFunction roundDecimalExpression = (ScalarFunction) 
RoundDecimalExpression.create(DUMMY_DECIMAL, scale);
-            KeyPart keyPart = roundDecimalExpression.newKeyPart(null);
+            KeyPart keyPart = roundDecimalExpression.newKeyPart(baseKeyPart);
             verifyKeyPart(RoundingType.ROUND, scale, keyPart);
         }
     }
 
+    private static KeyPart getDecimalKeyPart() throws SQLException {
+        String tableName = generateUniqueName();
+        try (PhoenixConnection pconn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class)) {
+            pconn.createStatement().execute("CREATE TABLE " + tableName + " (k 
DECIMAL PRIMARY KEY)");
+            final PTable table = pconn.getMetaDataCache().getTableRef(new 
PTableKey(null, tableName)).getTable();
+            KeyPart baseKeyPart = new KeyPart() {
+    
+                @Override
+                public KeyRange getKeyRange(CompareOp op, Expression rhs) {
+                    return KeyRange.EVERYTHING_RANGE;
+                }
+    
+                @Override
+                public List<Expression> getExtractNodes() {
+                    return Collections.emptyList();
+                }
+    
+                @Override
+                public PColumn getColumn() {
+                    return table.getPKColumns().get(0);
+                }
+    
+                @Override
+                public PTable getTable() {
+                    return table;
+                }
+            };
+            return baseKeyPart;
+        }
+    }
+    
     @Test
     public void testFloorDecimalExpressionKeyRangeCoverage() throws Exception {
+        KeyPart baseKeyPart = getDecimalKeyPart();
         for(int scale : SCALES) {
             ScalarFunction floorDecimalExpression = (ScalarFunction) 
FloorDecimalExpression.create(DUMMY_DECIMAL, scale);
-            KeyPart keyPart = floorDecimalExpression.newKeyPart(null);
+            KeyPart keyPart = floorDecimalExpression.newKeyPart(baseKeyPart);
             verifyKeyPart(RoundingType.FLOOR, scale, keyPart);
         }
     }
 
     @Test
     public void testCeilDecimalExpressionKeyRangeCoverage() throws Exception {
+        KeyPart baseKeyPart = getDecimalKeyPart();
         for(int scale : SCALES) {
             ScalarFunction ceilDecimalExpression = (ScalarFunction) 
CeilDecimalExpression.create(DUMMY_DECIMAL, scale);
-            KeyPart keyPart = ceilDecimalExpression.newKeyPart(null);
+            KeyPart keyPart = ceilDecimalExpression.newKeyPart(baseKeyPart);
             verifyKeyPart(RoundingType.CEIL, scale, keyPart);
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
index abc435a..a3da784 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
@@ -104,7 +104,7 @@ public class KeyRangeClipTest extends 
BaseConnectionlessQueryTest {
     
     @Test
     public void test() {
-        ScanRanges scanRanges = ScanRanges.create(schema, 
Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(input)),
 new int[] {schema.getFieldCount()-1}, KeyRange.EVERYTHING_RANGE, null, false, 
-1);
+        ScanRanges scanRanges = ScanRanges.create(schema, 
Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(input)),
 new int[] {schema.getFieldCount()-1}, null, false, -1);
         ScanRanges clippedRange = 
BaseResultIterators.computePrefixScanRanges(scanRanges, clipTo);
         assertEquals(expectedOutput, clippedRange.getScanRange());
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
index 787ca33..d87989c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
@@ -36,6 +36,9 @@ public class QueryPlanTest extends 
BaseConnectionlessQueryTest {
     public void testExplainPlan() throws Exception {
         String[] queryPlans = new String[] {
 
+                "SELECT a_string,b_string FROM atable WHERE organization_id = 
'000000000000001' AND entity_id > '000000000000002' AND entity_id < 
'000000000000008' AND (organization_id,entity_id) <= 
('000000000000001','000000000000005') ",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE 
['000000000000001','000000000000003'] - ['000000000000001','000000000000005']",
+
                 "SELECT host FROM PTSDB WHERE inst IS NULL AND host IS NOT 
NULL AND \"DATE\" >= to_date('2013-01-01')",
                 "CLIENT PARALLEL 1-WAY RANGE SCAN OVER PTSDB [null,not 
null]\n" + 
                 "    SERVER FILTER BY FIRST KEY ONLY AND \"DATE\" >= DATE 
'2013-01-01 00:00:00.000'",
@@ -64,11 +67,8 @@ public class QueryPlanTest extends 
BaseConnectionlessQueryTest {
                 "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER ATABLE\n" + 
                 "    SERVER FILTER BY (X_INTEGER = 2 AND A_INTEGER < 5)",
 
-                "SELECT a_string,b_string FROM atable WHERE organization_id = 
'000000000000001' AND entity_id > '000000000000002' AND entity_id < 
'000000000000008' AND (organization_id,entity_id) <= 
('000000000000001','000000000000005') ",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE 
['000000000000001','000000000000003'] - ['000000000000001','000000000000006']",
-
                 "SELECT a_string,b_string FROM atable WHERE organization_id > 
'000000000000001' AND entity_id > '000000000000002' AND entity_id < 
'000000000000008' AND (organization_id,entity_id) >= 
('000000000000003','000000000000005') ",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE 
['000000000000003','000000000000005'] - [*]\n" + 
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE 
['000000000000003000000000000005'] - [*]\n" + 
                 "    SERVER FILTER BY (ENTITY_ID > '000000000000002' AND 
ENTITY_ID < '000000000000008')",
 
                 "SELECT a_string,b_string FROM atable WHERE organization_id = 
'000000000000001' AND entity_id >= '000000000000002' AND entity_id < 
'000000000000008' AND (organization_id,entity_id) >= 
('000000000000000','000000000000005') ",

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aee568be/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
index a435ba6..35278a7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
@@ -25,15 +25,19 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Test;
@@ -148,4 +152,48 @@ public class RowKeySchemaTest  extends 
BaseConnectionlessQueryTest  {
         assertExpectedRowKeyValue("c1 VARCHAR, c2 CHAR(1) NOT NULL, c3 INTEGER 
NOT NULL", "c1, c2, c3", new Object[] {"abc", "z", 5});
     }
     
+    private static byte[] getKeyPart(PTable t, String... keys) throws 
SQLException {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        byte[][] keyByteArray = new byte[keys.length][];
+        int i = 0;
+        for (String key : keys) {
+            keyByteArray[i++] = key == null ? ByteUtil.EMPTY_BYTE_ARRAY : 
Bytes.toBytes(key);
+        }
+        t.newKey(ptr, keyByteArray);
+        return ptr.copyBytes();
+    }
+    
+    @Test
+    public void testClipLeft() throws Exception {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE T1(K1 CHAR(1) NOT NULL, 
K2 VARCHAR, K3 VARCHAR, CONSTRAINT pk PRIMARY KEY (K1,K2,K3))  ");
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        PTable table;
+        RowKeySchema schema;
+        table = pconn.getTable(new PTableKey(pconn.getTenantId(), "T1"));
+        schema = table.getRowKeySchema();
+        KeyRange r, rLeft, expectedResult;
+        r = KeyRange.getKeyRange(getKeyPart(table, "A", "B", "C"), true, 
getKeyPart(table, "B", "C"), true);
+        rLeft = schema.clipLeft(0, r, 1, ptr);
+        expectedResult = KeyRange.getKeyRange(getKeyPart(table, "A"), true, 
getKeyPart(table, "B"), true);
+        r = KeyRange.getKeyRange(getKeyPart(table, "A", "B", "C"), true, 
getKeyPart(table, "B"), true);
+        rLeft = schema.clipLeft(0, r, 1, ptr);
+        expectedResult = KeyRange.getKeyRange(getKeyPart(table, "A"), true, 
getKeyPart(table, "B"), true);
+        assertEquals(expectedResult, rLeft);
+        rLeft = schema.clipLeft(0, r, 2, ptr);
+        expectedResult = KeyRange.getKeyRange(getKeyPart(table, "A", "B"), 
true, getKeyPart(table, "B"), true);
+        assertEquals(expectedResult, rLeft);
+        
+        r = KeyRange.getKeyRange(getKeyPart(table, "A", "B", "C"), true, 
KeyRange.UNBOUND, true);
+        rLeft = schema.clipLeft(0, r, 2, ptr);
+        expectedResult = KeyRange.getKeyRange(getKeyPart(table, "A", "B"), 
true, KeyRange.UNBOUND, false);
+        assertEquals(expectedResult, rLeft);
+        
+        r = KeyRange.getKeyRange(KeyRange.UNBOUND, false, getKeyPart(table, 
"A", "B", "C"), true);
+        rLeft = schema.clipLeft(0, r, 2, ptr);
+        expectedResult = KeyRange.getKeyRange(KeyRange.UNBOUND, false, 
getKeyPart(table, "A", "B"), true);
+        assertEquals(expectedResult, rLeft);
+    }
+    
 }

Reply via email to