http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc383eb6/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 87f00e4..a5287cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.compile;
 
-import static java.util.Collections.singletonList;
-
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -27,10 +25,14 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.BaseExpression;
 import 
org.apache.phoenix.expression.BaseExpression.ExpressionComparabilityWrapper;
@@ -61,7 +63,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
@@ -74,8 +75,11 @@ import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 
 /**
  *
@@ -115,6 +119,7 @@ public class WhereOptimizer {
        RowKeySchema schema = table.getRowKeySchema();
        boolean isMultiTenant = tenantId != null && table.isMultiTenant();
        boolean isSharedIndex = table.getViewIndexId() != null;
+       ImmutableBytesWritable ptr = context.getTempPtr();
        
        if (isMultiTenant) {
             tenantIdBytes = ScanUtil.getTenantIdBytes(schema, isSalted, 
tenantId, isSharedIndex);
@@ -158,31 +163,13 @@ public class WhereOptimizer {
 
         int pkPos = 0;
         int nPKColumns = table.getPKColumns().size();
-        int[] slotSpan = new int[nPKColumns];
+        int[] slotSpanArray = new int[nPKColumns];
         List<List<KeyRange>> cnf = 
Lists.newArrayListWithExpectedSize(schema.getMaxFields());
-        KeyRange minMaxRange = keySlots.getMinMaxRange();
-        if (minMaxRange == null) {
-            minMaxRange = KeyRange.EVERYTHING_RANGE;
-        }
-        boolean hasMinMaxRange = (minMaxRange != KeyRange.EVERYTHING_RANGE);
-        int minMaxRangeOffset = 0;
-        byte[] minMaxRangePrefix = null;
         boolean hasViewIndex = table.getViewIndexId() != null;
-        if (hasMinMaxRange) {
-            int minMaxRangeSize = (isSalted ? SaltingUtil.NUM_SALTING_BYTES : 
0)
-                    + (isMultiTenant ? tenantIdBytes.length + 1 : 0)
-                    + (hasViewIndex ? 
MetaDataUtil.getViewIndexIdDataType().getByteSize() : 0);
-            minMaxRangePrefix = new byte[minMaxRangeSize];
-        }
-        
-        Iterator<KeyExpressionVisitor.KeySlot> iterator = keySlots.iterator();
+        Iterator<KeyExpressionVisitor.KeySlot> iterator = 
keySlots.getSlots().iterator();
         // Add placeholder for salt byte ranges
         if (isSalted) {
             cnf.add(SALT_PLACEHOLDER);
-            if (hasMinMaxRange) {
-                   System.arraycopy(SALT_PLACEHOLDER.get(0).getLowerRange(), 
0, minMaxRangePrefix, minMaxRangeOffset, SaltingUtil.NUM_SALTING_BYTES);
-                   minMaxRangeOffset += SaltingUtil.NUM_SALTING_BYTES;
-            }
             // Increment the pkPos, as the salt column is in the row schema
             // Do not increment the iterator, though, as there will never be
             // an expression in the keySlots for the salt column
@@ -194,35 +181,17 @@ public class WhereOptimizer {
         if (hasViewIndex) {
             byte[] viewIndexBytes = 
MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
             KeyRange indexIdKeyRange = KeyRange.getKeyRange(viewIndexBytes);
-            cnf.add(singletonList(indexIdKeyRange));
-            if (hasMinMaxRange) {
-                System.arraycopy(viewIndexBytes, 0, minMaxRangePrefix, 
minMaxRangeOffset, viewIndexBytes.length);
-                minMaxRangeOffset += viewIndexBytes.length;
-            }
+            cnf.add(Collections.singletonList(indexIdKeyRange));
             pkPos++;
         }
         
         // Add tenant data isolation for tenant-specific tables
         if (isMultiTenant) {
             KeyRange tenantIdKeyRange = KeyRange.getKeyRange(tenantIdBytes);
-            cnf.add(singletonList(tenantIdKeyRange));
-            if (hasMinMaxRange) {
-                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, 
minMaxRangeOffset, tenantIdBytes.length);
-                minMaxRangeOffset += tenantIdBytes.length;
-                Field f = schema.getField(pkPos);
-                if (!f.getDataType().isFixedWidth()) {
-                    minMaxRangePrefix[minMaxRangeOffset] = 
SchemaUtil.getSeparatorByte(schema.rowKeyOrderOptimizable(), 
tenantIdBytes.length==0, f);
-                    minMaxRangeOffset++;
-                }
-            }
+            cnf.add(Collections.singletonList(tenantIdKeyRange));
             pkPos++;
         }
         
-        // Prepend minMaxRange with fixed column values so we can properly 
intersect the
-        // range with the other range.
-        if (hasMinMaxRange) {
-            minMaxRange = minMaxRange.prependRange(minMaxRangePrefix, 0, 
minMaxRangeOffset);
-        }
         boolean forcedSkipScan = statement.getHint().hasHint(Hint.SKIP_SCAN);
         boolean forcedRangeScan = statement.getHint().hasHint(Hint.RANGE_SCAN);
         boolean hasUnboundedRange = false;
@@ -251,13 +220,44 @@ public class WhereOptimizer {
                 }
             }
             KeyPart keyPart = slot.getKeyPart();
-            slotSpan[cnf.size()] = slot.getPKSpan() - 1;
-            pkPos = slot.getPKPosition() + slot.getPKSpan();
-            // Skip span-1 slots as we skip one at the top of the loop
-            for (int i = 1; i < slot.getPKSpan() && iterator.hasNext(); i++) {
-                iterator.next();
-            }
             List<KeyRange> keyRanges = slot.getKeyRanges();
+            SortOrder prevSortOrder = null;
+            int slotOffset = 0;
+            int clipLeftSpan = 0;
+            
+            // Iterate through all spans of this slot
+            while (true) {
+                SortOrder sortOrder =  schema.getField(slot.getPKPosition() + 
slotOffset).getSortOrder();
+                if (prevSortOrder == null)  {
+                    prevSortOrder = sortOrder;
+                } else if (prevSortOrder != sortOrder) {
+                    // If the sort order changes, we must clip the portion 
with the same sort order
+                    // and invert the key ranges and swap the upper and lower 
bounds.
+                    List<KeyRange> leftRanges = clipLeft(schema, 
slot.getPKPosition() + slotOffset - clipLeftSpan, clipLeftSpan, keyRanges, ptr);
+                    keyRanges = clipRight(schema, slot.getPKPosition() + 
slotOffset - 1, keyRanges, leftRanges, ptr);
+                    if (prevSortOrder == SortOrder.DESC) {
+                        leftRanges = invertKeyRanges(leftRanges);
+                    }
+                    slotSpanArray[cnf.size()] = clipLeftSpan-1;
+                    cnf.add(leftRanges);
+                    clipLeftSpan = 0;
+                    prevSortOrder = sortOrder;
+                }
+                clipLeftSpan++;
+                slotOffset++;
+                if (slotOffset >= slot.getPKSpan()) {
+                    break;
+                }
+                if (iterator.hasNext()) {
+                    iterator.next();
+                }
+            }
+            if (schema.getField(slot.getPKPosition() + slotOffset - 
1).getSortOrder() == SortOrder.DESC) {
+                keyRanges = invertKeyRanges(keyRanges);
+            }
+            pkPos = slot.getPKPosition() + slot.getPKSpan();
+            
+            slotSpanArray[cnf.size()] = clipLeftSpan-1;
             cnf.add(keyRanges);
             
             // TODO: when stats are available, we may want to use a skip scan 
if the
@@ -301,8 +301,8 @@ public class WhereOptimizer {
         // If we have fully qualified point keys with multi-column spans (i.e. 
RVC),
         // we can still use our skip scan. The ScanRanges.create() call will 
explode
         // out the keys.
-        slotSpan = Arrays.copyOf(slotSpan, cnf.size());
-        ScanRanges scanRanges = ScanRanges.create(schema, cnf, slotSpan, 
minMaxRange, nBuckets, useSkipScan, table.getRowTimestampColPos());
+        slotSpanArray = Arrays.copyOf(slotSpanArray, cnf.size());
+        ScanRanges scanRanges = ScanRanges.create(schema, cnf, slotSpanArray, 
nBuckets, useSkipScan, table.getRowTimestampColPos());
         context.setScanRanges(scanRanges);
         if (whereClause == null) {
             return null;
@@ -311,6 +311,61 @@ public class WhereOptimizer {
         }
     }
     
+    private static KeyRange getTrailingRange(RowKeySchema rowKeySchema, int 
pkPos, KeyRange range, KeyRange clippedResult, ImmutableBytesWritable ptr) {
+        int separatorLength = 
rowKeySchema.getField(pkPos).getDataType().isFixedWidth() ? 0 : 1;
+        byte[] lowerRange = KeyRange.UNBOUND;
+        boolean lowerInclusive = false;
+        // Lower range of trailing part of RVC must be true, so we can form a 
new range to intersect going forward
+        if (!range.lowerUnbound() && Bytes.startsWith(range.getLowerRange(), 
clippedResult.getLowerRange())) {
+            lowerRange = range.getLowerRange();
+            int offset = clippedResult.getLowerRange().length + 
separatorLength;
+            ptr.set(lowerRange, offset, lowerRange.length - offset);
+            lowerRange = ptr.copyBytes();
+            lowerInclusive = range.isLowerInclusive();
+        }
+        byte[] upperRange = KeyRange.UNBOUND;
+        boolean upperInclusive = false;
+        if (!range.upperUnbound() && Bytes.startsWith(range.getUpperRange(), 
clippedResult.getUpperRange())) {
+            upperRange = range.getUpperRange();
+            int offset = clippedResult.getUpperRange().length + 
separatorLength;
+            ptr.set(upperRange, offset, upperRange.length - offset);
+            upperRange = ptr.copyBytes();
+            upperInclusive = range.isUpperInclusive();
+        }
+        return KeyRange.getKeyRange(lowerRange, lowerInclusive, upperRange, 
upperInclusive);
+    }
+
+    private static List<KeyRange> clipRight(RowKeySchema schema, int pkPos, 
List<KeyRange> keyRanges,
+            List<KeyRange> leftRanges, ImmutableBytesWritable ptr) {
+        List<KeyRange> clippedKeyRanges = 
Lists.newArrayListWithExpectedSize(keyRanges.size());
+        for (int i = 0; i < leftRanges.size(); i++) {
+            KeyRange leftRange = leftRanges.get(i);
+            KeyRange range = keyRanges.get(i);
+            KeyRange clippedKeyRange = getTrailingRange(schema, pkPos, range, 
leftRange, ptr);
+            clippedKeyRanges.add(clippedKeyRange);
+        }
+        return clippedKeyRanges;
+    }
+
+    private static List<KeyRange> clipLeft(RowKeySchema schema, int pkPos, int 
clipLeftSpan, List<KeyRange> keyRanges, ImmutableBytesWritable ptr) {
+        List<KeyRange> clippedKeyRanges = 
Lists.newArrayListWithExpectedSize(keyRanges.size());
+        for (KeyRange keyRange : keyRanges) {
+            KeyRange clippedKeyRange = schema.clipLeft(pkPos, keyRange, 
clipLeftSpan, ptr);
+            clippedKeyRanges.add(clippedKeyRange);
+        }
+        return clippedKeyRanges;
+    }
+
+    private static List<KeyRange> invertKeyRanges(List<KeyRange> keyRanges) {
+        keyRanges = new ArrayList<KeyRange>(keyRanges);
+        for (int i = 0; i < keyRanges.size(); i++) {
+            KeyRange range = keyRanges.get(i);
+            range = range.invert();
+            keyRanges.set(i, range);
+        }
+        return keyRanges;
+    }
+
     /**
      * Get an optimal combination of key expressions for hash join key range 
optimization.
      * @return returns true if the entire combined expression is covered by 
key range optimization
@@ -330,7 +385,7 @@ public class WhereOptimizer {
             KeyExpressionVisitor.KeySlots keySlots = 
expression.accept(visitor);
             int minPkPos = Integer.MAX_VALUE; 
             if (keySlots != null) {
-                Iterator<KeyExpressionVisitor.KeySlot> iterator = 
keySlots.iterator();
+                Iterator<KeyExpressionVisitor.KeySlot> iterator = 
keySlots.getSlots().iterator();
                 while (iterator.hasNext()) {
                     KeyExpressionVisitor.KeySlot slot = iterator.next();
                     if (slot.getPKPosition() < minPkPos) {
@@ -457,18 +512,13 @@ public class WhereOptimizer {
     public static class KeyExpressionVisitor extends 
StatelessTraverseNoExpressionVisitor<KeyExpressionVisitor.KeySlots> {
         private static final KeySlots EMPTY_KEY_SLOTS = new KeySlots() {
             @Override
-            public Iterator<KeySlot> iterator() {
-                return Collections.emptyIterator();
-            }
-
-            @Override
-            public KeyRange getMinMaxRange() {
-                return null;
+            public boolean isPartialExtraction() {
+                return false;
             }
 
             @Override
-            public boolean isPartialExtraction() {
-                return false;
+            public List<KeySlot> getSlots() {
+                return Collections.emptyList();
             }
         };
 
@@ -481,22 +531,11 @@ public class WhereOptimizer {
                 return EMPTY_KEY_SLOTS;
             }
             
-            List<KeyRange> keyRanges = slot.getPKSpan() == 1 ? 
Collections.<KeyRange>singletonList(keyRange) : EVERYTHING_RANGES;
-            KeyRange minMaxRange = null;
-            if (slot.getPKSpan() > 1) {
-                int initPosition = (table.getBucketNum() ==null ? 0 : 1) + 
(this.context.getConnection().getTenantId() != null && table.isMultiTenant() ? 
1 : 0) + (table.getViewIndexId() == null ? 0 : 1);
-                if (initPosition == slot.getPKPosition()) {
-                    minMaxRange = keyRange;
-                } else {
-                    // If we cannot set the minMaxRange, then we must not 
extract the expression since
-                    // we wouldn't be constraining the range at all based on 
it.
-                    extractNode = null;
-                }
-            }
-            return newKeyParts(slot, extractNode, keyRanges, minMaxRange);
+            List<KeyRange> keyRanges = 
Collections.<KeyRange>singletonList(keyRange);
+            return newKeyParts(slot, extractNode, keyRanges);
         }
 
-        private KeySlots newKeyParts(KeySlot slot, Expression extractNode, 
List<KeyRange> keyRanges, KeyRange minMaxRange) {
+        private KeySlots newKeyParts(KeySlot slot, Expression extractNode, 
List<KeyRange> keyRanges) {
             if (isDegenerate(keyRanges)) {
                 return EMPTY_KEY_SLOTS;
             }
@@ -504,15 +543,15 @@ public class WhereOptimizer {
             List<Expression> extractNodes = extractNode == null || 
slot.getKeyPart().getExtractNodes().isEmpty()
                   ? Collections.<Expression>emptyList()
                   : Collections.<Expression>singletonList(extractNode);
-            return new SingleKeySlot(new BaseKeyPart(table, 
slot.getKeyPart().getColumn(), extractNodes), slot.getPKPosition(), 
slot.getPKSpan(), keyRanges, minMaxRange, slot.getOrderPreserving());
+            return new SingleKeySlot(new BaseKeyPart(table, 
slot.getKeyPart().getColumn(), extractNodes), slot.getPKPosition(), 
slot.getPKSpan(), keyRanges, slot.getOrderPreserving());
         }
 
-        private KeySlots newKeyParts(KeySlot slot, List<Expression> 
extractNodes, List<KeyRange> keyRanges, KeyRange minMaxRange) {
+        private KeySlots newKeyParts(KeySlot slot, List<Expression> 
extractNodes, List<KeyRange> keyRanges) {
             if (isDegenerate(keyRanges)) {
                 return EMPTY_KEY_SLOTS;
             }
             
-            return new SingleKeySlot(new BaseKeyPart(table, 
slot.getKeyPart().getColumn(), extractNodes), slot.getPKPosition(), 
slot.getPKSpan(), keyRanges, minMaxRange, slot.getOrderPreserving());
+            return new SingleKeySlot(new BaseKeyPart(table, 
slot.getKeyPart().getColumn(), extractNodes), slot.getPKPosition(), 
slot.getPKSpan(), keyRanges, slot.getOrderPreserving());
         }
 
         private KeySlots 
newRowValueConstructorKeyParts(RowValueConstructorExpression rvc, 
List<KeySlots> childSlots) {
@@ -524,7 +563,7 @@ public class WhereOptimizer {
             int initialPosition = -1;
             for (int i = 0; i < childSlots.size(); i++) {
                 KeySlots slots = childSlots.get(i);
-                KeySlot keySlot = slots.iterator().next();
+                KeySlot keySlot = slots.getSlots().iterator().next();
                 List<Expression> childExtractNodes = 
keySlot.getKeyPart().getExtractNodes();
                 // Stop if there was a gap in extraction of RVC elements. This 
is required if the leading
                 // RVC has not row key columns, as we'll still get childSlots 
if the RVC has trailing row
@@ -594,17 +633,18 @@ public class WhereOptimizer {
                         // For the actual type of the coerceBytes call, we use 
the node type instead of the rhs type, because
                         // for IN, the rhs type will be VARBINARY and no 
coerce will be done in that case (and we need it to
                         // be done).
-                        node.getChild().getDataType().coerceBytes(ptr, 
node.getDataType(), rhs.getSortOrder(), node.getChild().getSortOrder());
+                        node.getChild().getDataType().coerceBytes(ptr, 
node.getDataType(), rhs.getSortOrder(), SortOrder.ASC);
                         lower = ByteUtil.copyKeyBytesIfNecessary(ptr);
                     }
                     byte[] upper = range.getUpperRange();
                     if (!range.upperUnbound()) {
                         ptr.set(upper);
                         // Do the reverse translation so we can optimize out 
the coerce expression
-                        node.getChild().getDataType().coerceBytes(ptr, 
node.getDataType(), rhs.getSortOrder(), node.getChild().getSortOrder());
+                        node.getChild().getDataType().coerceBytes(ptr, 
node.getDataType(), rhs.getSortOrder(), SortOrder.ASC);
                         upper = ByteUtil.copyKeyBytesIfNecessary(ptr);
                     }
-                    return KeyRange.getKeyRange(lower, 
range.isLowerInclusive(), upper, range.isUpperInclusive());
+                    range = KeyRange.getKeyRange(lower, 
range.isLowerInclusive(), upper, range.isUpperInclusive());
+                    return range;
                 }
 
                 @Override
@@ -624,94 +664,739 @@ public class WhereOptimizer {
             }, slot.getPKPosition(), slot.getKeyRanges());
         }
 
-        private static boolean intersectSlots(KeySlot[] slotArray, KeySlot 
childSlot) {
-            int childPosition = childSlot.getPKPosition();
-            int childSpan = childSlot.getPKSpan();
-            boolean filled = false;
-            for (KeySlot slot : slotArray) {
-                if (slot != null) {
-                    int position = slot.getPKPosition();
-                    int span = slot.getPKSpan();
-                    if (childPosition + childSpan > position && childPosition 
< position + span) {
-                        if (childPosition < position || (childPosition == 
position && childSpan > span)) {
-                            slotArray[childPosition] = childSlot = 
childSlot.intersect(slot);
-                            if (childSlot == null) {
-                                return false;
-                            }
-                            filled = true;
-                        } else {
-                            slotArray[position] = slot = 
slot.intersect(childSlot);
-                            if (slot == null) {
+        /**
+         * 
+         * Iterates through all combinations of KeyRanges for a given
+         * PK column (based on its slot position). Useful when expressions
+         * are ORed together and subsequently ANDed. For example:
+         *     WHERE (pk1 = 1 OR pk1 = 2) AND (pk2 = 3 OR pk2 = 4)
+         * would iterate through and produce [1,3],[1,4],[2,3],[2,4].
+         * 
+         */
+        static class SlotsIterator {
+            public final int pkPos;
+            private List<KeySlots> childSlots;
+            private List<SlotRangesIterator> slotRangesIterator;
+            private boolean firstCall = true;
+            
+            SlotsIterator(List<KeySlots> childSlots, int pkPos) {
+                this.childSlots = childSlots;
+                this.pkPos = pkPos;
+                this.slotRangesIterator = 
Lists.newArrayListWithExpectedSize(childSlots.size() * 3 / 2);
+                for (int i = 0; i < childSlots.size(); i++) {
+                    SlotRangesIterator iterator = new SlotRangesIterator(i);
+                    slotRangesIterator.add(iterator);
+                    iterator.initialize();
+                }
+            }
+            
+            public KeySlot getSlot(int index) {
+                SlotRangesIterator slotRanges = slotRangesIterator.get(index);
+                return slotRanges.getSlot();
+            }
+            
+            public KeyRange getRange(int index) {
+                SlotRangesIterator slotRanges = slotRangesIterator.get(index);
+                return slotRanges.getRange();
+            }
+            
+            public boolean next() {
+                if (firstCall) {
+                    boolean hasAny = false;
+                    for (int i = 0; i < childSlots.size(); i++) {
+                        hasAny |= this.slotRangesIterator.get(i).initialize();
+                    }
+                    firstCall = false;
+                    return hasAny;
+                }
+                int i = 0;
+                while (i < childSlots.size() && 
!slotRangesIterator.get(i).next()) {
+                    i++;
+                }
+                for (i = 0; i < childSlots.size(); i++) {
+                    if (!this.slotRangesIterator.get(i).isWrapped()) {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            
+            private class SlotRangesIterator {
+                public int slotIndex;
+                public int rangeIndex;
+                public final KeySlots slots;
+                public boolean wrapped;
+                
+                public SlotRangesIterator(int slotsIndex) {
+                    this.slots = childSlots.get(slotsIndex);
+                }
+                
+                public boolean isWrapped() {
+                    return wrapped || !hasAny();
+                }
+                
+                private boolean initialize() {
+                    slotIndex = 0;
+                    rangeIndex = 0;
+                    while (slotIndex < slots.getSlots().size() 
+                            && (slots.getSlots().get(slotIndex) == null
+                                || 
slots.getSlots().get(slotIndex).getKeyRanges().isEmpty()
+                                || 
slots.getSlots().get(slotIndex).getPKPosition() != pkPos)) {
+                        slotIndex++;
+                    }
+                    return hasAny();
+                }
+                
+                private boolean hasAny() {
+                    return slotIndex < slots.getSlots().size();
+                }
+                
+                public KeySlot getSlot() {
+                    if (!hasAny()) return null;
+                    return slots.getSlots().get(slotIndex);
+                }
+                
+                public KeyRange getRange() {
+                    if (!hasAny()) return null;
+                    return getSlot().getKeyRanges().get(rangeIndex);
+                }
+                
+                public boolean next() {
+                    if (!hasAny()) {
+                        return false;
+                    }
+                    List<KeyRange> ranges = getSlot().getKeyRanges();
+                    if ((rangeIndex = (rangeIndex + 1) % ranges.size()) == 0) {
+                        do {
+                            if (((slotIndex = (slotIndex + 1) % 
slots.getSlots().size()) == 0)) {
+                                initialize();
+                                wrapped = true;
                                 return false;
                             }
-                            filled = true;
-                        }
+                        } while (getSlot() == null || 
getSlot().getKeyRanges().isEmpty() || getSlot().getPKPosition() != pkPos);
                     }
+                
+                    return true;
                 }
             }
-            if (!filled) {
-                slotArray[childPosition] = childSlot;
-            }
-            return true;
         }
         
+        /**
+         * Ands together an arbitrary set of compiled expressions (represented 
as a list of KeySlots)
+         * by intersecting each unique combination among the childSlots.
+         * @param andExpression expressions being anded together
+         * @param childSlots compiled form of child expressions being anded 
together.
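+         * @return the key slots resulting from the intersection; null if childSlots is empty, or
+         *         EMPTY_KEY_SLOTS if any child is already known to be unsatisfiable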
+         */
         private KeySlots andKeySlots(AndExpression andExpression, 
List<KeySlots> childSlots) {
             if(childSlots.isEmpty()) {
                 return null;
             }
-            int nColumns = table.getPKColumns().size();
-            KeySlot[] keySlot = new KeySlot[nColumns];
-            KeyRange minMaxRange = KeyRange.EVERYTHING_RANGE;
-            List<Expression> minMaxExtractNodes = 
Lists.<Expression>newArrayList();
-            int initPosition = (table.getBucketNum() ==null ? 0 : 1) + 
(this.context.getConnection().getTenantId() != null && table.isMultiTenant() ? 
1 : 0) + (table.getViewIndexId() == null ? 0 : 1);
+            // Exit early if it's already been determined that one of the 
child slots cannot
+            // possibly be true.
             boolean partialExtraction = andExpression.getChildren().size() != 
childSlots.size();
-            for (KeySlots childSlot : childSlots) {
+            int nChildSlots = childSlots.size();
+            for (int i = 0; i < nChildSlots; i++) {
+                KeySlots childSlot = childSlots.get(i);
                 if (childSlot == EMPTY_KEY_SLOTS) {
                     return EMPTY_KEY_SLOTS;
                 }
+                // If any child slots represent partially extracted 
expressions, then carry
+                // that forward. An example of a partially extracted 
expression would be an
+                // RVC of (K1, K2, NonK3) in which only leading PK columns are 
extracted
+                // from the RVC.
                 partialExtraction |= childSlot.isPartialExtraction();
-                // FIXME: get rid of this special-cased min/max range now that 
a key range can span multiple columns
-                if (childSlot.getMinMaxRange() != null) { // Only set if in 
initial pk position
-                    // TODO: fix intersectSlots so that it works with RVCs. 
We'd just need to fill in the leading parts
-                    // of the key with the minMaxRange and then intersect the 
key parts that overlap.
-                    minMaxRange = 
minMaxRange.intersect(childSlot.getMinMaxRange());
-                    for (KeySlot slot : childSlot) {
+            }
+            boolean mayExtractNodes = true;
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            RowKeySchema rowKeySchema = table.getRowKeySchema();
+            int nPkColumns = table.getPKColumns().size();
+            KeySlot[] keySlotArray = new KeySlot[nPkColumns];
+            int initPkPos = (table.getBucketNum() ==null ? 0 : 1) + 
(this.context.getConnection().getTenantId() != null && table.isMultiTenant() ? 
1 : 0) + (table.getViewIndexId() == null ? 0 : 1);
+            
+            List<List<List<KeyRange[]>>> slotsTrailingRanges = 
Lists.newArrayListWithExpectedSize(nPkColumns);
+            // Process all columns being ANDed in position order to guarantee
+            // we have all information for leading PK columns before we attempt
+            // to intersect them. For example:
+            //     (A, B, C) >= (1, 2, 3) AND (B, C) < (4, 5) AND  A = 1
+            // will process slot 0 (i.e. PK column A) across all children 
first,
+            // followed by slot 1 (PK column B), and finally slot 2 (C). This 
is
+            // done because we carry forward any constraints from preceding PK
+            // columns which may impact following PK columns. In the above 
example
+            // we'd carry forward that (B,C) >= (2,3) since we know that A is 
1.
+            for (int pkPos = initPkPos; pkPos < nPkColumns; pkPos++) {
+                SlotsIterator iterator = new SlotsIterator(childSlots, pkPos);
+                OrderPreserving orderPreserving = null;
+                List<Expression> extractNodes = Lists.newArrayList();
+                List<KeyRange> keyRanges = Lists.newArrayList();
+                // This is the information carried forward as we process in PK 
order.
+                // It's parallel with the list of keyRanges.
+                List<KeyRange[]> trailingRangesList = 
Lists.<KeyRange[]>newArrayList();
+                KeyRange result = null;
+                TrailingRangeIterator trailingRangeIterator = new 
TrailingRangeIterator(initPkPos, pkPos, slotsTrailingRanges);
+                // Iterate through all combinations (i.e. constraints) for the 
PK slot being processed.
+                // For example, with (A = 1 OR A = 2) AND (A,B) > (1,2) AND C 
= 3, we'd process the
+                // following two combinations:
+                //     A=1,(A,B) > (1,2)
+                //     A=2,(A,B) > (1,2)
+                // If we have no constraints for a PK, then we still must 
iterate through the information
+                // that may have been rolled up based on the processing of 
previous PK slots. For example,
+                // in the above ANDed expressions, we have no constraint on B, 
but we would end up with
+                // rolled up information based on the B part of the (A,B) 
constraint.
+                while (iterator.next() || (trailingRangeIterator.hasNext() && 
result != KeyRange.EMPTY_RANGE)) {
+                    result = null;
+                    KeyRange[] trailingRanges = newTrailingRange();
+                    for (int i = 0; i < nChildSlots && result != 
KeyRange.EMPTY_RANGE; i++) {
+                        KeySlot slot = iterator.getSlot(i);
+                        // Rollup the order preserving and concatenate the 
extracted expressions.
+                        // Extracted expressions end up being removed from the 
AND expression at
+                        // the top level call (pushKeyExpressionsToScan) with 
anything remaining
+                        // ending up as a Filter (rather than contributing to 
the start/stop row
+                        // of the scan).
                         if (slot != null) {
-                            // We can only definitely extract the expression 
nodes that start from the
-                            // leading PK column. They may get extracted at 
the end if we end up having
-                            // expressions matching the leading PK columns, 
but otherwise we'll be forced
-                            // to execute the expression in a filter.
-                            if (slot.getPKPosition() == initPosition) {
-                                
minMaxExtractNodes.addAll(slot.getKeyPart().getExtractNodes());
-                            } else {
-                                if (!intersectSlots(keySlot, slot)) {
-                                    return EMPTY_KEY_SLOTS;
+                            KeyRange otherRange = iterator.getRange(i);
+                            KeyRange range = result;
+                            if (slot.getOrderPreserving() != null) {
+                                orderPreserving = 
slot.getOrderPreserving().combine(orderPreserving);
+                            }
+                            if (slot.getKeyPart().getExtractNodes() != null) {
+                                
extractNodes.addAll(slot.getKeyPart().getExtractNodes());
+                            }
+                            // Keep a running intersection of the ranges we 
see. Note that the
+                            // ranges are derived from constants appearing on 
the RHS of a comparison
+                            // expression. For example, the expression A > 5 
would produce a keyRange
+                            // of (5, *) for slot 0 (assuming A is the leading 
PK column). If the result
+                            // ends up as an empty key, that combination is 
ruled out. This is essentially
+                            // doing constant reduction.
+                            result = intersectRanges(pkPos, range, otherRange, 
trailingRanges);
+                        }
+                    }
+                    
+                    if (result != KeyRange.EMPTY_RANGE) {
+                        Map<KeyRange,List<KeyRange[]>> results = 
Maps.newHashMap();
+                        trailingRangeIterator.init();
+                        // Process all constraints that have been rolled up 
from previous
+                        // processing of PK slots. This occurs for RVCs which 
span PK slots
+                        // in which the leading part of the RVC is determined 
to be equal
+                        // to a constant on the RHS.
+                        while (trailingRangeIterator.hasNext()) {
+                            // Loop through all combinations of values for all 
previously
+                            // calculated slots.
+                            do {
+                                // Loop through all combinations of range 
constraints for the
+                                // current combinations of values. If no valid 
combinations
+                                // are found, we can rule out the result. We 
can also end up
+                                // modifying the result if it has an 
intersection with the
+                                // range constraints.
+                                do {
+                                    KeyRange priorTrailingRange = 
trailingRangeIterator.getRange();
+                                    if (priorTrailingRange != 
KeyRange.EVERYTHING_RANGE) {
+                                        KeyRange[] intTrailingRanges = 
Arrays.copyOf(trailingRanges, trailingRanges.length);
+                                        // Intersect the current result with 
each range constraint. We essentially
+                                        // rule out the result when we find a 
constraint that has no intersection
+                                        KeyRange intResult = 
intersectRanges(pkPos, result, priorTrailingRange, intTrailingRanges);
+                                        if (intResult != KeyRange.EMPTY_RANGE) 
{
+                                            addResult(intResult, 
intTrailingRanges, results);
+                                        }
+                                    }
+                                } while 
(trailingRangeIterator.nextTrailingRange());
+                            } while (trailingRangeIterator.nextRange());
+                        }
+                        if (results.isEmpty() && result != null) { // No 
trailing range constraints
+                            keyRanges.add(result);
+                            trailingRangesList.add(trailingRanges);
+                        } else {
+                            mayExtractNodes &= results.size() <= 1;
+                            for (Map.Entry<KeyRange,List<KeyRange[]>> entry : 
results.entrySet()) {
+                                // Add same KeyRange with each KeyRange[] 
since the two lists are parallel
+                                for (KeyRange[] trailingRange : 
entry.getValue()) {
+                                    keyRanges.add(entry.getKey());
+                                    trailingRangesList.add(trailingRange);
                                 }
                             }
                         }
                     }
+                }
+
+                if (result == null && keyRanges.isEmpty()) {
+                    
slotsTrailingRanges.add(Collections.<List<KeyRange[]>>emptyList());
+                } else {
+                    // If we encountered a result for this slot and
+                    // there are no ranges, this is the degenerate case.
+                    if (keyRanges.isEmpty()) {
+                        return EMPTY_KEY_SLOTS;
+                    }
+                    // Similar to KeyRange.coalesce(), except we must combine 
together
+                    // any rolled up constraints (as a list of KeyRanges) for a
+                    // particular value (as they're coalesced together). We 
maintain
+                    // these KeyRange constraints as a parallel list between 
keyRanges
+                    // and trailingRangesList.
+                    keyRanges = coalesceKeyRangesAndTrailingRanges(keyRanges, 
trailingRangesList, slotsTrailingRanges);
+                    int maxSpan = 1;
+                    for (KeyRange aRange : keyRanges) {
+                        int span = rowKeySchema.computeMaxSpan(pkPos, aRange, 
context.getTempPtr());
+                        if (span > maxSpan) {
+                            maxSpan = span;
+                        }
+                    }
+                    keySlotArray[pkPos] = new KeySlot(
+                            new BaseKeyPart(table, 
table.getPKColumns().get(pkPos), mayExtractNodes ? extractNodes : 
Collections.<Expression>emptyList()),
+                            pkPos,
+                            maxSpan,
+                            keyRanges,
+                            orderPreserving);
+                }
+            }
+            
+            // Filters trailing part of RVC based on ranges from PK columns 
after the one we're
+            // currently processing that may overlap with this range. For 
example, with a PK
+            // columns A,B,C and a range of A from [(1,2,3) - (4,5,6)] and B 
from (6-*), we
+            // can filter the trailing part of the RVC for A, because the 
trailing part of
+            // the RVC (2,3)-(5,6) does not intersect with (6-*). By removing 
the trailing
+            // part of the RVC, we end up with a range of A from [1-4] and B 
from (6-*) which
+            // enables us to use a skip scan.
+            for (int i = 0; i < keySlotArray.length; i++) {
+                KeySlot keySlot = keySlotArray[i];
+                if (keySlot == null) continue;
+                int pkSpan = keySlot.getPKSpan();
+                int pkPos = keySlot.getPKPosition();
+                boolean slotWasIntersected = false;
+                List<KeyRange> keyRanges = keySlot.getKeyRanges();
+                List<KeyRange> slotTrimmedResults = 
Lists.newArrayListWithExpectedSize(keyRanges.size());
+                for (KeyRange result : keyRanges) {
+                    boolean resultWasIntersected = false;
+                    Set<KeyRange> trimmedResults = 
Sets.newHashSetWithExpectedSize(keyRanges.size());
+                    for (int trailingPkPos = pkPos+1; trailingPkPos < 
pkPos+pkSpan && trailingPkPos < nPkColumns; trailingPkPos++) {
+                        KeySlot nextKeySlot = keySlotArray[trailingPkPos];
+                        if (nextKeySlot == null) continue;
+                        for (KeyRange trailingRange : 
nextKeySlot.getKeyRanges()) {
+                            resultWasIntersected = true;
+                            KeyRange intResult = intersectTrailing(result, 
pkPos, trailingRange, trailingPkPos);
+                            if (intResult != KeyRange.EMPTY_RANGE) {
+                                trimmedResults.add(intResult);
+                            }
+                        }
+                    }
+                    if (resultWasIntersected) {
+                        slotWasIntersected = true;
+                        slotTrimmedResults.addAll(trimmedResults);
+                        mayExtractNodes &= trimmedResults.size() <= 1;
+                    } else {
+                        slotTrimmedResults.add(result);
+                    }
+                }
+                if (slotTrimmedResults.isEmpty()) {
+                    return EMPTY_KEY_SLOTS;
+                }
+                if (slotWasIntersected) {
+                    // Re-coalesce the ranges and recalc the max span since 
the ranges may have changed
+                    slotTrimmedResults = KeyRange.coalesce(slotTrimmedResults);
+                    pkSpan = 1;
+                    for (KeyRange trimmedResult : slotTrimmedResults) {
+                        pkSpan = Math.max(pkSpan, 
rowKeySchema.computeMaxSpan(pkPos, trimmedResult, ptr));
+                    }
+                }
+                List<Expression> extractNodes = mayExtractNodes ? 
+                        keySlotArray[pkPos].getKeyPart().getExtractNodes() : 
Collections.<Expression>emptyList();
+                keySlotArray[pkPos] = new KeySlot(
+                        new BaseKeyPart(table, 
table.getPKColumns().get(pkPos), extractNodes),
+                        pkPos,
+                        pkSpan,
+                        slotTrimmedResults,
+                        keySlotArray[pkPos].getOrderPreserving());
+            }
+            List<KeySlot> keySlots = Arrays.asList(keySlotArray);
+            // If we have a salt column, skip that slot because
+            // they'll never be an expression that uses it directly.
+            keySlots = keySlots.subList(initPkPos, keySlots.size());
+            return new MultiKeySlot(keySlots, partialExtraction);
+        }
+
+        /**
+         * Builds a new array of trailing range constraints with one entry per PK
+         * column of the table, every entry starting out as KeyRange.EVERYTHING_RANGE
+         * (i.e. no constraint).
+         */
+        private KeyRange[] newTrailingRange() {
+            int nColumns = table.getPKColumns().size();
+            KeyRange[] unconstrained = new KeyRange[nColumns];
+            Arrays.fill(unconstrained, KeyRange.EVERYTHING_RANGE);
+            return unconstrained;
+        }
+        
+        /**
+         * Records a trailing range constraint array for the given result range. If the
+         * result range is already present in the map, the new array is placed in front of
+         * the constraints that were previously recorded for it.
+         *
+         * This is a tricky case. We may have multiple possible values based on the rolled up
+         * range constraints from previous slots. We track unique ranges and concatenate
+         * together the trailing range data. If there is more than one element in the set
+         * (i.e. more than one possible result), we end up having more combinations than
+         * there actually are, because a constraint only applies to a single value, not to
+         * *all* combinations (a limitation of our representation derived from what can be
+         * handled by our SkipScanFilter).
+         * For example, if we've gathered these ranges so far in a three PK table:
+         * (1,2), (A,B) and have X as a constraint for value A and Y as a constraint for
+         * value B, we have the following possible combinations: 1AX, 2AX, 1BY, 2BY.
+         * However, our SkipScanFilter only supports identifying *all* combinations of
+         * (1,2),(A,B),(X,Y), i.e. 1AX, 1AY, 1BX, 1BY, 2AX, 2AY, 2BX, 2BY.
+         * See WhereOptimizerTest.testNotRepresentableBySkipScan() for an example.
+         */
+        private static void addResult(KeyRange result, KeyRange[] trailingRange, Map<KeyRange,List<KeyRange[]>> results) {
+            List<KeyRange[]> combined = Lists.<KeyRange[]>newArrayList(trailingRange);
+            List<KeyRange[]> alreadyRecorded = results.get(result);
+            if (alreadyRecorded != null) {
+                combined.addAll(alreadyRecorded);
+            }
+            results.put(result, combined);
+        }
+
+        /**
+         * Coalesces the parallel lists of key ranges and trailing range constraints, appends
+         * the coalesced constraints (one list per coalesced key range) to slotsTrailingRanges
+         * and returns the coalesced key ranges in the same order.
+         */
+        private List<KeyRange> coalesceKeyRangesAndTrailingRanges(List<KeyRange> keyRanges,
+                List<KeyRange[]> trailingRangesList, List<List<List<KeyRange[]>>> slotsTrailingRanges) {
+            List<Pair<KeyRange,List<KeyRange[]>>> coalescedPairs = coalesce(keyRanges, trailingRangesList);
+            int nPairs = coalescedPairs.size();
+            List<KeyRange> mergedRanges = Lists.newArrayListWithExpectedSize(nPairs);
+            List<List<KeyRange[]>> mergedConstraints = Lists.newArrayListWithExpectedSize(nPairs);
+            for (int i = 0; i < nPairs; i++) {
+                Pair<KeyRange,List<KeyRange[]>> entry = coalescedPairs.get(i);
+                mergedRanges.add(entry.getFirst());
+                mergedConstraints.add(entry.getSecond());
+            }
+            slotsTrailingRanges.add(mergedConstraints);
+            return mergedRanges;
+        }
+
+        /**
+         * Orders (key range, trailing constraints) pairs by their key range only,
+         * delegating to KeyRange.COMPARATOR; the constraint lists are ignored.
+         */
+        public static final Comparator<Pair<KeyRange,List<KeyRange[]>>> KEY_RANGE_PAIR_COMPARATOR =
+                new Comparator<Pair<KeyRange,List<KeyRange[]>>>() {
+            @Override
+            public int compare(Pair<KeyRange,List<KeyRange[]>> left, Pair<KeyRange,List<KeyRange[]>> right) {
+                KeyRange leftRange = left.getFirst();
+                KeyRange rightRange = right.getFirst();
+                return KeyRange.COMPARATOR.compare(leftRange, rightRange);
+            }
+        };
+        
+        /**
+         * Returns true when every element of the array is the KeyRange.EVERYTHING_RANGE
+         * instance (compared by reference), which is also the case for an empty array.
+         */
+        private static boolean isEverythingRanges(KeyRange[] ranges) {
+            int idx = 0;
+            // Advance past leading EVERYTHING_RANGE entries; stopping early means a constraint exists
+            while (idx < ranges.length && ranges[idx] == KeyRange.EVERYTHING_RANGE) {
+                idx++;
+            }
+            return idx == ranges.length;
+        }
+        
+        /**
+         * Combines the trailing range constraint lists of two key ranges that are being
+         * coalesced into one.
+         *
+         * A list holding a single array of EVERYTHING_RANGE entries means "no constraint":
+         * - if both lists are unconstrained, an empty list is returned;
+         * - if exactly one list is unconstrained, the other list is returned as-is;
+         * - otherwise a new list with the elements of list1 followed by list2 is returned.
+         *
+         * @param list1 constraints of the first range
+         * @param list2 constraints of the second range
+         * @return the combined constraints; may be one of the arguments (not a copy)
+         */
+        private static List<KeyRange[]> concat(List<KeyRange[]> list1, List<KeyRange[]> list2) {
+            if (list1.size() == 1 && isEverythingRanges(list1.get(0))) {
+                // Must inspect list2 here: re-checking list1 is always true at this point and
+                // would drop a single real constraint carried by list2.
+                if (list2.size() == 1 && isEverythingRanges(list2.get(0))) {
+                    return Collections.emptyList();
+                }
+                return list2;
+            }
+            if (list2.size() == 1 && isEverythingRanges(list2.get(0))) {
+                return list1;
+            }
+
+            List<KeyRange[]> newList = Lists.<KeyRange[]>newArrayListWithExpectedSize(list1.size()+list2.size());
+            newList.addAll(list1);
+            newList.addAll(list2);
+            return newList;
+        }
+        
+        /**
+         * Similar to KeyRange.coalesce, but carries along the trailing range constraints
+         * that belong to each key range: whenever two key ranges are merged, their
+         * constraint lists are combined through concat().
+         *
+         * keyRanges and trailingRangesList are parallel lists (element i of one belongs
+         * to element i of the other). The first element of the sorted list is read
+         * unconditionally, so keyRanges is expected to be non-empty.
+         *
+         * @param keyRanges the key ranges to coalesce
+         * @param trailingRangesList the trailing range constraints for each key range
+         * @return the coalesced key ranges paired with their combined constraints,
+         *  sorted by KEY_RANGE_PAIR_COMPARATOR
+         */
+        @NonNull
+        public static List<Pair<KeyRange,List<KeyRange[]>>> 
coalesce(List<KeyRange> keyRanges, List<KeyRange[]> trailingRangesList) {
+            // Pair each key range with a single-element list holding its constraints
+            List<Pair<KeyRange,List<KeyRange[]>>> tmp = 
Lists.newArrayListWithExpectedSize(keyRanges.size());
+            int nKeyRanges = keyRanges.size();
+            for (int i = 0; i < nKeyRanges; i++) {
+                KeyRange keyRange = keyRanges.get(i);
+                KeyRange[] trailingRange = trailingRangesList.get(i);
+                Pair<KeyRange,List<KeyRange[]>> pair = new 
Pair<KeyRange,List<KeyRange[]>>(keyRange,Lists.<KeyRange[]>newArrayList(trailingRange));
+                tmp.add(pair);
+            }
+            Collections.sort(tmp, KEY_RANGE_PAIR_COMPARATOR);
+            // First pass: merge neighbouring ranges that overlap (non-empty intersection)
+            List<Pair<KeyRange,List<KeyRange[]>>> tmp2 = 
Lists.<Pair<KeyRange,List<KeyRange[]>>>newArrayListWithExpectedSize(tmp.size());
+            Pair<KeyRange,List<KeyRange[]>> range = tmp.get(0);
+            for (int i=1; i<tmp.size(); i++) {
+                Pair<KeyRange,List<KeyRange[]>> otherRange = tmp.get(i);
+                KeyRange intersect = 
range.getFirst().intersect(otherRange.getFirst());
+                if (KeyRange.EMPTY_RANGE == intersect) {
+                    tmp2.add(range);
+                    range = otherRange;
+                } else {
+                    KeyRange newRange = 
range.getFirst().union(otherRange.getFirst());
+                    range = new 
Pair<KeyRange,List<KeyRange[]>>(newRange,concat(range.getSecond(),otherRange.getSecond()));
+                }
+            }
+            tmp2.add(range);
+            // Second pass: merge ranges that merely touch, i.e. the upper bound of one equals
+            // the lower bound of the next and exactly one of the two bounds is inclusive
+            List<Pair<KeyRange,List<KeyRange[]>>> tmp3 = 
Lists.<Pair<KeyRange,List<KeyRange[]>>>newArrayListWithExpectedSize(tmp2.size());
+            range = tmp2.get(0);
+            for (int i=1; i<tmp2.size(); i++) {
+                Pair<KeyRange,List<KeyRange[]>> otherRange = tmp2.get(i);
+                assert !range.getFirst().upperUnbound();
+                assert !otherRange.getFirst().lowerUnbound();
+                if (range.getFirst().isUpperInclusive() != 
otherRange.getFirst().isLowerInclusive()
+                        && Bytes.equals(range.getFirst().getUpperRange(), 
otherRange.getFirst().getLowerRange())) {
+                    KeyRange newRange = KeyRange.getKeyRange(
+                            range.getFirst().getLowerRange(), 
range.getFirst().isLowerInclusive(),
+                            otherRange.getFirst().getUpperRange(), 
otherRange.getFirst().isUpperInclusive());
+                    range = new 
Pair<KeyRange,List<KeyRange[]>>(newRange,concat(range.getSecond(),otherRange.getSecond()));
                 } else {
-                    for (KeySlot slot : childSlot) {
-                        // The slot will be null if we have no condition for 
this slot
-                        if (slot != null && !intersectSlots(keySlot, slot)) {
-                            return EMPTY_KEY_SLOTS;
+                    tmp3.add(range);
+                    range = otherRange;
+                }
+            }
+            tmp3.add(range);
+            
+            return tmp3;
+        }
+        
+        /**
+         * Iterates over all unique combinations of the List<KeyRange[]> representing
+         * the constraints from previous slot positions. For example, if we have
+         * a RVC of (A,B) = (2,1), then if A=2, we know that B must be 1.
+         *
+         * The positions are advanced like an odometer: nextTrailingRange() steps through
+         * the constraint arrays of the currently selected ranges, and nextRange() steps
+         * through the ranges of the prior slots.
+         */
+        static class TrailingRangeIterator {
+            // Indexed as [prior slot][key range of that slot][constraint array]; see getRange()
+            private final List<List<List<KeyRange[]>>> slotTrailingRangesList;
+            // Currently selected key range index for each prior slot
+            private final int[] rangePos;
+            // Currently selected constraint array index for each prior slot
+            private final int[] trailingRangePos;
+            private final int initPkPos;
+            private final int pkPos;
+            // Slot whose trailingRangePos entry is advanced next by nextTrailingRange()
+            private int trailingRangePosIndex;
+            // Slot whose rangePos entry is advanced next by nextRange()
+            private int rangePosIndex;
+            private boolean hasMore = true;
+            
+            /**
+             * @param initPkPos PK position of the first slot being tracked
+             * @param pkPos PK position of the slot currently being processed; one position
+             *  is tracked for each slot in [initPkPos, pkPos)
+             * @param slotsTrailingRangesList the constraints gathered for the prior slots
+             */
+            TrailingRangeIterator (int initPkPos, int pkPos, 
List<List<List<KeyRange[]>>> slotsTrailingRangesList) {
+                this.slotTrailingRangesList = slotsTrailingRangesList;
+                int nSlots = pkPos - initPkPos;
+                rangePos = new int[nSlots];
+                trailingRangePos = new int[nSlots];
+                this.initPkPos = initPkPos;
+                this.pkPos = pkPos;
+                init();
+            }
+            
+            /**
+             * Resets all positions to zero and both cursors to the last prior slot,
+             * then recomputes whether there is anything to iterate over.
+             */
+            public void init() {
+                Arrays.fill(rangePos, 0);
+                Arrays.fill(trailingRangePos, 0);
+                rangePosIndex = rangePos.length - 1;
+                trailingRangePosIndex = trailingRangePos.length - 1;
+                this.hasMore = pkPos > initPkPos && skipEmpty();
+            }
+
+            /**
+             * @return true if a combination is available; note that this may move the
+             *  cursors past empty entries as a side effect of calling skipEmpty()
+             */
+            public boolean hasNext() {
+                return hasMore && skipEmpty();
+            }
+            
+            /**
+             * Intersects, across all prior slots, the constraint that the currently selected
+             * constraint array of each slot places on pkPos. Slots without constraints are
+             * skipped, so EVERYTHING_RANGE is returned when no slot contributes.
+             * @throws NoSuchElementException if the iterator is exhausted
+             */
+            public KeyRange getRange() {
+                if (!hasMore) {
+                    throw new NoSuchElementException();
+                }
+                KeyRange priorTrailingRange = KeyRange.EVERYTHING_RANGE;
+                for (int priorPkPos = initPkPos; priorPkPos < pkPos; 
priorPkPos++) {
+                    List<List<KeyRange[]>>trailingKeyRangesList = 
slotTrailingRangesList.get(priorPkPos-initPkPos);
+                    if (!trailingKeyRangesList.isEmpty()) {
+                        List<KeyRange[]> slotTrailingRanges = 
trailingKeyRangesList.get(rangePos[priorPkPos-initPkPos]);
+                        if (!slotTrailingRanges.isEmpty()) {
+                            KeyRange[] slotTrailingRange = 
slotTrailingRanges.get(trailingRangePos[priorPkPos-initPkPos]);
+                            priorTrailingRange = 
priorTrailingRange.intersect(slotTrailingRange[pkPos]);
                         }
                     }
                 }
+                
+                return priorTrailingRange;
+            }
+            
+            /**
+             * Moves trailingRangePosIndex backwards past slots that have no ranges or whose
+             * currently selected range has no constraint arrays.
+             * @return true if the cursor still points at a slot
+             */
+            private boolean skipEmptyTrailingRanges() {
+                while (trailingRangePosIndex >= 0 && 
+                        
(slotTrailingRangesList.get(trailingRangePosIndex).isEmpty() 
+                                || 
slotTrailingRangesList.get(trailingRangePosIndex).get(rangePos[trailingRangePosIndex]).isEmpty()))
 {
+                    trailingRangePosIndex--;
+                }
+                if (trailingRangePosIndex >= 0) {
+                    return true;
+                }
+                return false;
+           }
+            
+            /**
+             * Resets trailingRangePosIndex to the last slot and moves rangePosIndex backwards
+             * past slots that have no ranges.
+             * @return true if rangePosIndex still points at a slot
+             */
+            private boolean skipEmptyRanges() {
+                trailingRangePosIndex = trailingRangePos.length - 1;
+                while (rangePosIndex >= 0 && 
+                        (slotTrailingRangesList.get(rangePosIndex).isEmpty())) 
{
+                    rangePosIndex--;
+                }
+                return rangePosIndex >= 0;
+            }
+            
+            /**
+             * Positions both cursors on non-empty entries, updating hasMore when nothing
+             * is left.
+             * NOTE(review): when a slot has ranges but the selected range's constraint list
+             * is empty for every slot, skipEmptyTrailingRanges() returns false while
+             * skipEmptyRanges() returns true without changing any position - confirm the
+             * loop below terminates in that situation.
+             */
+            private boolean skipEmpty() {
+                if (!hasMore || slotTrailingRangesList.isEmpty() || 
rangePosIndex < 0) {
+                    return hasMore=false;
+                }
+                do {
+                    if (skipEmptyTrailingRanges()) {
+                        return true;
+                    }
+                } while (skipEmptyRanges());
+                return hasMore = rangePosIndex >= 0;
+            }
+            
+            /**
+             * Advances to the next combination of ranges. The position of the slot under
+             * rangePosIndex is incremented modulo the number of ranges in that slot; when it
+             * wraps around to zero (or the slot has no ranges) the cursor moves to the
+             * previous slot. Also resets trailingRangePosIndex to the last slot.
+             * @return true while rangePosIndex still points at a slot
+             */
+            public boolean nextRange() {
+                trailingRangePosIndex = trailingRangePos.length - 1;
+                while (rangePosIndex >= 0 && 
+                        (slotTrailingRangesList.get(rangePosIndex).isEmpty() 
+                                 || (rangePos[rangePosIndex] = 
(rangePos[rangePosIndex] + 1) 
+                                    % 
slotTrailingRangesList.get(rangePosIndex).size()) == 0)) {
+                    rangePosIndex--;
+                }
+                return rangePosIndex >= 0;
             }
 
-            if (!minMaxExtractNodes.isEmpty()) {
-                if (keySlot[initPosition] == null) {
-                    keySlot[initPosition] = new KeySlot(new BaseKeyPart(table, 
table.getPKColumns().get(initPosition), minMaxExtractNodes), initPosition, 1, 
EVERYTHING_RANGES, null);
+            /**
+             * Advances to the next constraint array for the currently selected ranges. The
+             * position of the slot under trailingRangePosIndex is incremented modulo the
+             * number of constraint arrays of its selected range; when it wraps around to zero
+             * (or there is nothing to select) the cursor moves to the previous slot.
+             * @return true while trailingRangePosIndex still points at a slot
+             */
+            public boolean nextTrailingRange() {
+                while (trailingRangePosIndex >= 0 && 
+                        
(slotTrailingRangesList.get(trailingRangePosIndex).isEmpty() 
+                                || 
slotTrailingRangesList.get(trailingRangePosIndex).get(rangePos[trailingRangePosIndex]).isEmpty()
 
+                                || (trailingRangePos[trailingRangePosIndex] = 
(trailingRangePos[trailingRangePosIndex] + 1) 
+                                    % 
slotTrailingRangesList.get(trailingRangePosIndex).get(rangePos[trailingRangePosIndex]).size())
 == 0)) {
+                    trailingRangePosIndex--;
+                }
+                if (trailingRangePosIndex >= 0) {
+                    return true;
+                }
+                return false;
+            }
+        }
+        
+        /**
+         * Intersects two key ranges that both start at pkPos but may span a different
+         * number of PK columns. When the minimum spans differ, the wider range is clipped
+         * to the narrower span before intersecting; the clipped-off part is then either
+         * recorded as a constraint in trailingRanges or appended back onto the result.
+         *
+         * @param pkPos the PK position both ranges start at
+         * @param range the running result, or null to start from otherRange
+         * @param otherRange the range to intersect with
+         * @param trailingRanges per-PK-position constraints; the entry at the position
+         *  following the common span may be narrowed by this call
+         * @return the intersection, or KeyRange.EMPTY_RANGE if there is none
+         */
+        private KeyRange intersectRanges(int pkPos, KeyRange range, KeyRange 
otherRange, KeyRange[] trailingRanges) {
+            // We need to initialize result to the other range rather than
+            // initializing it to EVERYTHING_RANGE to handle the IS NULL case.
+            // Otherwise EVERYTHING_RANGE intersected below with NULL_RANGE
+            // becomes an EMPTY_RANGE.
+            if (range == null) {
+                range = otherRange;
+            }
+            KeyRange result = range;
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            RowKeySchema rowKeySchema = table.getRowKeySchema();
+            int minSpan = rowKeySchema.computeMinSpan(pkPos, result, ptr);
+            int otherMinSpan = rowKeySchema.computeMinSpan(pkPos, otherRange, 
ptr);
+            KeyRange otherClippedRange = otherRange;
+            KeyRange clippedRange = result;
+            // Clip whichever range has the larger span down to the smaller span so that
+            // both sides cover the same number of PK columns before intersecting
+            if (minSpan != otherMinSpan && result != KeyRange.EVERYTHING_RANGE 
&& otherRange != KeyRange.EVERYTHING_RANGE) {
+                if (otherMinSpan > minSpan) {
+                    otherClippedRange = rowKeySchema.clipLeft(pkPos, 
otherRange, minSpan, ptr);
+                } else if (minSpan > otherMinSpan) {
+                    clippedRange = rowKeySchema.clipLeft(pkPos, result, 
otherMinSpan, ptr);
+                }
+            }
+            
+            // intersect result with otherRange
+            result = clippedRange.intersect(otherClippedRange);
+            if (result == KeyRange.EMPTY_RANGE) {
+                return result;
+            }
+            if (minSpan != otherMinSpan) {
+                // If trailing ranges are of different spans, intersect them 
at the common
+                // span and add remaining part of range used to trailing ranges
+                // Without the special case for single key values, the 
trailing ranges
+                // code doesn't work correctly for 
WhereOptimizerTest.testMultiSlotTrailingIntersect()
+                if (result.isSingleKey() && !(range.isSingleKey() && 
otherRange.isSingleKey())) {
+                    int trailingPkPos = pkPos + Math.min(minSpan, 
otherMinSpan);
+                    KeyRange trailingRange = getTrailingRange(rowKeySchema, 
trailingPkPos, minSpan > otherMinSpan ? range : otherRange, result, ptr);
+                    trailingRanges[trailingPkPos] = 
trailingRanges[trailingPkPos].intersect(trailingRange);
                 } else {
-                    keySlot[initPosition] = 
keySlot[initPosition].concatExtractNodes(minMaxExtractNodes);
+                    // Add back clipped part of range 
+                    if (otherMinSpan > minSpan) {
+                        result = concatSuffix(result, otherRange);
+                    } else if (minSpan > otherMinSpan) {
+                        result = concatSuffix(result, range);
+                    }
                 }
             }
-            List<KeySlot> keySlots = Arrays.asList(keySlot);
-            // If we have a salt column, skip that slot because
-            // they'll never be an expression contained by it.
-            keySlots = keySlots.subList(initPosition, keySlots.size());
-            return new MultiKeySlot(keySlots, minMaxRange == 
KeyRange.EVERYTHING_RANGE ? null : minMaxRange, partialExtraction);
+            return result;
+        }
+
+        /**
+         * Adds back the part of a range that was clipped off before an intersection.
+         * For each bound of result that is bounded and is a byte prefix of the
+         * corresponding bound of otherRange, the full bound of otherRange is used instead.
+         *
+         * @param result the intersection computed on the clipped ranges
+         * @param otherRange the original (unclipped) range supplying the suffix bytes
+         * @return result itself if neither bound changed, otherwise a new KeyRange with
+         *  the extended bounds and the inclusiveness of result
+         */
+        private static KeyRange concatSuffix(KeyRange result, KeyRange otherRange) {
+            byte[] lowerRange = result.getLowerRange();
+            byte[] clippedLowerRange = lowerRange;
+            byte[] fullLowerRange = otherRange.getLowerRange();
+            if (!result.lowerUnbound() && Bytes.startsWith(fullLowerRange, clippedLowerRange)) {
+                lowerRange = fullLowerRange;
+            }
+            byte[] upperRange = result.getUpperRange();
+            byte[] clippedUpperRange = upperRange;
+            byte[] fullUpperRange = otherRange.getUpperRange();
+            // The guard must look at the upper bound: testing lowerUnbound() here skipped the
+            // upper extension whenever only the lower bound was unbound.
+            if (!result.upperUnbound() && Bytes.startsWith(fullUpperRange, clippedUpperRange)) {
+                upperRange = fullUpperRange;
+            }
+            // Reference comparison on purpose: detects that neither bound was replaced
+            if (lowerRange == clippedLowerRange && upperRange == clippedUpperRange) {
+                return result;
+            }
+            return KeyRange.getKeyRange(lowerRange, result.isLowerInclusive(), upperRange, result.isUpperInclusive());
+        }
+
+        /**
+         * Intersects an RVC that starts at pkPos with an overlapping range that starts at otherPKPos.
+         * For example, ((A, B) - (J, K)) intersected with (F - *) would return ((A,F) - (J, K))
+         *     ((A, B) - (J, K)) intersected with (M - P) would return (A-J) since both of the trailing
+         * parts of the RVC, B and K, do not intersect with (M - P).
+         * @param result an RVC expression starting from pkPos and with length of at least otherPKPos - pkPos.
+         * @param pkPos the PK position of the leading part of the RVC expression
+         * @param otherRange the other range to intersect with the overlapping part of the RVC.
+         * @param otherPKPos the PK position of the leading part of the other range
+         * @return resulting KeyRange from the intersection, potentially an empty range if the result RVC
+         *  is a single key and the trailing part of the key does not intersect with the RVC.
+         */
+        private KeyRange intersectTrailing(KeyRange result, int pkPos, 
KeyRange otherRange, int otherPKPos) {
+            RowKeySchema rowKeySchema = table.getRowKeySchema();
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            // Number of bytes to drop in front of otherPKPos when trimming: one if the
+            // preceding column is not fixed width, zero otherwise
+            int separatorLength = 
table.getPKColumns().get(otherPKPos-1).getDataType().isFixedWidth() ? 0 : 1;
+            boolean lowerInclusive = result.isLowerInclusive();
+            byte[] lowerRange = result.getLowerRange();
+            ptr.set(lowerRange);
+            // Position ptr at the point at which the two ranges overlap
+            if (rowKeySchema.position(ptr, pkPos, otherPKPos)) {
+                int lowerOffset = ptr.getOffset();
+                // Increase the length of the ptr to include the entire 
trailing bytes
+                ptr.set(ptr.get(), lowerOffset, lowerRange.length - 
lowerOffset);
+                byte[] trailingBytes = ptr.copyBytes();
+                
+                // Special case for single key since single keys of different 
span lengths
+                // will never overlap. We do not need to process both the 
lower and upper
+                // ranges since they are the same.
+                if (result.isSingleKey() && otherRange.isSingleKey()) {
+                    // Find the span of the trailing bytes as it could be more 
than one.
+                    // We need this to determine if the slot at the last 
position would
+                    // have a separator byte (i.e. is variable length).
+                    int pos = otherPKPos;
+                    rowKeySchema.iterator(trailingBytes, ptr, otherPKPos);
+                    while (rowKeySchema.next(ptr, pos, trailingBytes.length) 
!= null) {
+                        pos++;
+                    }
+                    byte[] otherLowerRange = otherRange.getLowerRange();
+                    boolean isFixedWidthAtEnd = 
table.getPKColumns().get(pos).getDataType().isFixedWidth();
+                    // If the otherRange starts with the overlapping trailing 
byte *and* we're comparing
+                    // the entire key (i.e. not just a leading subset), then 
we have an intersection.
+                    if (Bytes.startsWith(otherLowerRange, trailingBytes) && 
+                            (isFixedWidthAtEnd || 
+                             otherLowerRange.length == trailingBytes.length || 
+                             otherLowerRange[trailingBytes.length] == 
QueryConstants.SEPARATOR_BYTE)) {
+                        return result;
+                    }
+                    // Otherwise, there is no overlap
+                    return KeyRange.EMPTY_RANGE;
+                }
+                // If we're not dealing with single keys, then we can use our 
normal intersection
+                if (otherRange.intersect(KeyRange.getKeyRange(trailingBytes)) 
== KeyRange.EMPTY_RANGE) {
+                    // Exit early since the upper range is the same as the 
lower range
+                    if (result.isSingleKey()) {
+                        return KeyRange.EMPTY_RANGE;
+                    }
+                    // Trim the lower bound down to the bytes in front of otherPKPos
+                    ptr.set(result.getLowerRange(), 0, lowerOffset - 
separatorLength);
+                    lowerRange = ptr.copyBytes();
+                }
+            }
+            // Repeat the trimming check for the upper bound of the RVC
+            boolean upperInclusive = result.isUpperInclusive();
+            byte[] upperRange = result.getUpperRange();
+            ptr.set(upperRange);
+            if (rowKeySchema.position(ptr, pkPos, otherPKPos)) {
+                int upperOffset = ptr.getOffset();
+                ptr.set(ptr.get(), upperOffset, upperRange.length - 
upperOffset);
+                if 
(otherRange.intersect(KeyRange.getKeyRange(ptr.copyBytes())) == 
KeyRange.EMPTY_RANGE) {
+                    ptr.set(ptr.get(), 0, upperOffset - separatorLength);
+                    upperRange = ptr.copyBytes();
+                }
+            }
+            // Reference comparison: neither bound was trimmed, so the input can be returned as-is
+            if (lowerRange == result.getLowerRange() && upperRange == 
result.getUpperRange()) {
+                return result;
+            }
+            KeyRange range = KeyRange.getKeyRange(lowerRange, lowerInclusive, 
upperRange, upperInclusive);
+            return range;
         }
 
         private KeySlots orKeySlots(OrExpression orExpression, List<KeySlots> 
childSlots) {
@@ -735,7 +1420,6 @@ public class WhereOptimizer {
             // TODO: Have separate list for single span versus multi span
             // For multi-span, we only need to keep a single range.
             List<KeyRange> slotRanges = Lists.newArrayList();
-            KeyRange minMaxRange = KeyRange.EMPTY_RANGE;
             for (KeySlots childSlot : childSlots) {
                 if (childSlot == EMPTY_KEY_SLOTS) {
                     // TODO: can this ever happen and can we safely filter the 
expression tree?
@@ -745,88 +1429,45 @@ public class WhereOptimizer {
                 // if all sub-expressions have been completely extracted. 
Otherwise, we must
                 // leave the OR as a post filter.
                 partialExtraction |= childSlot.isPartialExtraction();
-                if (childSlot.getMinMaxRange() != null) {
-                    if (!slotRanges.isEmpty() && thePosition != initialPos) { 
// ORing together rvc in initial slot with other slots
-                        return null;
+                // TODO: Do the same optimization that we do for IN if the 
childSlots specify a fully qualified row key
+                for (KeySlot slot : childSlot.getSlots()) {
+                    if (slot == null) {
+                        continue;
                     }
-                    minMaxRange = 
minMaxRange.union(childSlot.getMinMaxRange());
-                    thePosition = initialPos;
-                    for (KeySlot slot : childSlot) {
-                       if (slot != null) {
-                               
slotExtractNodes.addAll(slot.getKeyPart().getExtractNodes());
-                       }
-                    }
-                } else {
-                    // TODO: Do the same optimization that we do for IN if the 
childSlots specify a fully qualified row key
-                    for (KeySlot slot : childSlot) {
-                        if (slot == null) {
-                            continue;
-                        }
-                        /*
-                         * If we see a different PK column than before, we 
can't
-                         * optimize it because our SkipScanFilter only handles
-                         * top level expressions that are ANDed together 
(where in
-                         * the same column expressions may be ORed together).
-                         * For example, WHERE a=1 OR b=2 cannot be handled, 
while
-                         *  WHERE (a=1 OR a=2) AND (b=2 OR b=3) can be handled.
-                         * TODO: We could potentially handle these cases 
through
-                         * multiple, nested SkipScanFilters, where each OR 
expression
-                         * is handled by its own SkipScanFilter and the outer 
one
-                         * increments the child ones and picks the one with 
the smallest
-                         * key.
-                         */
-                        if (thePosition == -1) {
-                            theSlot = slot;
-                            thePosition = slot.getPKPosition();
-                        } else if (thePosition != slot.getPKPosition()) {
-                            return null;
-                        }
-                        
slotExtractNodes.addAll(slot.getKeyPart().getExtractNodes());
-                        slotRanges.addAll(slot.getKeyRanges());
+                    /*
+                     * If we see a different PK column than before, we can't
+                     * optimize it because our SkipScanFilter only handles
+                     * top level expressions that are ANDed together (where in
+                     * the same column expressions may be ORed together).
+                     * For example, WHERE a=1 OR b=2 cannot be handled, while
+                     *  WHERE (a=1 OR a=2) AND (b=2 OR b=3) can be handled.
+                     * TODO: We could potentially handle these cases through
+                     * multiple, nested SkipScanFilters, where each OR 
expression
+                     * is handled by its own SkipScanFilter and the outer one
+                     * increments the child ones and picks the one with the 
smallest
+                     * key.
+                     */
+                    if (thePosition == -1) {
+                        theSlot = slot;
+                        thePosition = slot.getPKPosition();
+                    } else if (thePosition != slot.getPKPosition()) {
+                        return null;
                     }
+                    
slotExtractNodes.addAll(slot.getKeyPart().getExtractNodes());
+                    slotRanges.addAll(slot.getKeyRanges());
                 }
             }
 
             if (thePosition == -1) {
                 return null;
             }
-            // With a mix of both, we can't use skip scan, so empty out the 
union
-            // and only extract the min/max nodes.
-            if (!slotRanges.isEmpty() && minMaxRange != KeyRange.EMPTY_RANGE) {
-                boolean clearExtracts = false;
-                // Union the minMaxRanges together with the slotRanges.
-                for (KeyRange range : slotRanges) {
-                    if (!clearExtracts) {
-                        /*
-                         * Detect when to clear the extract nodes by 
determining if there
-                         * are gaps left by combining the ranges. If there are 
gaps, we
-                         * cannot extract the nodes, but must them as filters 
instead.
-                         */
-                        KeyRange intersection = minMaxRange.intersect(range);
-                        if (intersection == KeyRange.EMPTY_RANGE 
-                                || !range.equals(intersection.union(range)) 
-                                || 
!minMaxRange.equals(intersection.union(minMaxRange))) {
-                            clearExtracts = true;
-                        }
-                    }
-                    minMaxRange = minMaxRange.union(range);
-                }
-                if (clearExtracts) {
-                    partialExtraction = true;
-                    slotExtractNodes = Collections.emptyList();
-                }
-                slotRanges = Collections.emptyList();
-            }
             if (theSlot == null) {
                 theSlot = new KeySlot(new BaseKeyPart(table, 
table.getPKColumns().get(initialPos), slotExtractNodes), initialPos, 1, 
EVERYTHING_RANGES, null);
-            } else if (minMaxRange != KeyRange.EMPTY_RANGE && 
!slotExtractNodes.isEmpty()) {
-                theSlot = theSlot.concatExtractNodes(slotExtractNodes);
             }
             return newKeyParts(
                     theSlot, 
                     partialExtraction ? slotExtractNodes : 
Collections.<Expression>singletonList(orExpression), 
-                    slotRanges.isEmpty() ? EVERYTHING_RANGES : 
KeyRange.coalesce(slotRanges), 
-                    minMaxRange == KeyRange.EMPTY_RANGE ? null : minMaxRange);
+                    slotRanges.isEmpty() ? EVERYTHING_RANGES : 
KeyRange.coalesce(slotRanges));
         }
 
         private final PTable table;
@@ -847,7 +1488,7 @@ public class WhereOptimizer {
             if (childParts.isEmpty()) {
                 return null;
             }
-            return newCoerceKeyPart(childParts.get(0).iterator().next(), node);
+            return newCoerceKeyPart(childParts.get(0).getSlots().get(0), node);
         }
 
         @Override
@@ -910,10 +1551,11 @@ public class WhereOptimizer {
             }
             Expression rhs = node.getChildren().get(1);
             KeySlots childSlots = childParts.get(0);
-            KeySlot childSlot = childSlots.iterator().next();
+            KeySlot childSlot = childSlots.getSlots().get(0);
             KeyPart childPart = childSlot.getKeyPart();
-            SortOrder sortOrder = childPart.getColumn().getSortOrder();
-            CompareOp op = sortOrder.transform(node.getFilterOp());
+            //SortOrder sortOrder = childPart.getColumn().getSortOrder();
+            CompareOp op = node.getFilterOp();
+            //CompareOp op = sortOrder.transform(node.getFilterOp());
             KeyRange keyRange = childPart.getKeyRange(op, rhs);
             return newKeyParts(childSlot, node, keyRange);
         }
@@ -935,7 +1577,7 @@ public class WhereOptimizer {
             if (childParts.isEmpty()) {
                 return null;
             }
-            return 
newScalarFunctionKeyPart(childParts.get(0).iterator().next(), node);
+            return 
newScalarFunctionKeyPart(childParts.get(0).getSlots().get(0), node);
         }
 
         @Override
@@ -957,7 +1599,7 @@ public class WhereOptimizer {
             }
             // for SUBSTR(<column>,1,3) LIKE 'foo%'
             KeySlots childSlots = childParts.get(0);
-            KeySlot childSlot = childSlots.iterator().next();
+            KeySlot childSlot = childSlots.getSlots().get(0);
             final String startsWith = node.getLiteralPrefix();
             SortOrder sortOrder = node.getChildren().get(0).getSortOrder();
             byte[] key = PVarchar.INSTANCE.toBytes(startsWith, sortOrder);
@@ -990,9 +1632,12 @@ public class WhereOptimizer {
                 lowerRange = Arrays.copyOf(lowerRange, lowerRange.length+1);
                 lowerRange[lowerRange.length-1] = 
QueryConstants.SEPARATOR_BYTE;
             }
-            KeyRange keyRange = type.getKeyRange(lowerRange, true, upperRange, 
false);
+            KeyRange range = type.getKeyRange(lowerRange, true, upperRange, 
false);
+            if (column.getSortOrder() == SortOrder.DESC) {
+                range = range.invert();
+            }
             // Only extract LIKE expression if pattern ends with a wildcard 
and everything else was extracted
-            return newKeyParts(childSlot, node.endsWithOnlyWildcard() ? node : 
null, keyRange);
+            return newKeyParts(childSlot, node.endsWithOnlyWildcard() ? node : 
null, range);
         }
 
         @Override
@@ -1008,7 +1653,7 @@ public class WhereOptimizer {
 
             List<Expression> keyExpressions = node.getKeyExpressions();
             Set<KeyRange> ranges = 
Sets.newHashSetWithExpectedSize(keyExpressions.size());
-            KeySlot childSlot = childParts.get(0).iterator().next();
+            KeySlot childSlot = childParts.get(0).getSlots().get(0);
             KeyPart childPart = childSlot.getKeyPart();
             // Handles cases like WHERE substr(foo,1,3) IN ('aaa','bbb')
             for (Expression key : keyExpressions) {
@@ -1020,7 +1665,7 @@ public class WhereOptimizer {
                     ranges.add(range);
                 }
             }
-            return newKeyParts(childSlot, node, new 
ArrayList<KeyRange>(ranges), null);
+            return newKeyParts(childSlot, node, new 
ArrayList<KeyRange>(ranges));
         }
 
         @Override
@@ -1034,10 +1679,12 @@ public class WhereOptimizer {
                 return null;
             }
             KeySlots childSlots = childParts.get(0);
-            KeySlot childSlot = childSlots.iterator().next();
+            KeySlot childSlot = childSlots.getSlots().get(0);
             PColumn column = childSlot.getKeyPart().getColumn();
             PDataType type = column.getDataType();
             boolean isFixedWidth = type.isFixedWidth();
+            // Nothing changes for IS NULL and IS NOT NULL when DESC since
+            // we represent NULL the same way for ASC and DESC
             if (isFixedWidth) { // if column can't be null
                 return node.isNegate() ? null : 
                     newKeyParts(childSlot, node, type.getKeyRange(new 
byte[SchemaUtil.getFixedByteSize(column)], true,
@@ -1048,9 +1695,25 @@ public class WhereOptimizer {
             }
         }
 
-        private static interface KeySlots extends Iterable<KeySlot> {
-            @Override public Iterator<KeySlot> iterator();
-            public KeyRange getMinMaxRange();
+        /**
+         * 
+         * Top level data structure used to drive the formation
+         * of the start/stop row of scans, essentially taking the
+         * expression tree of a WHERE clause and producing the
+         * ScanRanges instance during query compilation.
+         *
+         */
+        public static interface KeySlots {
+            
+            /**
+             * List of slots that store binding of constant values
+             * for primary key columns. For example:
+             * WHERE pk1 = 'foo' and pk2 = 'bar'
+             * would produce two KeySlot instances that store that
+             * pk1 = 'foo' and pk2 = 'bar'. 
+             * @return the list of KeySlot instances
+             */
+            public List<KeySlot> getSlots();
             /**
              * Tracks whether or not the contained KeySlot(s) contain
              * a slot that includes only a partial extraction of the
@@ -1063,19 +1726,25 @@ public class WhereOptimizer {
              */
             public boolean isPartialExtraction();
         }
-
-        private final class KeySlot {
-            private final int pkPosition;
-            private final int pkSpan;
-            private final KeyPart keyPart;
+        
+        /**
+         * 
+         * Used during query compilation to represent the constant value of a
+         * primary key column based on expressions in the WHERE clause. These
+         * are combined together during the compilation of ANDs and ORs
+         * to produce the start and stop scan range.
+         *
+         */
+        static final class KeySlot {
+            private final int pkPosition; // Position in primary key
+            private final int pkSpan; // Will be > 1 for RVC
+            private final KeyPart keyPart; // Used to produce the KeyRanges 
below
+            // Multiple ranges means values that have been ORed together
             private final List<KeyRange> keyRanges;
-            private final OrderPreserving orderPreserving;
-
-            private KeySlot(KeyPart keyPart, int pkPosition, int pkSpan, 
List<KeyRange> keyRanges) {
-                this (keyPart, pkPosition, pkSpan, keyRanges, 
OrderPreserving.YES);
-            }
+            // Whether the order of rows returned from the scan will match the order 
declared in the query
+            private final OrderPreserving orderPreserving; 
             
-            private KeySlot(KeyPart keyPart, int pkPosition, int pkSpan, 
List<KeyRange> keyRanges, OrderPreserving orderPreserving) {
+            KeySlot(KeyPart keyPart, int pkPosition, int pkSpan, 
List<KeyRange> keyRanges, OrderPreserving orderPreserving) {
                 this.pkPosition = pkPosition;
                 this.pkSpan = pkSpan;
                 this.keyPart = keyPart;
@@ -1109,100 +1778,29 @@ public class WhereOptimizer {
                         this.getOrderPreserving());
             }
             
-            public final KeySlot intersect(KeySlot that) {
-                if (this.getPKSpan() == 1 && that.getPKSpan() == 1) {
-                    if (this.getPKPosition() != that.getPKPosition()) {
-                        throw new IllegalArgumentException("Position must be 
equal for intersect");
-                    }
-                    List<KeyRange> keyRanges = 
KeyRange.intersect(this.getKeyRanges(), that.getKeyRanges());
-                    if (isDegenerate(keyRanges)) {
-                        return null;
-                    }
-                    return new KeySlot(
-                            new BaseKeyPart(this.getKeyPart().getTable(), 
this.getKeyPart().getColumn(),
-                                        
SchemaUtil.concat(this.getKeyPart().getExtractNodes(),
-                                                          
that.getKeyPart().getExtractNodes())),
-                            this.getPKPosition(),
-                            this.getPKSpan(),
-                            keyRanges,
-                            this.getOrderPreserving());
-                } else {
-                    // Assumes that only single keys occur in RVCs (i.e. when 
a PK spans columns)
-                    assert(this.getPKSpan() > 1);
-                    assert(this.getPKPosition() <= that.getPKPosition());
-                    ImmutableBytesWritable ptr = context.getTempPtr();
-                    RowKeySchema schema = table.getRowKeySchema();
-                    if (this.getPKSpan() > 1 && that.getPKSpan() > 1) {
-                        // TODO: Trickiest case: both key slots are multi-span 
RVCs.
-                        // Punt for now: we could intersect these, but it'd be 
a fair amount of code.
-                        // Instead, just keep the original slot and don't 
extract
-                        // the other expressions (so they'll be evaluated row 
by row).
-                        return this;
-                    } else {
-                        assert(this.getPKSpan() > 1);
-                        assert(this.getPKPosition() <= that.getPKPosition());
-                        List<KeyRange> newKeyRanges = 
Lists.newArrayListWithExpectedSize(this.getKeyRanges().size());
-                        // We know we have a set of RVCs (i.e. multi-span key 
ranges)
-                        // Get the PK slot value in the RVC for the position 
of the other KeySlot
-                        // If they don't intersect, we cannot have a match for 
the RVC, so filter it out.
-                        // Otherwise, we keep it.
-                        for (KeyRange keyRange : this.getKeyRanges()) {
-                            if (keyRange == KeyRange.EVERYTHING_RANGE) {
-                                return this;
-                            }
-                            assert(keyRange.isSingleKey());
-                            byte[] key = keyRange.getLowerRange();
-                            int position = this.getPKPosition();
-                            int thatPosition = that.getPKPosition();
-                            ptr.set(key);
-                            if (schema.position(ptr, position, thatPosition)) {
-                                // Create a range just for the overlapping 
column
-                                List<KeyRange> slotKeyRanges = 
Collections.singletonList(KeyRange.getKeyRange(ByteUtil.copyKeyBytesIfNecessary(ptr)));
-                                // Intersect with other ranges and add to list 
if it overlaps
-                                if 
(!isDegenerate(KeyRange.intersect(slotKeyRanges, that.getKeyRanges()))) {
-                                    newKeyRanges.add(keyRange);
-                                }
-                            }
-                        }
-                        if (isDegenerate(newKeyRanges)) {
-                            return null;
-                        }
-                        return new KeySlot(
-                                new BaseKeyPart(this.getKeyPart().getTable(), 
this.getKeyPart().getColumn(),
-                                            
SchemaUtil.concat(this.getKeyPart().getExtractNodes(),
-                                                              
that.getKeyPart().getExtractNodes())),
-                                this.getPKPosition(),
-                                this.getPKSpan(),
-                                newKeyRanges,
-                                this.getOrderPreserving());
-                    }
-                }
-            }
-
             public OrderPreserving getOrderPreserving() {
                 return orderPreserving;
             }
         }
 
-        private static class MultiKeySlot implements KeySlots {
+        /**
+         * 
+         * Implementation of KeySlots for AND and OR expressions. The
+         * List<KeySlot> will be in PK order.
+         *
+         */
+        public static class MultiKeySlot implements KeySlots {
             private final List<KeySlot> childSlots;
-            private final KeyRange minMaxRange;
             private final boolean partialExtraction;
 
-            private MultiKeySlot(List<KeySlot> childSlots, KeyRange 
minMaxRange, boolean partialExtraction) {
+            private MultiKeySlot(List<KeySlot> childSlots, boolean 
partialExtraction) {
                 this.childSlots = childSlots;
-                this.minMaxRange = minMaxRange;
                 this.partialExtraction = partialExtraction;
             }
 
             @Override
-            public Iterator<KeySlot> iterator() {
-                return childSlots.iterator();
-            }
-
-            @Override
-            public KeyRange getMinMaxRange() {
-                return minMaxRange;
+            public List<KeySlot> getSlots() {
+                return childSlots;
             }
 
             @Override
@@ -1211,11 +1809,15 @@ public class WhereOptimizer {
             }
         }
 
-        private class SingleKeySlot implements KeySlots {
-            private final KeySlot slot;
-            private final KeyRange minMaxRange;
+        /**
+         * 
+         * Implementation of KeySlots for a constant value.
+         *
+         */
+        public static class SingleKeySlot implements KeySlots {
+            private final List<KeySlot> slots;
             
-            private SingleKeySlot(KeyPart part, int pkPosition, List<KeyRange> 
ranges) {
+            SingleKeySlot(KeyPart part, int pkPosition, List<KeyRange> ranges) 
{
                 this(part, pkPosition, 1, ranges);
             }
             
@@ -1224,36 +1826,26 @@ public class WhereOptimizer {
             }
             
             private SingleKeySlot(KeyPart part, int pkPosition, int pkSpan, 
List<KeyRange> ranges) {
-                this(part,pkPosition,pkSpan,ranges, null, null);
+                this(part,pkPosition,pkSpan,ranges, null);
             }
             
             private SingleKeySlot(KeyPart part, int pkPosition, int pkSpan, 
List<KeyRange> ranges, OrderPreserving orderPreserving) {
-                this(part,pkPosition,pkSpan,ranges, null, orderPreserving);
-            }
-            
-            private SingleKeySlot(KeyPart part, int pkPosition, int pkSpan, 
List<KeyRange> ranges, KeyRange minMaxRange, OrderPreserving orderPreserving) {
-                this.slot = new KeySlot(part, pkPosition, pkSpan, ranges, 
orderPreserving);
-                this.minMaxRange = minMaxRange;
+                this.slots = Collections.singletonList(new KeySlot(part, 
pkPosition, pkSpan, ranges, orderPreserving));
             }
             
             @Override
-            public Iterator<KeySlot> iterator() {
-                return Iterators.<KeySlot>singletonIterator(slot);
-            }
-
-            @Override
-            public KeyRange getMinMaxRange() {
-                return minMaxRange;
+            public List<KeySlot> getSlots() {
+                return slots;
             }
 
             @Override
             public boolean isPartialExtraction() {
-                return this.slot.getKeyPart().getExtractNodes().isEmpty();
+                return 
this.slots.get(0).getKeyPart().getExtractNodes().isEmpty();
             }
             
         }
         
-        private static class BaseKeyPart implements KeyPart {
+        public static class BaseKeyPart implements KeyPart {
             @Override
             public KeyRange getKeyRange(CompareOp op, Expression rhs) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -1264,11 +1856,17 @@ public class WhereOptimizer {
                     Integer length = getColumn().getMaxLength();
    

<TRUNCATED>

Reply via email to