http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
new file mode 100644
index 0000000..99140ef
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.SortedSetMultimap;
+import com.google.common.collect.TreeMultimap;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
+ * by a query.
+ *
+ * In practice, this class covers 2 main cases:
+ *   1) most user queries have to internally query all columns, because the CQL semantics require us to know if
+ *      a row is live or not even if it has no values for the columns requested by the user (see #6588 for more
+ *      details). However, while we need to know for each column whether it has live values, we can avoid
+ *      sending the values of those columns that will not be returned to the user.
+ *   2) for some internal queries (and for queries using #6588 if we introduce it), we're fine querying
+ *      only some of the columns.
+ *
+ * For complex columns, this class allows being more fine-grained than the whole column by selecting only some of the
+ * cells of the complex column (either individual cells by path name, or some slice).
+ */
+public class ColumnFilter
+{
+    public static final Serializer serializer = new Serializer();
+
+    private static final Comparator<ColumnIdentifier> keyComparator = new Comparator<ColumnIdentifier>()
+    {
+        public int compare(ColumnIdentifier id1, ColumnIdentifier id2)
+        {
+            return ByteBufferUtil.compareUnsigned(id1.bytes, id2.bytes);
+        }
+    };
+    private static final Comparator<ColumnSubselection> valueComparator = new Comparator<ColumnSubselection>()
+    {
+        public int compare(ColumnSubselection s1, ColumnSubselection s2)
+        {
+            assert s1.column().name.equals(s2.column().name);
+            return s1.column().cellPathComparator().compare(s1.minIncludedPath(), s2.minIncludedPath());
+        }
+    };
+
+    // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved
+    // by the query, but the values for columns/cells not selected by 'selection' and 'subSelections' will be skipped.
+    // Otherwise, only the columns/cells returned by 'selection' and 'subSelections' will be fetched at all.
+    private final boolean isFetchAll;
+
+    private final CFMetaData metadata; // can be null if !isFetchAll
+
+    private final PartitionColumns selection; // can be null if isFetchAll and we don't want to skip any value
+    private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
+
+    private ColumnFilter(boolean isFetchAll,
+                         CFMetaData metadata,
+                         PartitionColumns columns,
+                         SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
+    {
+        this.isFetchAll = isFetchAll;
+        this.metadata = metadata;
+        this.selection = columns;
+        this.subSelections = subSelections;
+    }
+
+    /**
+     * A selection that includes all columns (and their values).
+     */
+    public static ColumnFilter all(CFMetaData metadata)
+    {
+        return new ColumnFilter(true, metadata, null, null);
+    }
+
+    /**
+     * A selection that only fetches the provided columns.
+     * <p>
+     * Note that this shouldn't be used for CQL queries in general, as all columns should be queried to
+     * preserve CQL semantics (see the class javadoc). This is ok for some internal queries however (and
+     * for #6588 if/when we implement it).
+     */
+    public static ColumnFilter selection(PartitionColumns columns)
+    {
+        return new ColumnFilter(false, null, columns, null);
+    }
+
+    /**
+     * The columns that need to be fetched internally for this selection.
+     * <p>
+     * These are the columns that must be present in the internal rows returned by queries using this selection,
+     * not the columns that are actually queried by the user (see the class javadoc for details).
+     *
+     * @return the columns to fetch for this selection.
+     */
+    public PartitionColumns fetchedColumns()
+    {
+        return isFetchAll ? metadata.partitionColumns() : selection;
+    }
+
+    /**
+     * Whether the provided column is selected by this selection.
+     */
+    public boolean includes(ColumnDefinition column)
+    {
+        return isFetchAll || selection.contains(column);
+    }
+
+    /**
+     * Whether we can skip the value for the provided selected column.
+     */
+    public boolean canSkipValue(ColumnDefinition column)
+    {
+        return isFetchAll && selection != null && !selection.contains(column);
+    }
+
+    /**
+     * Whether the provided cell of a complex column is selected by this selection.
+     */
+    public boolean includes(Cell cell)
+    {
+        if (isFetchAll || subSelections == null || !cell.column().isComplex())
+            return true;
+
+        SortedSet<ColumnSubselection> s = subSelections.get(cell.column().name);
+        if (s.isEmpty())
+            return true;
+
+        for (ColumnSubselection subSel : s)
+            if (subSel.includes(cell.path()))
+                return true;
+
+        return false;
+    }
+
+    /**
+     * Whether we can skip the value of the cell of a complex column.
+     */
+    public boolean canSkipValue(ColumnDefinition column, CellPath path)
+    {
+        if (!isFetchAll || subSelections == null || !column.isComplex())
+            return false;
+
+        SortedSet<ColumnSubselection> s = subSelections.get(column.name);
+        if (s.isEmpty())
+            return false;
+
+        for (ColumnSubselection subSel : s)
+            if (subSel.includes(path))
+                return false;
+
+        return true;
+    }
+
+    /**
+     * Creates a new {@code Tester} to efficiently test the inclusion of cells of complex column
+     * {@code column}.
+     */
+    public Tester newTester(ColumnDefinition column)
+    {
+        if (subSelections == null || !column.isComplex())
+            return null;
+
+        SortedSet<ColumnSubselection> s = subSelections.get(column.name);
+        if (s.isEmpty())
+            return null;
+
+        return new Tester(s.iterator());
+    }
+
+    /**
+     * Returns a {@code ColumnFilter} builder that includes all columns (so the selections
+     * added to the builder are the columns/cells for which we shouldn't skip the values).
+     */
+    public static Builder allColumnsBuilder(CFMetaData metadata)
+    {
+        return new Builder(metadata);
+    }
+
+    /**
+     * Returns a {@code ColumnFilter} builder that includes only the columns/cells
+     * added to the builder.
+     */
+    public static Builder selectionBuilder()
+    {
+        return new Builder(null);
+    }
+
+    public static class Tester
+    {
+        private ColumnSubselection current;
+        private final Iterator<ColumnSubselection> iterator;
+
+        private Tester(Iterator<ColumnSubselection> iterator)
+        {
+            this.iterator = iterator;
+        }
+
+        public boolean includes(CellPath path)
+        {
+            while (current == null)
+            {
+                if (!iterator.hasNext())
+                    return false;
+
+                current = iterator.next();
+                if (current.includes(path))
+                    return true;
+
+                if (current.column().cellPathComparator().compare(current.maxIncludedPath(), path) < 0)
+                    current = null;
+            }
+            return false;
+        }
+
+        public boolean canSkipValue(CellPath path)
+        {
+            while (current == null)
+            {
+                if (!iterator.hasNext())
+                    return false;
+
+                current = iterator.next();
+                if (current.includes(path))
+                    return false;
+
+                if (current.column().cellPathComparator().compare(current.maxIncludedPath(), path) < 0)
+                    current = null;
+            }
+            return true;
+        }
+    }
+
+    public static class Builder
+    {
+        private final CFMetaData metadata;
+        private PartitionColumns.Builder selection;
+        private List<ColumnSubselection> subSelections;
+
+        public Builder(CFMetaData metadata)
+        {
+            this.metadata = metadata;
+        }
+
+        public Builder add(ColumnDefinition c)
+        {
+            if (selection == null)
+                selection = PartitionColumns.builder();
+            selection.add(c);
+            return this;
+        }
+
+        public Builder addAll(Iterable<ColumnDefinition> columns)
+        {
+            if (selection == null)
+                selection = PartitionColumns.builder();
+            selection.addAll(columns);
+            return this;
+        }
+
+        private Builder addSubSelection(ColumnSubselection subSelection)
+        {
+            add(subSelection.column());
+            if (subSelections == null)
+                subSelections = new ArrayList<>();
+            subSelections.add(subSelection);
+            return this;
+        }
+
+        public Builder slice(ColumnDefinition c, CellPath from, CellPath to)
+        {
+            return addSubSelection(ColumnSubselection.slice(c, from, to));
+        }
+
+        public Builder select(ColumnDefinition c, CellPath elt)
+        {
+            return addSubSelection(ColumnSubselection.element(c, elt));
+        }
+
+        public ColumnFilter build()
+        {
+            boolean isFetchAll = metadata != null;
+            assert isFetchAll || selection != null;
+
+            SortedSetMultimap<ColumnIdentifier, ColumnSubselection> s = null;
+            if (subSelections != null)
+            {
+                s = TreeMultimap.create(keyComparator, valueComparator);
+                for (ColumnSubselection subSelection : subSelections)
+                    s.put(subSelection.column().name, subSelection);
+            }
+
+            return new ColumnFilter(isFetchAll, metadata, selection == null ? null : selection.build(), s);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        if (selection == null)
+            return "*";
+
+        Iterator<ColumnDefinition> defs = selection.selectOrderIterator();
+        StringBuilder sb = new StringBuilder();
+        appendColumnDef(sb, defs.next());
+        while (defs.hasNext())
+            appendColumnDef(sb.append(", "), defs.next());
+        return sb.toString();
+    }
+
+    private void appendColumnDef(StringBuilder sb, ColumnDefinition column)
+    {
+        if (subSelections == null)
+        {
+            sb.append(column.name);
+            return;
+        }
+
+        SortedSet<ColumnSubselection> s = subSelections.get(column.name);
+        if (s.isEmpty())
+        {
+            sb.append(column.name);
+            return;
+        }
+
+        int i = 0;
+        for (ColumnSubselection subSel : s)
+            sb.append(i++ == 0 ? "" : ", ").append(column.name).append(subSel);
+    }
+
+    public static class Serializer
+    {
+        private static final int IS_FETCH_ALL_MASK       = 0x01;
+        private static final int HAS_SELECTION_MASK      = 0x02;
+        private static final int HAS_SUB_SELECTIONS_MASK = 0x04;
+
+        private int makeHeaderByte(ColumnFilter selection)
+        {
+            return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
+                 | (selection.selection != null ? HAS_SELECTION_MASK : 0)
+                 | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
+        }
+
+        public void serialize(ColumnFilter selection, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeByte(makeHeaderByte(selection));
+
+            if (selection.selection != null)
+            {
+                Columns.serializer.serialize(selection.selection.statics, out);
+                Columns.serializer.serialize(selection.selection.regulars, out);
+            }
+
+            if (selection.subSelections != null)
+            {
+                out.writeShort(selection.subSelections.size());
+                for (ColumnSubselection subSel : selection.subSelections.values())
+                    ColumnSubselection.serializer.serialize(subSel, out, version);
+            }
+        }
+
+        public ColumnFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        {
+            int header = in.readUnsignedByte();
+            boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0;
+            boolean hasSelection = (header & HAS_SELECTION_MASK) != 0;
+            boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
+
+            PartitionColumns selection = null;
+            if (hasSelection)
+            {
+                Columns statics = Columns.serializer.deserialize(in, metadata);
+                Columns regulars = Columns.serializer.deserialize(in, metadata);
+                selection = new PartitionColumns(statics, regulars);
+            }
+
+            SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
+            if (hasSubSelections)
+            {
+                subSelections = TreeMultimap.create(keyComparator, valueComparator);
+                int size = in.readUnsignedShort();
+                for (int i = 0; i < size; i++)
+                {
+                    ColumnSubselection subSel = ColumnSubselection.serializer.deserialize(in, version, metadata);
+                    subSelections.put(subSel.column().name, subSel);
+                }
+            }
+
+            return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, selection, subSelections);
+        }
+
+        public long serializedSize(ColumnFilter selection, int version, TypeSizes sizes)
+        {
+            long size = 1; // header byte
+
+            if (selection.selection != null)
+            {
+                size += Columns.serializer.serializedSize(selection.selection.statics, sizes);
+                size += Columns.serializer.serializedSize(selection.selection.regulars, sizes);
+            }
+
+            if (selection.subSelections != null)
+            {
+
+                size += sizes.sizeof((short)selection.subSelections.size());
+                for (ColumnSubselection subSel : selection.subSelections.values())
+                    size += ColumnSubselection.serializer.serializedSize(subSel, version, sizes);
+            }
+
+            return size;
+        }
+    }
+}

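For reference, the Serializer above packs its three presence flags into a single header byte. Below is a minimal, standalone Java sketch of that encoding; the mask values are copied from the Serializer, but the class and method shown here are hypothetical, for illustration only.

    public class HeaderByteDemo
    {
        // Mask values copied from ColumnFilter.Serializer above.
        private static final int IS_FETCH_ALL_MASK       = 0x01;
        private static final int HAS_SELECTION_MASK      = 0x02;
        private static final int HAS_SUB_SELECTIONS_MASK = 0x04;

        // Mirrors makeHeaderByte: one bit per optional component of the filter.
        static int makeHeaderByte(boolean isFetchAll, boolean hasSelection, boolean hasSubSelections)
        {
            return (isFetchAll ? IS_FETCH_ALL_MASK : 0)
                 | (hasSelection ? HAS_SELECTION_MASK : 0)
                 | (hasSubSelections ? HAS_SUB_SELECTIONS_MASK : 0);
        }

        public static void main(String[] args)
        {
            // A fetch-all filter that also carries a selection (so values of
            // unselected columns can be skipped) serializes its header as 0x03.
            int header = makeHeaderByte(true, true, false);
            System.out.printf("header=0x%02x fetchAll=%b selection=%b subSelections=%b%n",
                              header,
                              (header & IS_FETCH_ALL_MASK) != 0,
                              (header & HAS_SELECTION_MASK) != 0,
                              (header & HAS_SUB_SELECTIONS_MASK) != 0);
        }
    }
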
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
deleted file mode 100644
index 1cc348c..0000000
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class ColumnSlice
-{
-    public static final ColumnSlice ALL_COLUMNS = new ColumnSlice(Composites.EMPTY, Composites.EMPTY);
-    public static final ColumnSlice[] ALL_COLUMNS_ARRAY = new ColumnSlice[]{ ALL_COLUMNS };
-
-    public final Composite start;
-    public final Composite finish;
-
-    public ColumnSlice(Composite start, Composite finish)
-    {
-        assert start != null && finish != null;
-        this.start = start;
-        this.finish = finish;
-    }
-
-    public boolean isAlwaysEmpty(CellNameType comparator, boolean reversed)
-    {
-        Comparator<Composite> orderedComparator = reversed ? comparator.reverseComparator() : comparator;
-        return !start.isEmpty() && !finish.isEmpty() && orderedComparator.compare(start, finish) > 0;
-    }
-
-    public boolean includes(Comparator<Composite> cmp, Composite name)
-    {
-        return (start.isEmpty() || cmp.compare(start, name) <= 0) && (finish.isEmpty() || cmp.compare(finish, name) >= 0);
-    }
-
-    public boolean isBefore(Comparator<Composite> cmp, Composite name)
-    {
-        return !finish.isEmpty() && cmp.compare(finish, name) < 0;
-    }
-
-    public boolean intersects(List<ByteBuffer> minCellNames, List<ByteBuffer> maxCellNames, CellNameType comparator, boolean reversed)
-    {
-        Composite sStart = reversed ? finish : start;
-        Composite sEnd = reversed ? start : finish;
-
-        if (compare(sStart, maxCellNames, comparator, true) > 0 || compare(sEnd, minCellNames, comparator, false) < 0)
-            return false;
-
-        // We could safely return true here, but there's a minor optimization: if the first component is restricted
-        // to a single value, we can check that the second component falls within the min/max for that component
-        // (and repeat for all components).
-        for (int i = 0; i < minCellNames.size() && i < maxCellNames.size(); i++)
-        {
-            AbstractType<?> t = comparator.subtype(i);
-            ByteBuffer s = i < sStart.size() ? sStart.get(i) : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-            ByteBuffer f = i < sEnd.size() ? sEnd.get(i) : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-            // we already know the first component falls within its min/max range (otherwise we wouldn't get here)
-            if (i > 0 && (i < sEnd.size() && t.compare(f, minCellNames.get(i)) < 0 ||
-                          i < sStart.size() && t.compare(s, maxCellNames.get(i)) > 0))
-                return false;
-
-            // if this component isn't equal in the start and finish, we don't need to check any more
-            if (i >= sStart.size() || i >= sEnd.size() || t.compare(s, f) != 0)
-                break;
-        }
-
-        return true;
-    }
-
-    /** Helper method for intersects() */
-    private int compare(Composite sliceBounds, List<ByteBuffer> sstableBounds, CellNameType comparator, boolean isSliceStart)
-    {
-        for (int i = 0; i < sstableBounds.size(); i++)
-        {
-            if (i >= sliceBounds.size())
-            {
-                // When isSliceStart is true, we're comparing the start of the slice against the max cell name for the sstable,
-                // so the slice is something like [(1, 0), (1, 0)], and the sstable max is something like (1, 0, 1).
-                // We want to return -1 (slice start is smaller than max column name) so that we say the slice intersects.
-                // The opposite is true when dealing with the end of the slice.  For example, with the same slice and a min
-                // cell name of (1, 0, 1), we want to return 1 (slice end is bigger than min column name).
-                return isSliceStart ? -1 : 1;
-            }
-
-            int comparison = comparator.subtype(i).compare(sliceBounds.get(i), sstableBounds.get(i));
-            if (comparison != 0)
-                return comparison;
-        }
-
-        // the slice bound and sstable bound have been equal in all components so far
-        if (sliceBounds.size() > sstableBounds.size())
-        {
-            // We have the opposite situation from the one described above.  With a slice of [(1, 0), (1, 0)],
-            // and a min/max cell name of (1), we want to say the slice start is smaller than the max and the slice
-            // end is larger than the min.
-            return isSliceStart ? -1 : 1;
-        }
-
-        return 0;
-    }
-
-    /**
-     * Validates that the provided slice array contains only non-overlapping slices that are valid for a query,
-     * {@code reversed} or not, on a table using {@code comparator}.
-     */
-    public static boolean validateSlices(ColumnSlice[] slices, CellNameType type, boolean reversed)
-    {
-        Comparator<Composite> comparator = reversed ? type.reverseComparator() : type;
-
-        for (int i = 0; i < slices.length; i++)
-        {
-            Composite start = slices[i].start;
-            Composite finish = slices[i].finish;
-
-            if (start.isEmpty() || finish.isEmpty())
-            {
-                if (start.isEmpty() && i > 0)
-                    return false;
-
-                if (finish.isEmpty())
-                    return i == slices.length - 1;
-            }
-            else
-            {
-                // !finish.isEmpty() is imposed by prior loop
-                if (i > 0 && comparator.compare(slices[i - 1].finish, start) >= 0)
-                    return false;
-
-                if (comparator.compare(start, finish) > 0)
-                    return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Takes an array of slices (potentially overlapping and in any order, though each individual slice must have
-     * its start before or equal to its end in {@code comparator} order) and returns an equivalent array of
-     * non-overlapping slices in {@code comparator} order.
-     *
-     * @param slices an array of slices. This may be modified by this method.
-     * @param comparator the order in which to sort the slices.
-     * @return the smallest possible array of non-overlapping slices in {@code comparator} order. If the original
-     * slices are already non-overlapping and in comparator order, this may or may not return the provided slices
-     * directly.
-     */
-    public static ColumnSlice[] deoverlapSlices(ColumnSlice[] slices, final Comparator<Composite> comparator)
-    {
-        if (slices.length <= 1)
-            return slices;
-
-        Arrays.sort(slices, new Comparator<ColumnSlice>()
-        {
-            @Override
-            public int compare(ColumnSlice s1, ColumnSlice s2)
-            {
-                if (s1.start.isEmpty() || s2.start.isEmpty())
-                {
-                    if (s1.start.isEmpty() != s2.start.isEmpty())
-                        return s1.start.isEmpty() ? -1 : 1;
-                }
-                else
-                {
-                    int c = comparator.compare(s1.start, s2.start);
-                    if (c != 0)
-                        return c;
-                }
-
-                // For the finish, empty always means greater
-                return s1.finish.isEmpty() || s2.finish.isEmpty()
-                     ? (s1.finish.isEmpty() ? 1 : -1)
-                     : comparator.compare(s1.finish, s2.finish);
-            }
-        });
-
-        List<ColumnSlice> slicesCopy = new ArrayList<>(slices.length);
-
-        ColumnSlice last = slices[0];
-
-        for (int i = 1; i < slices.length; i++)
-        {
-            ColumnSlice s2 = slices[i];
-
-            boolean includesStart = last.includes(comparator, s2.start);
-            boolean includesFinish = s2.finish.isEmpty() ? last.finish.isEmpty() : last.includes(comparator, s2.finish);
-
-            if (includesStart && includesFinish)
-                continue;
-
-            if (!includesStart && !includesFinish)
-            {
-                slicesCopy.add(last);
-                last = s2;
-                continue;
-            }
-
-            if (includesStart)
-            {
-                last = new ColumnSlice(last.start, s2.finish);
-                continue;
-            }
-
-            assert !includesFinish;
-        }
-
-        slicesCopy.add(last);
-
-        return slicesCopy.toArray(new ColumnSlice[slicesCopy.size()]);
-    }
-
-    @Override
-    public final int hashCode()
-    {
-        int hashCode = 31 + start.hashCode();
-        return 31*hashCode + finish.hashCode();
-    }
-
-    @Override
-    public final boolean equals(Object o)
-    {
-        if(!(o instanceof ColumnSlice))
-            return false;
-        ColumnSlice that = (ColumnSlice)o;
-        return start.equals(that.start) && finish.equals(that.finish);
-    }
-
-    @Override
-    public String toString()
-    {
-        return "[" + ByteBufferUtil.bytesToHex(start.toByteBuffer()) + ", " + 
ByteBufferUtil.bytesToHex(finish.toByteBuffer()) + "]";
-    }
-
-    public static class Serializer implements IVersionedSerializer<ColumnSlice>
-    {
-        private final CType type;
-
-        public Serializer(CType type)
-        {
-            this.type = type;
-        }
-
-        public void serialize(ColumnSlice cs, DataOutputPlus out, int version) throws IOException
-        {
-            ISerializer<Composite> serializer = type.serializer();
-            serializer.serialize(cs.start, out);
-            serializer.serialize(cs.finish, out);
-        }
-
-        public ColumnSlice deserialize(DataInput in, int version) throws IOException
-        {
-            ISerializer<Composite> serializer = type.serializer();
-            Composite start = serializer.deserialize(in);
-            Composite finish = serializer.deserialize(in);
-            return new ColumnSlice(start, finish);
-        }
-
-        public long serializedSize(ColumnSlice cs, int version)
-        {
-            ISerializer<Composite> serializer = type.serializer();
-            return serializer.serializedSize(cs.start, TypeSizes.NATIVE) + serializer.serializedSize(cs.finish, TypeSizes.NATIVE);
-        }
-    }
-}

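The deleted deoverlapSlices above sorts slices by their start and folds overlapping neighbours into a single slice. Below is a minimal, standalone sketch of that merge step over plain int intervals; the Interval type and all names are hypothetical, and the sketch deliberately ignores the empty-Composite ("unbounded") cases the real method handles.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class DeoverlapDemo
    {
        // A toy stand-in for ColumnSlice: a closed interval [start, finish].
        record Interval(int start, int finish) {}

        // Same shape as deoverlapSlices: sort by start, then fold overlapping
        // neighbours into one interval.
        static List<Interval> deoverlap(Interval[] slices)
        {
            if (slices.length <= 1)
                return Arrays.asList(slices);

            Arrays.sort(slices, Comparator.comparingInt(Interval::start));

            List<Interval> result = new ArrayList<>();
            Interval last = slices[0];
            for (int i = 1; i < slices.length; i++)
            {
                Interval s = slices[i];
                if (s.start() <= last.finish())  // overlaps (or touches) 'last'
                    last = new Interval(last.start(), Math.max(last.finish(), s.finish()));
                else
                {
                    result.add(last);
                    last = s;
                }
            }
            result.add(last);
            return result;
        }

        public static void main(String[] args)
        {
            // [1,4], [3,6] and [8,9] collapse to [1,6], [8,9].
            System.out.println(deoverlap(new Interval[]{ new Interval(3, 6), new Interval(1, 4), new Interval(8, 9) }));
        }
    }
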
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
new file mode 100644
index 0000000..35db6f2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Handles the selection of a subpart of a column.
+ * <p>
+ * This only makes sense for complex columns. For those, this allows, for instance,
+ * selecting only a slice of a map.
+ */
+public abstract class ColumnSubselection
+{
+    public static final Serializer serializer = new Serializer();
+
+    private enum Kind { SLICE, ELEMENT }
+
+    protected final ColumnDefinition column;
+
+    protected ColumnSubselection(ColumnDefinition column)
+    {
+        this.column = column;
+    }
+
+    public static ColumnSubselection slice(ColumnDefinition column, CellPath from, CellPath to)
+    {
+        assert column.isComplex() && column.type instanceof CollectionType;
+        assert from.size() <= 1 && to.size() <= 1;
+        return new Slice(column, from, to);
+    }
+
+    public static ColumnSubselection element(ColumnDefinition column, CellPath elt)
+    {
+        assert column.isComplex() && column.type instanceof CollectionType;
+        assert elt.size() == 1;
+        return new Element(column, elt);
+    }
+
+    public ColumnDefinition column()
+    {
+        return column;
+    }
+
+    protected abstract Kind kind();
+
+    public abstract CellPath minIncludedPath();
+    public abstract CellPath maxIncludedPath();
+    public abstract boolean includes(CellPath path);
+
+    private static class Slice extends ColumnSubselection
+    {
+        private final CellPath from;
+        private final CellPath to;
+
+        private Slice(ColumnDefinition column, CellPath from, CellPath to)
+        {
+            super(column);
+            this.from = from;
+            this.to = to;
+        }
+
+        protected Kind kind()
+        {
+            return Kind.SLICE;
+        }
+
+        public CellPath minIncludedPath()
+        {
+            return from;
+        }
+
+        public CellPath maxIncludedPath()
+        {
+            return to;
+        }
+
+        public boolean includes(CellPath path)
+        {
+            Comparator<CellPath> cmp = column.cellPathComparator();
+            return cmp.compare(from, path) <= 0 && cmp.compare(path, to) <= 0;
+        }
+
+        @Override
+        public String toString()
+        {
+            // This asserts we're dealing with a collection, since that's the only thing it's used for so far.
+            AbstractType<?> type = ((CollectionType<?>)column().type).nameComparator();
+            return String.format("[%s:%s]", from == CellPath.BOTTOM ? "" : type.getString(from.get(0)), to == CellPath.TOP ? "" : type.getString(to.get(0)));
+        }
+    }
+
+    private static class Element extends ColumnSubselection
+    {
+        private final CellPath element;
+
+        private Element(ColumnDefinition column, CellPath elt)
+        {
+            super(column);
+            this.element = elt;
+        }
+
+        protected Kind kind()
+        {
+            return Kind.ELEMENT;
+        }
+
+        public CellPath minIncludedPath()
+        {
+            return element;
+        }
+
+        public CellPath maxIncludedPath()
+        {
+            return element;
+        }
+
+        public boolean includes(CellPath path)
+        {
+            Comparator<CellPath> cmp = column.cellPathComparator();
+            return cmp.compare(element, path) == 0;
+        }
+
+        @Override
+        public String toString()
+        {
+            // This asserts we're dealing with a collection, since that's the only thing it's used for so far.
+            AbstractType<?> type = ((CollectionType<?>)column().type).nameComparator();
+            return String.format("[%s]", type.getString(element.get(0)));
+        }
+    }
+
+    public static class Serializer
+    {
+        public void serialize(ColumnSubselection subSel, DataOutputPlus out, int version) throws IOException
+        {
+            ColumnDefinition column = subSel.column();
+            ByteBufferUtil.writeWithShortLength(column.name.bytes, out);
+            out.writeByte(subSel.kind().ordinal());
+            switch (subSel.kind())
+            {
+                case SLICE:
+                    Slice slice = (Slice)subSel;
+                    column.cellPathSerializer().serialize(slice.from, out);
+                    column.cellPathSerializer().serialize(slice.to, out);
+                    break;
+                case ELEMENT:
+                    Element eltSelection = (Element)subSel;
+                    column.cellPathSerializer().serialize(eltSelection.element, out);
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+        }
+
+        public ColumnSubselection deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        {
+            ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
+            ColumnDefinition column = metadata.getColumnDefinition(name);
+            if (column == null)
+            {
+                // If we don't find the definition, it could be that we have data for a dropped column, and we shouldn't
+                // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensures proper
+                // deserialization. The column will be ignored later on anyway.
+                column = metadata.getDroppedColumnDefinition(name);
+                if (column == null)
+                    throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+            }
+
+            Kind kind = Kind.values()[in.readUnsignedByte()];
+            switch (kind)
+            {
+                case SLICE:
+                    CellPath from = column.cellPathSerializer().deserialize(in);
+                    CellPath to = column.cellPathSerializer().deserialize(in);
+                    return new Slice(column, from, to);
+                case ELEMENT:
+                    CellPath elt = column.cellPathSerializer().deserialize(in);
+                    return new Element(column, elt);
+            }
+            throw new AssertionError();
+        }
+
+        public long serializedSize(ColumnSubselection subSel, int version, TypeSizes sizes)
+        {
+            long size = 0;
+
+            ColumnDefinition column = subSel.column();
+            size += sizes.sizeofWithShortLength(column.name.bytes);
+            size += 1; // kind
+            switch (subSel.kind())
+            {
+                case SLICE:
+                    Slice slice = (Slice)subSel;
+                    size += column.cellPathSerializer().serializedSize(slice.from, sizes);
+                    size += column.cellPathSerializer().serializedSize(slice.to, sizes);
+                    break;
+                case ELEMENT:
+                    Element element = (Element)subSel;
+                    size += column.cellPathSerializer().serializedSize(element.element, sizes);
+                    break;
+            }
+            return size;
+        }
+    }
+}

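To make the two Kind variants above concrete: a Slice includes any path between its two inclusive bounds, while an Element requires an exact match. Below is a minimal, standalone sketch of those inclusion rules, using plain ints as stand-ins for CellPath and the column's cellPathComparator(); all names here are hypothetical.

    import java.util.Comparator;

    public class SubselectionDemo
    {
        // Stand-in for comparing cell paths: here just Integer map keys. The real
        // code compares CellPath instances with the column's cellPathComparator().
        static final Comparator<Integer> CMP = Comparator.naturalOrder();

        // Mirrors Slice.includes: from <= path <= to, both bounds inclusive.
        static boolean sliceIncludes(int from, int to, int path)
        {
            return CMP.compare(from, path) <= 0 && CMP.compare(path, to) <= 0;
        }

        // Mirrors Element.includes: an exact match on the path.
        static boolean elementIncludes(int element, int path)
        {
            return CMP.compare(element, path) == 0;
        }

        public static void main(String[] args)
        {
            System.out.println(sliceIncludes(2, 5, 4));  // true: 4 is within [2, 5]
            System.out.println(sliceIncludes(2, 5, 6));  // false: 6 is past the inclusive upper bound
            System.out.println(elementIncludes(3, 3));   // true: exact element match
        }
    }
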
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
new file mode 100644
index 0000000..42bfa4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Object in charge of tracking whether we have fetched enough data for a given query.
+ *
+ * The reason this is not just a simple integer is that Thrift and CQL3 count
+ * things in different ways; this class abstracts those differences.
+ */
+public abstract class DataLimits
+{
+    public static final Serializer serializer = new Serializer();
+
+    public static final DataLimits NONE = new CQLLimits(Integer.MAX_VALUE)
+    {
+        @Override
+        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
+        {
+            return false;
+        }
+
+        @Override
+        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
+        {
+            return iter;
+        }
+
+        @Override
+        public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec)
+        {
+            return iter;
+        }
+    };
+
+    // We currently deal with distinct queries by querying full partitions but limiting the result at 1 row per
+    // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
+    public static final DataLimits DISTINCT_NONE = new CQLLimits(Integer.MAX_VALUE, 1, true);
+
+    private enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
+
+    public static DataLimits cqlLimits(int cqlRowLimit)
+    {
+        return new CQLLimits(cqlRowLimit);
+    }
+
+    public static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit)
+    {
+        return new CQLLimits(cqlRowLimit, perPartitionLimit);
+    }
+
+    public static DataLimits distinctLimits(int cqlRowLimit)
+    {
+        return CQLLimits.distinct(cqlRowLimit);
+    }
+
+    public static DataLimits thriftLimits(int partitionLimit, int cellPerPartitionLimit)
+    {
+        return new ThriftLimits(partitionLimit, cellPerPartitionLimit);
+    }
+
+    public static DataLimits superColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
+    {
+        return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
+    }
+
+    protected abstract Kind kind();
+
+    public abstract boolean isUnlimited();
+
+    public abstract DataLimits forPaging(int pageSize);
+    public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining);
+
+    public abstract DataLimits forShortReadRetry(int toFetch);
+
+    public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec);
+
+    /**
+     * Returns a new {@code Counter} for this limits.
+     *
+     * @param nowInSec the current time in seconds (to decide what is expired or not).
+     * @param assumeLiveData if true, the counter will assume that every row passed is live and
+     * thus won't check for liveness, otherwise it will. This should be {@code true} when used on a
+     * {@code RowIterator} (since it only returns live rows), false otherwise.
+     * @return a new {@code Counter} for this limits.
+     */
+    public abstract Counter newCounter(int nowInSec, boolean assumeLiveData);
+
+    /**
+     * The max number of results this limits enforces.
+     * <p>
+     * Note that the actual definition of "results" depends a bit: for CQL, it's always rows, but for
+     * thrift, it means cells. The {@link #countsCells} method can be used to distinguish between the two cases if
+     * needed.
+     *
+     * @return the maximum number of results this limits enforces.
+     */
+    public abstract int count();
+
+    public abstract int perPartitionCount();
+
+    public abstract boolean countsCells();
+
+    public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
+    {
+        return new CountingUnfilteredPartitionIterator(iter, newCounter(nowInSec, false));
+    }
+
+    public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec)
+    {
+        return new CountingUnfilteredRowIterator(iter, newCounter(nowInSec, false));
+    }
+
+    public PartitionIterator filter(PartitionIterator iter, int nowInSec)
+    {
+        return new CountingPartitionIterator(iter, this, nowInSec);
+    }
+
+    /**
+     * Estimate the number of results (the definition of "results" will be rows for CQL queries
+     * and partitions for thrift ones) that a full scan of the provided cfs would yield.
+     */
+    public abstract float estimateTotalResults(ColumnFamilyStore cfs);
+
+    public interface Counter
+    {
+        public void newPartition(DecoratedKey partitionKey, Row staticRow);
+        public void newRow(Row row);
+        public void endOfPartition();
+
+        /**
+         * The number of results counted.
+         * <p>
+         * Note that the definition of "results" should be the same as for {@link #count}.
+         *
+         * @return the number of results counted.
+         */
+        public int counted();
+
+        public int countedInCurrentPartition();
+
+        public boolean isDone();
+        public boolean isDoneForPartition();
+    }
+
+    /**
+     * Limits used by CQL; this counts rows.
+     */
+    private static class CQLLimits extends DataLimits
+    {
+        protected final int rowLimit;
+        protected final int perPartitionLimit;
+
+        // Whether the query is a distinct query or not. This is currently not used by the code but prior experience
+        // shows that keeping the information around is wise and might be useful in the future.
+        protected final boolean isDistinct;
+
+        private CQLLimits(int rowLimit)
+        {
+            this(rowLimit, Integer.MAX_VALUE);
+        }
+
+        private CQLLimits(int rowLimit, int perPartitionLimit)
+        {
+            this(rowLimit, perPartitionLimit, false);
+        }
+
+        private CQLLimits(int rowLimit, int perPartitionLimit, boolean isDistinct)
+        {
+            this.rowLimit = rowLimit;
+            this.perPartitionLimit = perPartitionLimit;
+            this.isDistinct = isDistinct;
+        }
+
+        private static CQLLimits distinct(int rowLimit)
+        {
+            return new CQLLimits(rowLimit, 1, true);
+        }
+
+        protected Kind kind()
+        {
+            return Kind.CQL_LIMIT;
+        }
+
+        public boolean isUnlimited()
+        {
+            return rowLimit == Integer.MAX_VALUE && perPartitionLimit == Integer.MAX_VALUE;
+        }
+
+        public DataLimits forPaging(int pageSize)
+        {
+            return new CQLLimits(pageSize, perPartitionLimit);
+        }
+
+        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
+        {
+            return new CQLPagingLimits(pageSize, perPartitionLimit, isDistinct, lastReturnedKey, lastReturnedKeyRemaining);
+        }
+
+        public DataLimits forShortReadRetry(int toFetch)
+        {
+            // When we do a short read retry, we're only ever querying the single partition on which we have a short read. So
+            // we use toFetch as the row limit and use no perPartitionLimit (it would be equivalent in practice to use toFetch
+            // for both arguments, or just for perPartitionLimit with no limit on rowLimit).
+            return new CQLLimits(toFetch, Integer.MAX_VALUE, isDistinct);
+        }
+
+        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
+        {
+            // We want the number of rows that are currently live. Getting that precise number forces
+            // us to iterate the cached partition in general, but we can avoid that if:
+            //   - The number of rows with at least one non-expiring cell is greater than what we ask for,
+            //     in which case we know we have enough live rows.
+            //   - The number of rows is less than requested, in which case we know we won't have enough.
+            if (cached.rowsWithNonExpiringCells() >= rowLimit)
+                return true;
+
+            if (cached.rowCount() < rowLimit)
+                return false;
+
+            // Otherwise, we need to re-count
+            try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
+                 CountingUnfilteredRowIterator iter = new CountingUnfilteredRowIterator(cacheIter, newCounter(nowInSec, false)))
+            {
+                // Consume the iterator until we've counted enough
+                while (iter.hasNext() && !iter.counter().isDone())
+                    iter.next();
+                return iter.counter().isDone();
+            }
+        }
+
+        public Counter newCounter(int nowInSec, boolean assumeLiveData)
+        {
+            return new CQLCounter(nowInSec, assumeLiveData);
+        }
+
+        public int count()
+        {
+            return rowLimit;
+        }
+
+        public int perPartitionCount()
+        {
+            return perPartitionLimit;
+        }
+
+        public boolean countsCells()
+        {
+            return false;
+        }
+
+        public float estimateTotalResults(ColumnFamilyStore cfs)
+        {
+            // TODO: we should start storing stats on the number of rows (instead of the number of cells, which
+            // is what getMeanColumns returns)
+            float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount();
+            return rowsPerPartition * (cfs.estimateKeys());
+        }
+
+        protected class CQLCounter implements Counter
+        {
+            protected final int nowInSec;
+            protected final boolean assumeLiveData;
+
+            protected int rowCounted;
+            protected int rowInCurrentPartition;
+
+            protected boolean hasLiveStaticRow;
+
+            public CQLCounter(int nowInSec, boolean assumeLiveData)
+            {
+                this.nowInSec = nowInSec;
+                this.assumeLiveData = assumeLiveData;
+            }
+
+            public void newPartition(DecoratedKey partitionKey, Row staticRow)
+            {
+                rowInCurrentPartition = 0;
+                if (!staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec)))
+                    hasLiveStaticRow = true;
+            }
+
+            public void endOfPartition()
+            {
+                // Normally, we don't count static rows, as from a CQL point of view they will be merged with the other
+                // rows in the partition. However, if we only have the static row, it will be returned as one row,
+                // so count it.
+                if (hasLiveStaticRow && rowInCurrentPartition == 0)
+                    ++rowCounted;
+            }
+
+            public int counted()
+            {
+                return rowCounted;
+            }
+
+            public int countedInCurrentPartition()
+            {
+                return rowInCurrentPartition;
+            }
+
+            public boolean isDone()
+            {
+                return rowCounted >= rowLimit;
+            }
+
+            public boolean isDoneForPartition()
+            {
+                return isDone() || rowInCurrentPartition >= perPartitionLimit;
+            }
+
+            public void newRow(Row row)
+            {
+                if (assumeLiveData || row.hasLiveData(nowInSec))
+                {
+                    ++rowCounted;
+                    ++rowInCurrentPartition;
+                }
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+
+            if (rowLimit != Integer.MAX_VALUE)
+            {
+                sb.append("LIMIT ").append(rowLimit);
+                if (perPartitionLimit != Integer.MAX_VALUE)
+                    sb.append(" ");
+            }
+
+            if (perPartitionLimit != Integer.MAX_VALUE)
+                sb.append("PER PARTITION LIMIT ").append(perPartitionLimit);
+
+            return sb.toString();
+        }
+    }
+
+    private static class CQLPagingLimits extends CQLLimits
+    {
+        private final ByteBuffer lastReturnedKey;
+        private final int lastReturnedKeyRemaining;
+
+        public CQLPagingLimits(int rowLimit, int perPartitionLimit, boolean isDistinct, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
+        {
+            super(rowLimit, perPartitionLimit, isDistinct);
+            this.lastReturnedKey = lastReturnedKey;
+            this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
+        }
+
+        @Override
+        protected Kind kind()
+        {
+            return Kind.CQL_PAGING_LIMIT;
+        }
+
+        @Override
+        public DataLimits forPaging(int pageSize)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Counter newCounter(int nowInSec, boolean assumeLiveData)
+        {
+            return new PagingAwareCounter(nowInSec, assumeLiveData);
+        }
+
+        private class PagingAwareCounter extends CQLCounter
+        {
+            private PagingAwareCounter(int nowInSec, boolean assumeLiveData)
+            {
+                super(nowInSec, assumeLiveData);
+            }
+
+            @Override
+            public void newPartition(DecoratedKey partitionKey, Row staticRow)
+            {
+                if (partitionKey.getKey().equals(lastReturnedKey))
+                {
+                    rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
+                    // lastReturnedKey is the last key for which we've returned rows in the first page.
+                    // So, since we know we have returned rows, we know we have already accounted for the static row
+                    // if there is one, so force hasLiveStaticRow to false to make sure we don't count it
+                    // once more.
+                    hasLiveStaticRow = false;
+                }
+                else
+                {
+                    super.newPartition(partitionKey, staticRow);
+                }
+            }
+        }
+    }
+
+    /**
+     * Limits used by thrift; these count partitions and cells.
+     */
+    private static class ThriftLimits extends DataLimits
+    {
+        protected final int partitionLimit;
+        protected final int cellPerPartitionLimit;
+
+        private ThriftLimits(int partitionLimit, int cellPerPartitionLimit)
+        {
+            this.partitionLimit = partitionLimit;
+            this.cellPerPartitionLimit = cellPerPartitionLimit;
+        }
+
+        protected Kind kind()
+        {
+            return Kind.THRIFT_LIMIT;
+        }
+
+        public boolean isUnlimited()
+        {
+            return partitionLimit == Integer.MAX_VALUE && cellPerPartitionLimit == Integer.MAX_VALUE;
+        }
+
+        public DataLimits forPaging(int pageSize)
+        {
+            // We don't support paging on thrift in general but do use paging under the hood for get_count. For
+            // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
+            // partition). We do check that the partition limit is 1 however to make sure this is not misused
+            // (as this wouldn't work properly for range queries).
+            assert partitionLimit == 1;
+            return new ThriftLimits(partitionLimit, pageSize);
+        }
+
+        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public DataLimits forShortReadRetry(int toFetch)
+        {
+            // Short read retries are always done for a single partition at a time, so it's ok to ignore the
+            // partition limit for those
+            return new ThriftLimits(1, toFetch);
+        }
+
+        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
+        {
+            // We want the number of cells that are currently live. Getting that precise number forces
+            // us to iterate the cached partition in general, but we can avoid that if:
+            //   - The number of non-expiring live cells is greater than the number of cells asked for (we then
+            //     know we have enough live cells).
+            //   - The number of cells cached is less than requested, in which case we know we won't have enough.
+            if (cached.nonExpiringLiveCells() >= cellPerPartitionLimit)
+                return true;
+
+            if (cached.nonTombstoneCellCount() < cellPerPartitionLimit)
+                return false;
+
+            // Otherwise, we need to re-count
+            try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
+                 CountingUnfilteredRowIterator iter = new CountingUnfilteredRowIterator(cacheIter, newCounter(nowInSec, false)))
+            {
+                // Consume the iterator until we've counted enough
+                while (iter.hasNext() && !iter.counter().isDone())
+                    iter.next();
+                return iter.counter().isDone();
+            }
+        }
+
+        public Counter newCounter(int nowInSec, boolean assumeLiveData)
+        {
+            return new ThriftCounter(nowInSec, assumeLiveData);
+        }
+
+        public int count()
+        {
+            return partitionLimit * cellPerPartitionLimit;
+        }
+
+        public int perPartitionCount()
+        {
+            return cellPerPartitionLimit;
+        }
+
+        public boolean countsCells()
+        {
+            return true;
+        }
+
+        public float estimateTotalResults(ColumnFamilyStore cfs)
+        {
+            // Remember that getMeanColumns returns a number of cells: we should clean up the nomenclature.
+            float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount();
+            return cellsPerPartition * cfs.estimateKeys();
+        }
+
+        protected class ThriftCounter implements Counter
+        {
+            protected final int nowInSec;
+            protected final boolean assumeLiveData;
+
+            protected int partitionsCounted;
+            protected int cellsCounted;
+            protected int cellsInCurrentPartition;
+
+            public ThriftCounter(int nowInSec, boolean assumeLiveData)
+            {
+                this.nowInSec = nowInSec;
+                this.assumeLiveData = assumeLiveData;
+            }
+
+            public void newPartition(DecoratedKey partitionKey, Row staticRow)
+            {
+                cellsInCurrentPartition = 0;
+                if (!staticRow.isEmpty())
+                    newRow(staticRow);
+            }
+
+            public void endOfPartition()
+            {
+                ++partitionsCounted;
+            }
+
+            public int counted()
+            {
+                return cellsCounted;
+            }
+
+            public int countedInCurrentPartition()
+            {
+                return cellsInCurrentPartition;
+            }
+
+            public boolean isDone()
+            {
+                return partitionsCounted >= partitionLimit;
+            }
+
+            public boolean isDoneForPartition()
+            {
+                return isDone() || cellsInCurrentPartition >= cellPerPartitionLimit;
+            }
+
+            public void newRow(Row row)
+            {
+                for (Cell cell : row)
+                {
+                    if (assumeLiveData || cell.isLive(nowInSec))
+                    {
+                        ++cellsCounted;
+                        ++cellsInCurrentPartition;
+                    }
+                }
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            // This is not valid CQL, but that's ok since it's not used for CQL queries.
+            return String.format("THRIFT LIMIT (partitions=%d, cells_per_partition=%d)", partitionLimit, cellPerPartitionLimit);
+        }
+    }
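
To make the counting rules concrete, here is a self-contained sketch of a ThriftCounter-style tally. Partitions are modelled as lists of booleans, each standing in for Cell.isLive(nowInSec); the class and its shape are invented for illustration, and only the limit checks mirror the patch:

    import java.util.Arrays;
    import java.util.List;

    public class ThriftStyleTally
    {
        private final int partitionLimit;
        private final int cellsPerPartitionLimit;
        private int partitionsCounted, cellsCounted, cellsInCurrentPartition;

        ThriftStyleTally(int partitionLimit, int cellsPerPartitionLimit)
        {
            this.partitionLimit = partitionLimit;
            this.cellsPerPartitionLimit = cellsPerPartitionLimit;
        }

        // Each inner list is one partition; each boolean is one cell's liveness.
        void count(List<List<Boolean>> partitions)
        {
            for (List<Boolean> partition : partitions)
            {
                cellsInCurrentPartition = 0;                   // newPartition()
                for (boolean live : partition)
                {
                    if (isDoneForPartition())
                        break;                                 // stop fetching this partition
                    if (live)
                    {
                        ++cellsCounted;
                        ++cellsInCurrentPartition;
                    }
                }
                ++partitionsCounted;                           // endOfPartition()
                if (isDone())
                    return;
            }
        }

        boolean isDone()             { return partitionsCounted >= partitionLimit; }
        boolean isDoneForPartition() { return isDone() || cellsInCurrentPartition >= cellsPerPartitionLimit; }

        public static void main(String[] args)
        {
            ThriftStyleTally tally = new ThriftStyleTally(2, 3);
            tally.count(Arrays.asList(
                Arrays.asList(true, true, false, true, true),  // 3 live cells counted, then capped
                Arrays.asList(true, true)));                   // 2 more; partition limit then reached
            System.out.println(tally.cellsCounted);            // 5
        }
    }
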
+
+    /**
+     * Limits used for thrift get_count when we only want to count super columns.
+     */
+    private static class SuperColumnCountingLimits extends ThriftLimits
+    {
+        private SuperColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
+        {
+            super(partitionLimit, cellPerPartitionLimit);
+        }
+
+        protected Kind kind()
+        {
+            return Kind.SUPER_COLUMN_COUNTING_LIMIT;
+        }
+
+        public DataLimits forPaging(int pageSize)
+        {
+            // We don't support paging on thrift in general but do use paging under the hood for get_count.
+            // For that case, we only care about limiting cellPerPartitionLimit (since it's paging over a
+            // single partition). We do check that the partition limit is 1, however, to make sure this is
+            // not misused (as it wouldn't work properly for range queries).
+            assert partitionLimit == 1;
+            return new SuperColumnCountingLimits(partitionLimit, pageSize);
+        }
+
+        public DataLimits forShortReadRetry(int toFetch)
+        {
+            // Short read retries are always done for a single partition at a time, so it's ok to ignore
+            // the partition limit for those.
+            return new SuperColumnCountingLimits(1, toFetch);
+        }
+
+        public Counter newCounter(int nowInSec, boolean assumeLiveData)
+        {
+            return new SuperColumnCountingCounter(nowInSec, assumeLiveData);
+        }
+
+        protected class SuperColumnCountingCounter extends ThriftCounter
+        {
+            public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData)
+            {
+                super(nowInSec, assumeLiveData);
+            }
+
+            public void newRow(Row row)
+            {
+                // In the internal format, a row == a super column, so that's what we want to count.
+                if (assumeLiveData || row.hasLiveData(nowInSec))
+                {
+                    ++cellsCounted;
+                    ++cellsInCurrentPartition;
+                }
+            }
+        }
+    }
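
The only behavioural difference from the base counter is the counting unit: one increment per live row (a super column in the internal format) rather than per live cell. A tiny sketch, with one boolean per row standing in for Row.hasLiveData(nowInSec); names are illustrative only:

    public class SuperColumnTally
    {
        static int countLiveRows(boolean[] rowIsLive)
        {
            int counted = 0;
            for (boolean live : rowIsLive)
                if (live)
                    ++counted;      // one unit per live row, not per cell
            return counted;
        }

        public static void main(String[] args)
        {
            System.out.println(countLiveRows(new boolean[]{ true, false, true })); // 2
        }
    }
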
+
+    public static class Serializer
+    {
+        public void serialize(DataLimits limits, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeByte(limits.kind().ordinal());
+            switch (limits.kind())
+            {
+                case CQL_LIMIT:
+                case CQL_PAGING_LIMIT:
+                    CQLLimits cqlLimits = (CQLLimits)limits;
+                    out.writeInt(cqlLimits.rowLimit);
+                    out.writeInt(cqlLimits.perPartitionLimit);
+                    out.writeBoolean(cqlLimits.isDistinct);
+                    if (limits.kind() == Kind.CQL_PAGING_LIMIT)
+                    {
+                        CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
+                        ByteBufferUtil.writeWithShortLength(pagingLimits.lastReturnedKey, out);
+                        out.writeInt(pagingLimits.lastReturnedKeyRemaining);
+                    }
+                    break;
+                case THRIFT_LIMIT:
+                case SUPER_COLUMN_COUNTING_LIMIT:
+                    ThriftLimits thriftLimits = (ThriftLimits)limits;
+                    out.writeInt(thriftLimits.partitionLimit);
+                    out.writeInt(thriftLimits.cellPerPartitionLimit);
+                    break;
+            }
+        }
+
+        public DataLimits deserialize(DataInput in, int version) throws IOException
+        {
+            Kind kind = Kind.values()[in.readUnsignedByte()];
+            switch (kind)
+            {
+                case CQL_LIMIT:
+                case CQL_PAGING_LIMIT:
+                    int rowLimit = in.readInt();
+                    int perPartitionLimit = in.readInt();
+                    boolean isDistinct = in.readBoolean();
+                    if (kind == Kind.CQL_LIMIT)
+                        return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
+
+                    ByteBuffer lastKey = ByteBufferUtil.readWithShortLength(in);
+                    int lastRemaining = in.readInt();
+                    return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining);
+                case THRIFT_LIMIT:
+                case SUPER_COLUMN_COUNTING_LIMIT:
+                    int partitionLimit = in.readInt();
+                    int cellPerPartitionLimit = in.readInt();
+                    return kind == Kind.THRIFT_LIMIT
+                         ? new ThriftLimits(partitionLimit, cellPerPartitionLimit)
+                         : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
+            }
+            throw new AssertionError();
+        }
+
+        public long serializedSize(DataLimits limits, int version)
+        {
+            TypeSizes sizes = TypeSizes.NATIVE;
+            long size = sizes.sizeof((byte)limits.kind().ordinal());
+            switch (limits.kind())
+            {
+                case CQL_LIMIT:
+                case CQL_PAGING_LIMIT:
+                    CQLLimits cqlLimits = (CQLLimits)limits;
+                    size += sizes.sizeof(cqlLimits.rowLimit);
+                    size += sizes.sizeof(cqlLimits.perPartitionLimit);
+                    size += sizes.sizeof(cqlLimits.isDistinct);
+                    if (limits.kind() == Kind.CQL_PAGING_LIMIT)
+                    {
+                        CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
+                        size += ByteBufferUtil.serializedSizeWithShortLength(pagingLimits.lastReturnedKey, sizes);
+                        size += sizes.sizeof(pagingLimits.lastReturnedKeyRemaining);
+                    }
+                    break;
+                case THRIFT_LIMIT:
+                case SUPER_COLUMN_COUNTING_LIMIT:
+                    ThriftLimits thriftLimits = (ThriftLimits)limits;
+                    size += sizes.sizeof(thriftLimits.partitionLimit);
+                    size += sizes.sizeof(thriftLimits.cellPerPartitionLimit);
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+            return size;
+        }
+    }
+}
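
The serializer uses a simple kind-tagged framing: a one-byte Kind ordinal followed by that kind's fixed fields. A hedged sketch of the THRIFT_LIMIT wire layout, written against plain java.io rather than Cassandra's DataOutputPlus (the class and method names are invented for illustration):

    import java.io.*;
    import java.util.Arrays;

    public class LimitsFraming
    {
        enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }

        static byte[] serializeThriftLimits(int partitionLimit, int cellPerPartitionLimit) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(Kind.THRIFT_LIMIT.ordinal()); // 1-byte kind tag
            out.writeInt(partitionLimit);               // 4 bytes
            out.writeInt(cellPerPartitionLimit);        // 4 bytes
            return bytes.toByteArray();                 // 9 bytes total
        }

        static int[] deserializeThriftLimits(byte[] wire) throws IOException
        {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
            Kind kind = Kind.values()[in.readUnsignedByte()];
            if (kind != Kind.THRIFT_LIMIT)
                throw new IOException("unexpected kind: " + kind);
            return new int[]{ in.readInt(), in.readInt() };
        }

        public static void main(String[] args) throws IOException
        {
            byte[] wire = serializeThriftLimits(1, 100);
            System.out.println(Arrays.toString(deserializeThriftLimits(wire))); // [1, 100]
        }
    }

Note that the same framing keeps ThriftLimits and SuperColumnCountingLimits wire-compatible: they differ only in the kind byte.
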

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
deleted file mode 100644
index 50ab57d..0000000
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import com.google.common.base.Objects;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.IndexExpression;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.*;
-
-/**
- * Extends a column filter (IFilter) to include a number of IndexExpressions.
- */
-public abstract class ExtendedFilter
-{
-    private static final Logger logger = LoggerFactory.getLogger(ExtendedFilter.class);
-
-    public final ColumnFamilyStore cfs;
-    public final long timestamp;
-    public final DataRange dataRange;
-    private final int maxResults;
-    private final boolean countCQL3Rows;
-    private volatile int currentLimit;
-
-    public static ExtendedFilter create(ColumnFamilyStore cfs,
-                                        DataRange dataRange,
-                                        List<IndexExpression> clause,
-                                        int maxResults,
-                                        boolean countCQL3Rows,
-                                        long timestamp)
-    {
-        if (clause == null || clause.isEmpty())
-            return new EmptyClauseFilter(cfs, dataRange, maxResults, countCQL3Rows, timestamp);
-
-        return new WithClauses(cfs, dataRange, clause, maxResults, countCQL3Rows, timestamp);
-    }
-
-    protected ExtendedFilter(ColumnFamilyStore cfs, DataRange dataRange, int maxResults, boolean countCQL3Rows, long timestamp)
-    {
-        assert cfs != null;
-        assert dataRange != null;
-        this.cfs = cfs;
-        this.dataRange = dataRange;
-        this.maxResults = maxResults;
-        this.timestamp = timestamp;
-        this.countCQL3Rows = countCQL3Rows;
-        this.currentLimit = maxResults;
-        if (countCQL3Rows)
-            dataRange.updateColumnsLimit(maxResults);
-    }
-
-    public int maxRows()
-    {
-        return countCQL3Rows ? Integer.MAX_VALUE : maxResults;
-    }
-
-    public int maxColumns()
-    {
-        return countCQL3Rows ? maxResults : Integer.MAX_VALUE;
-    }
-
-    public int currentLimit()
-    {
-        return currentLimit;
-    }
-
-    public IDiskAtomFilter columnFilter(ByteBuffer key)
-    {
-        return dataRange.columnFilter(key);
-    }
-
-    public int lastCounted(ColumnFamily data)
-    {
-        return dataRange.getLiveCount(data, timestamp);
-    }
-
-    public void updateFilter(int currentColumnsCount)
-    {
-        if (!countCQL3Rows)
-            return;
-
-        currentLimit = maxResults - currentColumnsCount;
-        // We propagate that limit to the underlying filter so each internal query doesn't
-        // fetch more than we need it to.
-        dataRange.updateColumnsLimit(currentLimit);
-    }
-
-    public abstract List<IndexExpression> getClause();
-
-    /**
-     * Returns a filter to query the columns from the clause that the initial slice filter may not have caught.
-     * @param data the data retrieved by the initial filter
-     * @return a filter, or null if there can't be any columns we missed with our initial filter
-     * (typically if it was a names query, or a slice of the entire row)
-     */
-    public abstract IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data);
-
-    /**
-     * @return data pruned down to the columns originally asked for
-     */
-    public abstract ColumnFamily prune(DecoratedKey key, ColumnFamily data);
-
-    /** Returns true if tombstoned partitions should not be included in results or count towards the limit, false otherwise. */
-    public boolean ignoreTombstonedPartitions()
-    {
-        return dataRange.ignoredTombstonedPartitions();
-    }
-
-    /**
-     * @return true if the provided data satisfies all the expressions from
-     * the clause of this filter.
-     */
-    public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, Composite prefix, ByteBuffer collectionElement);
-
-    public static boolean satisfies(int comparison, Operator op)
-    {
-        switch (op)
-        {
-            case EQ:
-                return comparison == 0;
-            case GTE:
-                return comparison >= 0;
-            case GT:
-                return comparison > 0;
-            case LTE:
-                return comparison <= 0;
-            case LT:
-                return comparison < 0;
-            default:
-                throw new IllegalStateException();
-        }
-    }
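
For context, satisfies simply maps a comparator result onto a relational operator, so callers can write satisfies(validator.compare(dataValue, expression.value), expression.operator). A minimal usage sketch, with Integer.compare standing in for the validator and a local Operator enum standing in for org.apache.cassandra.cql3.Operator:

    public class SatisfiesDemo
    {
        enum Operator { EQ, GTE, GT, LTE, LT }

        static boolean satisfies(int comparison, Operator op)
        {
            switch (op)
            {
                case EQ:  return comparison == 0;
                case GTE: return comparison >= 0;
                case GT:  return comparison > 0;
                case LTE: return comparison <= 0;
                case LT:  return comparison < 0;
                default:  throw new IllegalStateException();
            }
        }

        public static void main(String[] args)
        {
            System.out.println(satisfies(Integer.compare(5, 3), Operator.GT));  // true
            System.out.println(satisfies(Integer.compare(5, 3), Operator.LTE)); // false
        }
    }
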
-
-    @Override
-    public String toString()
-    {
-        return Objects.toStringHelper(this)
-                      .add("dataRange", dataRange)
-                      .add("maxResults", maxResults)
-                      .add("currentLimit", currentLimit)
-                      .add("timestamp", timestamp)
-                      .add("countCQL3Rows", countCQL3Rows)
-                      .toString();
-    }
-
-    public static class WithClauses extends ExtendedFilter
-    {
-        private final List<IndexExpression> clause;
-        private final IDiskAtomFilter optimizedFilter;
-
-        public WithClauses(ColumnFamilyStore cfs,
-                           DataRange range,
-                           List<IndexExpression> clause,
-                           int maxResults,
-                           boolean countCQL3Rows,
-                           long timestamp)
-        {
-            super(cfs, range, maxResults, countCQL3Rows, timestamp);
-            assert clause != null;
-            this.clause = clause;
-            this.optimizedFilter = computeOptimizedFilter();
-        }
-
-        /*
-         * Potentially optimize the column filter if we have a chance to make it
-         * catch all clauses right away.
-         */
-        private IDiskAtomFilter computeOptimizedFilter()
-        {
-            /*
-             * We shouldn't do the "optimization" for composites, as the index names are not valid
-             * column names (which the rest of the method assumes). Said optimization is not useful
-             * for composites anyway. We also don't want to do it for paging ranges, as the actual
-             * filter depends on the row key (it would probably be possible to make it work, but we
-             * won't really use it, so we don't bother).
-             */
-            if (cfs.getComparator().isCompound() || dataRange instanceof DataRange.Paging)
-                return null;
-
-            IDiskAtomFilter filter = dataRange.columnFilter(null); // ok since not a paging range
-            if (filter instanceof SliceQueryFilter)
-            {
-                // if we have a high chance of getting all the columns in a single index slice (and it's not too costly), do that.
-                // otherwise, the extraFilter (lazily created) will fetch by name the columns referenced by the additional expressions.
-                if (cfs.metric.maxRowSize.getValue() < DatabaseDescriptor.getColumnIndexSize())
-                {
-                    logger.trace("Expanding slice filter to entire row to cover additional expressions");
-                    return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, ((SliceQueryFilter)filter).reversed, Integer.MAX_VALUE);
-                }
-                }
-            }
-            else
-            {
-                logger.trace("adding columns to original Filter to cover additional expressions");
-                assert filter instanceof NamesQueryFilter;
-                if (!clause.isEmpty())
-                {
-                    SortedSet<CellName> columns = new TreeSet<CellName>(cfs.getComparator());
-                    for (IndexExpression expr : clause)
-                        columns.add(cfs.getComparator().cellFromByteBuffer(expr.column));
-                    columns.addAll(((NamesQueryFilter) filter).columns);
-                    return ((NamesQueryFilter) filter).withUpdatedColumns(columns);
-                }
-                }
-            }
-            return null;
-        }
-
-        @Override
-        public IDiskAtomFilter columnFilter(ByteBuffer key)
-        {
-            return optimizedFilter == null ? dataRange.columnFilter(key) : optimizedFilter;
-        }
-
-        public List<IndexExpression> getClause()
-        {
-            return clause;
-        }
-
-        /*
-         * We may need an extra query only if the original query wasn't selecting the row entirely.
-         * Furthermore, we only need the extra query if we haven't yet got all the expressions from the clause.
-         */
-        private boolean needsExtraQuery(ByteBuffer rowKey, ColumnFamily data)
-        {
-            IDiskAtomFilter filter = columnFilter(rowKey);
-            if (filter instanceof SliceQueryFilter && DataRange.isFullRowSlice((SliceQueryFilter)filter))
-                return false;
-
-            for (IndexExpression expr : clause)
-            {
-                if (data.getColumn(data.getComparator().cellFromByteBuffer(expr.column)) == null)
-                {
-                    logger.debug("adding extraFilter to cover additional expressions");
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        public IDiskAtomFilter getExtraFilter(DecoratedKey rowKey, ColumnFamily data)
-        {
-            /*
-             * This method assumes the IndexExpression names are valid column names, which is not the
-             * case with composites. This is ok for now, however, since:
-             * 1) CompositeSearcher doesn't use it.
-             * 2) We don't yet allow non-indexed range slices with filters in CQL3 (i.e. this will
-             * never be called by CFS.filter() for composites).
-             */
-            assert !(cfs.getComparator().isCompound()) : "Sequential scan with filters is not supported (if you just created an index, you "
-                                                         + "need to wait for the creation to be propagated to all nodes before querying it)";
-
-            if (!needsExtraQuery(rowKey.getKey(), data))
-                return null;
-
-            // Note: for counters we must be careful not to add a column that was already there (to
-            // avoid overcounting). That is why we do the dance of avoiding querying any column we
-            // already have (it's also more efficient anyway).
-            SortedSet<CellName> columns = new TreeSet<CellName>(cfs.getComparator());
-            for (IndexExpression expr : clause)
-            {
-                CellName name = data.getComparator().cellFromByteBuffer(expr.column);
-                if (data.getColumn(name) == null)
-                    columns.add(name);
-            }
-            assert !columns.isEmpty();
-            return new NamesQueryFilter(columns);
-        }
-
-        public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
-        {
-            if (optimizedFilter == null)
-                return data;
-
-            ColumnFamily pruned = data.cloneMeShallow();
-            IDiskAtomFilter filter = dataRange.columnFilter(rowKey.getKey());
-            Iterator<Cell> iter = filter.getColumnIterator(data);
-            try
-            {
-                filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), rowKey, cfs.gcBefore(timestamp), timestamp);
-            }
-            catch (TombstoneOverwhelmingException e)
-            {
-                e.setKey(rowKey);
-                throw e;
-            }
-            return pruned;
-        }
-
-        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, Composite prefix, ByteBuffer collectionElement)
-        {
-            for (IndexExpression expression : clause)
-            {
-                ColumnDefinition def = data.metadata().getColumnDefinition(expression.column);
-                ByteBuffer dataValue = null;
-                AbstractType<?> validator = null;
-                if (def == null)
-                {
-                    // This can't happen with CQL3, as this should be rejected upfront. For thrift, however,
-                    // cell names are not predefined. But that means the cell name corresponds to an internal one.
-                    Cell cell = data.getColumn(data.getComparator().cellFromByteBuffer(expression.column));
-                    if (cell != null)
-                    {
-                        dataValue = cell.value();
-                        validator = data.metadata().getDefaultValidator();
-                    }
-                }
-                else
-                {
-                    if (def.type.isCollection() && def.type.isMultiCell())
-                    {
-                        if (!collectionSatisfies(def, data, prefix, expression))
-                            return false;
-                        continue;
-                    }
-
-                    dataValue = extractDataValue(def, rowKey.getKey(), data, prefix);
-                    validator = def.type;
-                }
-
-                if (dataValue == null)
-                    return false;
-
-                if (expression.operator == Operator.CONTAINS)
-                {
-                    assert def != null && def.type.isCollection() && !def.type.isMultiCell();
-                    CollectionType type = (CollectionType)def.type;
-                    switch (type.kind)
-                    {
-                        case LIST:
-                            ListType<?> listType = (ListType)def.type;
-                            if (!listType.getSerializer().deserialize(dataValue).contains(listType.getElementsType().getSerializer().deserialize(expression.value)))
-                                return false;
-                            break;
-                        case SET:
-                            SetType<?> setType = (SetType)def.type;
-                            if (!setType.getSerializer().deserialize(dataValue).contains(setType.getElementsType().getSerializer().deserialize(expression.value)))
-                                return false;
-                            break;
-                        case MAP:
-                            MapType<?,?> mapType = (MapType)def.type;
-                            if (!mapType.getSerializer().deserialize(dataValue).containsValue(mapType.getValuesType().getSerializer().deserialize(expression.value)))
-                                return false;
-                            break;
-                    }
-                }
-                else if (expression.operator == Operator.CONTAINS_KEY)
-                {
-                    assert def != null && def.type.isCollection() && !def.type.isMultiCell() && def.type instanceof MapType;
-                    MapType<?,?> mapType = (MapType)def.type;
-                    if (mapType.getSerializer().getSerializedValue(dataValue, expression.value, mapType.getKeysType()) == null)
-                        return false;
-                }
-                else
-                {
-                    int v = validator.compare(dataValue, expression.value);
-                    if (!satisfies(v, expression.operator))
-                        return false;
-                }
-            }
-            return true;
-        }
-
-        private static boolean collectionSatisfies(ColumnDefinition def, ColumnFamily data, Composite prefix, IndexExpression expr)
-        {
-            assert def.type.isCollection() && def.type.isMultiCell();
-            CollectionType type = (CollectionType)def.type;
-
-            if (expr.isContains())
-            {
-                // get a slice of the collection cells
-                Iterator<Cell> iter = data.iterator(new ColumnSlice[]{ data.getComparator().create(prefix, def).slice() });
-                while (iter.hasNext())
-                {
-                    Cell cell = iter.next();
-                    if (type.kind == CollectionType.Kind.SET)
-                    {
-                        if (type.nameComparator().compare(cell.name().collectionElement(), expr.value) == 0)
-                            return true;
-                    }
-                    else
-                    {
-                        if (type.valueComparator().compare(cell.value(), expr.value) == 0)
-                            return true;
-                    }
-                }
-
-                return false;
-            }
-
-            assert type.kind == CollectionType.Kind.MAP;
-            if (expr.isContainsKey())
-                return data.getColumn(data.getComparator().create(prefix, def, expr.value)) != null;
-
-            Iterator<Cell> iter = data.iterator(new ColumnSlice[]{ data.getComparator().create(prefix, def).slice() });
-            ByteBuffer key = CompositeType.extractComponent(expr.value, 0);
-            ByteBuffer value = CompositeType.extractComponent(expr.value, 1);
-            while (iter.hasNext())
-            {
-                Cell next = iter.next();
-                if (type.nameComparator().compare(next.name().collectionElement(), key) == 0 &&
-                    type.valueComparator().compare(next.value(), value) == 0)
-                    return true;
-            }
-            return false;
-        }
-
-        private ByteBuffer extractDataValue(ColumnDefinition def, ByteBuffer rowKey, ColumnFamily data, Composite prefix)
-        {
-            switch (def.kind)
-            {
-                case PARTITION_KEY:
-                    return def.isOnAllComponents()
-                         ? rowKey
-                         : ((CompositeType)data.metadata().getKeyValidator()).split(rowKey)[def.position()];
-                case CLUSTERING_COLUMN:
-                    return prefix.get(def.position());
-                case REGULAR:
-                    CellName cname = prefix == null
-                                   ? data.getComparator().cellFromByteBuffer(def.name.bytes)
-                                   : data.getComparator().create(prefix, def);
-
-                    Cell cell = data.getColumn(cname);
-                    return cell == null ? null : cell.value();
-                case COMPACT_VALUE:
-                    assert data.getColumnCount() == 1;
-                    return data.getSortedColumns().iterator().next().value();
-            }
-            throw new AssertionError();
-        }
-
-        @Override
-        public String toString()
-        {
-            return Objects.toStringHelper(this)
-                          .add("dataRange", dataRange)
-                          .add("timestamp", timestamp)
-                          .add("clause", clause)
-                          .toString();
-        }
-    }
-
-    private static class EmptyClauseFilter extends ExtendedFilter
-    {
-        public EmptyClauseFilter(ColumnFamilyStore cfs, DataRange range, int maxResults, boolean countCQL3Rows, long timestamp)
-        {
-            super(cfs, range, maxResults, countCQL3Rows, timestamp);
-        }
-
-        public List<IndexExpression> getClause()
-        {
-            return Collections.<IndexExpression>emptyList();
-        }
-
-        public IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data)
-        {
-            return null;
-        }
-
-        public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
-        {
-            return data;
-        }
-
-        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, Composite prefix, ByteBuffer collectionElement)
-        {
-            return true;
-        }
-    }
-}
