http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/DataLimits.java
index 94f43dc,0000000..48ec06a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@@ -1,814 -1,0 +1,827 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.filter;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.transform.BasePartitions;
 +import org.apache.cassandra.db.transform.BaseRows;
 +import org.apache.cassandra.db.transform.StoppingTransformation;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +/**
 + * Object in charge of tracking whether we have fetched enough data for a given query.
 + *
 + * The reason this is not just a simple integer is that Thrift and CQL3 count
 + * things in different ways. This is what abstracts those differences.
 + */
 +public abstract class DataLimits
 +{
 +    public static final Serializer serializer = new Serializer();
 +
 +    public static final int NO_LIMIT = Integer.MAX_VALUE;
 +
 +    public static final DataLimits NONE = new CQLLimits(NO_LIMIT)
 +    {
 +        @Override
-         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
++        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            return false;
 +        }
 +
 +        @Override
-         public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
++        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter,
++                                                  int nowInSec,
++                                                  boolean countPartitionsWithOnlyStaticData)
 +        {
 +            return iter;
 +        }
 +
 +        @Override
-         public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec)
++        public UnfilteredRowIterator filter(UnfilteredRowIterator iter,
++                                            int nowInSec,
++                                            boolean countPartitionsWithOnlyStaticData)
 +        {
 +            return iter;
 +        }
 +    };
 +
 +    // We currently deal with distinct queries by querying full partitions but limiting the result at 1 row per
 +    // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
 +    public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true);
 +
 +    public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
 +
 +    public static DataLimits cqlLimits(int cqlRowLimit)
 +    {
 +        return new CQLLimits(cqlRowLimit);
 +    }
 +
 +    public static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit)
 +    {
 +        return new CQLLimits(cqlRowLimit, perPartitionLimit);
 +    }
 +
 +    public static DataLimits distinctLimits(int cqlRowLimit)
 +    {
 +        return CQLLimits.distinct(cqlRowLimit);
 +    }
 +
 +    public static DataLimits thriftLimits(int partitionLimit, int cellPerPartitionLimit)
 +    {
 +        return new ThriftLimits(partitionLimit, cellPerPartitionLimit);
 +    }
 +
 +    public static DataLimits superColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
 +    {
 +        return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
 +    }
 +
 +    public abstract Kind kind();
 +
 +    public abstract boolean isUnlimited();
 +    public abstract boolean isDistinct();
 +
 +    public abstract DataLimits forPaging(int pageSize);
 +    public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining);
 +
 +    public abstract DataLimits forShortReadRetry(int toFetch);
 +
-     public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec);
++    public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData);
 +
 +    /**
 +     * Returns a new {@code Counter} for these limits.
 +     *
 +     * @param nowInSec the current time in seconds (to decide what is expired or not).
 +     * @param assumeLiveData if true, the counter will assume that every row passed is live and won't
 +     * thus check for liveness, otherwise it will. This should be {@code true} when used on a
 +     * {@code RowIterator} (since it only returns live rows), false otherwise.
++     * @param countPartitionsWithOnlyStaticData if {@code true} the partitions with only static data should be counted
++     * as 1 valid row.
 +     * @return a new {@code Counter} for these limits.
 +     */
-     public abstract Counter newCounter(int nowInSec, boolean assumeLiveData);
++    public abstract Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData);
 +
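
To make the widened contract concrete: callers build a DataLimits, derive a Counter, and let the counter truncate the iteration. A hedged usage sketch against the signatures in this diff (the `partitions` iterator and `nowInSec` are placeholders, and this assumes the surrounding Cassandra tree on the classpath):

    // Sketch only: "partitions" and "nowInSec" come from the calling read path.
    DataLimits limits = DataLimits.cqlLimits(100, 10); // LIMIT 100 PER PARTITION LIMIT 10

    // Count a partition whose only live data is static as one row (the new flag):
    UnfilteredPartitionIterator limited = limits.filter(partitions, nowInSec, true);

    // Or observe counts without truncating the iteration:
    DataLimits.Counter counter = limits.newCounter(nowInSec, false, true).onlyCount();
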
 +    /**
 +     * The max number of results this limits enforces.
 +     * <p>
 +     * Note that the actual definition of "results" depends a bit: for CQL, it's always rows, but for
 +     * thrift, it means cells.
 +     *
 +     * @return the maximum number of results this limits enforces.
 +     */
 +    public abstract int count();
 +
 +    public abstract int perPartitionCount();
 +
-     public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
++    public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter,
++                                              int nowInSec,
++                                              boolean countPartitionsWithOnlyStaticData)
 +    {
-         return this.newCounter(nowInSec, false).applyTo(iter);
++        return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter);
 +    }
 +
-     public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec)
++    public UnfilteredRowIterator filter(UnfilteredRowIterator iter,
++                                        int nowInSec,
++                                        boolean countPartitionsWithOnlyStaticData)
 +    {
-         return this.newCounter(nowInSec, false).applyTo(iter);
++        return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter);
 +    }
 +
-     public PartitionIterator filter(PartitionIterator iter, int nowInSec)
++    public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData)
 +    {
-         return this.newCounter(nowInSec, true).applyTo(iter);
++        return this.newCounter(nowInSec, true, countPartitionsWithOnlyStaticData).applyTo(iter);
 +    }
 +
 +    /**
 +     * Estimate the number of results (the definition of "results" will be rows for CQL queries
 +     * and partitions for thrift ones) that a full scan of the provided cfs would yield.
 +     */
 +    public abstract float estimateTotalResults(ColumnFamilyStore cfs);
 +
 +    public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>>
 +    {
 +        // false means we do not propagate our stop signals onto the iterator, we only count
 +        private boolean enforceLimits = true;
 +
 +        public Counter onlyCount()
 +        {
 +            this.enforceLimits = false;
 +            return this;
 +        }
 +
 +        public PartitionIterator applyTo(PartitionIterator partitions)
 +        {
 +            return Transformation.apply(partitions, this);
 +        }
 +
 +        public UnfilteredPartitionIterator applyTo(UnfilteredPartitionIterator partitions)
 +        {
 +            return Transformation.apply(partitions, this);
 +        }
 +
 +        public UnfilteredRowIterator applyTo(UnfilteredRowIterator partition)
 +        {
 +            return (UnfilteredRowIterator) applyToPartition(partition);
 +        }
 +
 +        public RowIterator applyTo(RowIterator partition)
 +        {
 +            return (RowIterator) applyToPartition(partition);
 +        }
 +
 +        /**
 +         * The number of results counted.
 +         * <p>
 +         * Note that the definition of "results" should be the same as for {@link #count}.
 +         *
 +         * @return the number of results counted.
 +         */
 +        public abstract int counted();
 +        public abstract int countedInCurrentPartition();
 +
 +        public abstract boolean isDone();
 +        public abstract boolean isDoneForPartition();
 +
 +        @Override
 +        protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
 +        {
 +            return partition instanceof UnfilteredRowIterator ? Transformation.apply((UnfilteredRowIterator) partition, this)
 +                                                              : Transformation.apply((RowIterator) partition, this);
 +        }
 +
 +        // called before we process a given partition
 +        protected abstract void applyToPartition(DecoratedKey partitionKey, Row staticRow);
 +
 +        @Override
 +        protected void attachTo(BasePartitions partitions)
 +        {
 +            if (enforceLimits)
 +                super.attachTo(partitions);
 +            if (isDone())
 +                stop();
 +        }
 +
 +        @Override
 +        protected void attachTo(BaseRows rows)
 +        {
 +            if (enforceLimits)
 +                super.attachTo(rows);
 +            applyToPartition(rows.partitionKey(), rows.staticRow());
 +            if (isDoneForPartition())
 +                stopInPartition();
 +        }
 +    }
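
Counter extends StoppingTransformation: it observes every partition and row, and only when enforceLimits is left enabled does it propagate stop()/stopInPartition() back into the iterators. A minimal, self-contained model of that pattern (plain Java, invented names, no Cassandra types):

    import java.util.Arrays;
    import java.util.Iterator;

    // Toy stand-in for a stopping counter: counts elements and, unless
    // onlyCount() was called, cuts the underlying iteration off at a limit.
    class StoppingCounter<T> implements Iterator<T>
    {
        private final Iterator<T> wrapped;
        private final int limit;
        private boolean enforceLimits = true;
        private int counted;

        StoppingCounter(Iterator<T> wrapped, int limit)
        {
            this.wrapped = wrapped;
            this.limit = limit;
        }

        StoppingCounter<T> onlyCount()
        {
            enforceLimits = false;
            return this;
        }

        public boolean hasNext()
        {
            return (!enforceLimits || counted < limit) && wrapped.hasNext();
        }

        public T next()
        {
            counted++;
            return wrapped.next();
        }

        public static void main(String[] args)
        {
            StoppingCounter<Integer> c = new StoppingCounter<>(Arrays.asList(1, 2, 3, 4, 5).iterator(), 2);
            c.forEachRemaining(r -> {});
            System.out.println(c.counted); // 2: the stop was enforced

            StoppingCounter<Integer> o = new StoppingCounter<>(Arrays.asList(1, 2, 3, 4, 5).iterator(), 2).onlyCount();
            o.forEachRemaining(r -> {});
            System.out.println(o.counted); // 5: counted only, nothing truncated
        }
    }
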
 +
 +    /**
 +     * Limits used by CQL; this counts rows.
 +     */
 +    private static class CQLLimits extends DataLimits
 +    {
 +        protected final int rowLimit;
 +        protected final int perPartitionLimit;
 +
 +        // Whether the query is a distinct query or not.
 +        protected final boolean isDistinct;
 +
 +        private CQLLimits(int rowLimit)
 +        {
 +            this(rowLimit, NO_LIMIT);
 +        }
 +
 +        private CQLLimits(int rowLimit, int perPartitionLimit)
 +        {
 +            this(rowLimit, perPartitionLimit, false);
 +        }
 +
 +        private CQLLimits(int rowLimit, int perPartitionLimit, boolean isDistinct)
 +        {
 +            this.rowLimit = rowLimit;
 +            this.perPartitionLimit = perPartitionLimit;
 +            this.isDistinct = isDistinct;
 +        }
 +
 +        private static CQLLimits distinct(int rowLimit)
 +        {
 +            return new CQLLimits(rowLimit, 1, true);
 +        }
 +
 +        public Kind kind()
 +        {
 +            return Kind.CQL_LIMIT;
 +        }
 +
 +        public boolean isUnlimited()
 +        {
 +            return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT;
 +        }
 +
 +        public boolean isDistinct()
 +        {
 +            return isDistinct;
 +        }
 +
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            return new CQLLimits(pageSize, perPartitionLimit, isDistinct);
 +        }
 +
 +        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            return new CQLPagingLimits(pageSize, perPartitionLimit, isDistinct, lastReturnedKey, lastReturnedKeyRemaining);
 +        }
 +
 +        public DataLimits forShortReadRetry(int toFetch)
 +        {
 +            // When we do a short read retry, we're only ever querying the single partition on which we have a short read. So
 +            // we use toFetch as the row limit and use no perPartitionLimit (it would be equivalent in practice to use toFetch
 +            // for both arguments or just for perPartitionLimit with no limit on rowLimit).
 +            return new CQLLimits(toFetch, NO_LIMIT, isDistinct);
 +        }
 +
-         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
++        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            // We want the number of rows that are currently live. Getting that precise number forces
 +            // us to iterate the cached partition in general, but we can avoid that if:
 +            //   - The number of rows with at least one non-expiring cell is greater than what we ask,
 +            //     in which case we know we have enough live.
 +            //   - The number of rows is less than requested, in which case we know we won't have enough.
 +            if (cached.rowsWithNonExpiringCells() >= rowLimit)
 +                return true;
 +
 +            if (cached.rowCount() < rowLimit)
 +                return false;
 +
 +            // Otherwise, we need to re-count
 +
-             DataLimits.Counter counter = newCounter(nowInSec, false);
++            DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData);
 +            try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
 +                 UnfilteredRowIterator iter = counter.applyTo(cacheIter))
 +            {
 +                // Consume the iterator until we've counted enough
 +                while (iter.hasNext())
 +                    iter.next();
 +                return counter.isDone();
 +            }
 +        }
 +
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +        {
-             return new CQLCounter(nowInSec, assumeLiveData);
++            return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
 +        }
 +
 +        public int count()
 +        {
 +            return rowLimit;
 +        }
 +
 +        public int perPartitionCount()
 +        {
 +            return perPartitionLimit;
 +        }
 +
 +        public float estimateTotalResults(ColumnFamilyStore cfs)
 +        {
 +            // TODO: we should start storing stats on the number of rows (instead of the number of cells, which
 +            // is what getMeanColumns returns)
 +            float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
 +            return rowsPerPartition * (cfs.estimateKeys());
 +        }
 +
 +        protected class CQLCounter extends Counter
 +        {
 +            protected final int nowInSec;
 +            protected final boolean assumeLiveData;
++            protected final boolean countPartitionsWithOnlyStaticData;
 +
 +            protected int rowCounted;
 +            protected int rowInCurrentPartition;
 +
 +            protected boolean hasLiveStaticRow;
 +
-             public CQLCounter(int nowInSec, boolean assumeLiveData)
++            public CQLCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +            {
 +                this.nowInSec = nowInSec;
 +                this.assumeLiveData = assumeLiveData;
++                this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
 +            }
 +
 +            @Override
 +            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
 +            {
 +                rowInCurrentPartition = 0;
 +                hasLiveStaticRow = !staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec));
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                if (assumeLiveData || row.hasLiveData(nowInSec))
 +                    incrementRowCount();
 +                return row;
 +            }
 +
 +            @Override
 +            public void onPartitionClose()
 +            {
 +                // Normally, we don't count static rows as, from a CQL point of view, they will be merged with other
 +                // rows in the partition. However, if we only have the static row, it will be returned as one row
 +                // so count it.
-                 if (hasLiveStaticRow && rowInCurrentPartition == 0)
++                if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowInCurrentPartition == 0)
 +                    incrementRowCount();
 +                super.onPartitionClose();
 +            }
 +
 +            private void incrementRowCount()
 +            {
 +                if (++rowCounted >= rowLimit)
 +                    stop();
 +                if (++rowInCurrentPartition >= perPartitionLimit)
 +                    stopInPartition();
 +            }
 +
 +            public int counted()
 +            {
 +                return rowCounted;
 +            }
 +
 +            public int countedInCurrentPartition()
 +            {
 +                return rowInCurrentPartition;
 +            }
 +
 +            public boolean isDone()
 +            {
 +                return rowCounted >= rowLimit;
 +            }
 +
 +            public boolean isDoneForPartition()
 +            {
 +                return isDone() || rowInCurrentPartition >= perPartitionLimit;
 +            }
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            StringBuilder sb = new StringBuilder();
 +
 +            if (rowLimit != NO_LIMIT)
 +            {
 +                sb.append("LIMIT ").append(rowLimit);
 +                if (perPartitionLimit != NO_LIMIT)
 +                    sb.append(' ');
 +            }
 +
 +            if (perPartitionLimit != NO_LIMIT)
 +                sb.append("PER PARTITION LIMIT ").append(perPartitionLimit);
 +
 +            return sb.toString();
 +        }
 +    }
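
To see the effect of countPartitionsWithOnlyStaticData on the tallies above, here is a hedged, self-contained model of the CQLCounter rules (plain Java; a partition is reduced to a live-row count plus a live-static-row flag, which is an invented simplification):

    import java.util.Arrays;
    import java.util.List;

    // Simplified model of CQLCounter: live rows are counted one by one, and a
    // partition whose only live data is its static row counts as one row when
    // countPartitionsWithOnlyStaticData is set (mirrors onPartitionClose()).
    public class CqlCountModel
    {
        static int count(List<int[]> partitions, boolean countPartitionsWithOnlyStaticData)
        {
            int counted = 0;
            for (int[] p : partitions)
            {
                int liveRows = p[0];
                boolean hasLiveStaticRow = p[1] == 1;
                counted += liveRows;
                if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && liveRows == 0)
                    counted++;
            }
            return counted;
        }

        public static void main(String[] args)
        {
            // One partition with 3 live rows, one with only a live static row.
            List<int[]> partitions = Arrays.asList(new int[]{ 3, 0 }, new int[]{ 0, 1 });
            System.out.println(count(partitions, true));  // 4
            System.out.println(count(partitions, false)); // 3
        }
    }
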
 +
 +    private static class CQLPagingLimits extends CQLLimits
 +    {
 +        private final ByteBuffer lastReturnedKey;
 +        private final int lastReturnedKeyRemaining;
 +
 +        public CQLPagingLimits(int rowLimit, int perPartitionLimit, boolean isDistinct, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            super(rowLimit, perPartitionLimit, isDistinct);
 +            this.lastReturnedKey = lastReturnedKey;
 +            this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
 +        }
 +
 +        @Override
 +        public Kind kind()
 +        {
 +            return Kind.CQL_PAGING_LIMIT;
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +        {
-             return new PagingAwareCounter(nowInSec, assumeLiveData);
++            return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
 +        }
 +
 +        private class PagingAwareCounter extends CQLCounter
 +        {
-             private PagingAwareCounter(int nowInSec, boolean assumeLiveData)
++            private PagingAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +            {
-                 super(nowInSec, assumeLiveData);
++                super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
 +            }
 +
 +            @Override
 +            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
 +            {
 +                if (partitionKey.getKey().equals(lastReturnedKey))
 +                {
 +                    rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
 +                    // lastReturnedKey is the last key for which we've returned rows in the first page.
 +                    // So, since we know we have returned rows, we know we have accounted for the static row
 +                    // if any already, so force hasLiveStaticRow to false so we make sure to not count it
 +                    // once more.
 +                    hasLiveStaticRow = false;
 +                }
 +                else
 +                {
 +                    super.applyToPartition(partitionKey, staticRow);
 +                }
 +            }
 +        }
 +    }
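
The resume arithmetic in PagingAwareCounter is worth spelling out. A hedged, standalone illustration (plain Java; the numbers are invented):

    // When a page resumes on the partition the previous page stopped in, the
    // counter starts from what that partition has already contributed:
    //     rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining
    public class PagingResumeDemo
    {
        public static void main(String[] args)
        {
            int perPartitionLimit = 10;
            int lastReturnedKeyRemaining = 4; // quota left for that partition after page one
            int rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
            System.out.println(rowInCurrentPartition); // 6 rows were already returned earlier
            // And since rows were returned for that key on the earlier page, its static
            // row was already accounted for: hasLiveStaticRow is forced to false above.
        }
    }
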
 +
 +    /**
 +     * Limits used by thrift; these count partitions and cells.
 +     */
 +    private static class ThriftLimits extends DataLimits
 +    {
 +        protected final int partitionLimit;
 +        protected final int cellPerPartitionLimit;
 +
 +        private ThriftLimits(int partitionLimit, int cellPerPartitionLimit)
 +        {
 +            this.partitionLimit = partitionLimit;
 +            this.cellPerPartitionLimit = cellPerPartitionLimit;
 +        }
 +
 +        public Kind kind()
 +        {
 +            return Kind.THRIFT_LIMIT;
 +        }
 +
 +        public boolean isUnlimited()
 +        {
 +            return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT;
 +        }
 +
 +        public boolean isDistinct()
 +        {
 +            return false;
 +        }
 +
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            // We don't support paging on thrift in general but do use paging under the hood for get_count. For
 +            // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
 +            // partition). We do check that the partition limit is 1 however to make sure this is not misused
 +            // (as this wouldn't work properly for range queries).
 +            assert partitionLimit == 1;
 +            return new ThriftLimits(partitionLimit, pageSize);
 +        }
 +
 +        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        public DataLimits forShortReadRetry(int toFetch)
 +        {
 +            // Short read retries are always done for a single partition at a time, so it's ok to ignore the
 +            // partition limit for those
 +            return new ThriftLimits(1, toFetch);
 +        }
 +
-         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
++        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            // We want the number of cells that are currently live. Getting that precise number forces
 +            // us to iterate the cached partition in general, but we can avoid that if:
 +            //   - The number of non-expiring live cells is greater than the number of cells asked (we then
 +            //     know we have enough live cells).
 +            //   - The number of cells cached is less than requested, in which case we know we won't have enough.
 +            if (cached.nonExpiringLiveCells() >= cellPerPartitionLimit)
 +                return true;
 +
 +            if (cached.nonTombstoneCellCount() < cellPerPartitionLimit)
 +                return false;
 +
 +            // Otherwise, we need to re-count
-             DataLimits.Counter counter = newCounter(nowInSec, false);
++            DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData);
 +            try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
 +                 UnfilteredRowIterator iter = counter.applyTo(cacheIter))
 +            {
 +                // Consume the iterator until we've counted enough
 +                while (iter.hasNext())
 +                    iter.next();
 +                return counter.isDone();
 +            }
 +        }
 +
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            return new ThriftCounter(nowInSec, assumeLiveData);
 +        }
 +
 +        public int count()
 +        {
 +            return partitionLimit * cellPerPartitionLimit;
 +        }
 +
 +        public int perPartitionCount()
 +        {
 +            return cellPerPartitionLimit;
 +        }
 +
 +        public float estimateTotalResults(ColumnFamilyStore cfs)
 +        {
 +            // remember that getMeanColumns returns a number of cells: we should clean nomenclature
 +            float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
 +            return cellsPerPartition * cfs.estimateKeys();
 +        }
 +
 +        protected class ThriftCounter extends Counter
 +        {
 +            protected final int nowInSec;
 +            protected final boolean assumeLiveData;
 +
 +            protected int partitionsCounted;
 +            protected int cellsCounted;
 +            protected int cellsInCurrentPartition;
 +
 +            public ThriftCounter(int nowInSec, boolean assumeLiveData)
 +            {
 +                this.nowInSec = nowInSec;
 +                this.assumeLiveData = assumeLiveData;
 +            }
 +
 +            @Override
 +            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
 +            {
 +                cellsInCurrentPartition = 0;
 +                if (!staticRow.isEmpty())
 +                    applyToRow(staticRow);
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                for (Cell cell : row.cells())
 +                {
 +                    if (assumeLiveData || cell.isLive(nowInSec))
 +                    {
 +                        ++cellsCounted;
 +                        if (++cellsInCurrentPartition >= cellPerPartitionLimit)
 +                            stopInPartition();
 +                    }
 +                }
 +                return row;
 +            }
 +
 +            @Override
 +            public void onPartitionClose()
 +            {
 +                if (++partitionsCounted >= partitionLimit)
 +                    stop();
 +                super.onPartitionClose();
 +            }
 +
 +            public int counted()
 +            {
 +                return cellsCounted;
 +            }
 +
 +            public int countedInCurrentPartition()
 +            {
 +                return cellsInCurrentPartition;
 +            }
 +
 +            public boolean isDone()
 +            {
 +                return partitionsCounted >= partitionLimit;
 +            }
 +
 +            public boolean isDoneForPartition()
 +            {
 +                return isDone() || cellsInCurrentPartition >= cellPerPartitionLimit;
 +            }
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            // This is not valid CQL, but that's ok since it's not used for CQL queries.
 +            return String.format("THRIFT LIMIT (partitions=%d, cells_per_partition=%d)", partitionLimit, cellPerPartitionLimit);
 +        }
 +    }
 +
 +    /**
 +     * Limits used for thrift get_count when we only want to count super columns.
 +     */
 +    private static class SuperColumnCountingLimits extends ThriftLimits
 +    {
 +        private SuperColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
 +        {
 +            super(partitionLimit, cellPerPartitionLimit);
 +        }
 +
 +        public Kind kind()
 +        {
 +            return Kind.SUPER_COLUMN_COUNTING_LIMIT;
 +        }
 +
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            // We don't support paging on thrift in general but do use paging under the hood for get_count. For
 +            // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
 +            // partition). We do check that the partition limit is 1 however to make sure this is not misused
 +            // (as this wouldn't work properly for range queries).
 +            assert partitionLimit == 1;
 +            return new SuperColumnCountingLimits(partitionLimit, pageSize);
 +        }
 +
 +        public DataLimits forShortReadRetry(int toFetch)
 +        {
 +            // Short read retries are always done for a single partition at a time, so it's ok to ignore the
 +            // partition limit for those
 +            return new SuperColumnCountingLimits(1, toFetch);
 +        }
 +
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
++        @Override
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            return new SuperColumnCountingCounter(nowInSec, assumeLiveData);
 +        }
 +
 +        protected class SuperColumnCountingCounter extends ThriftCounter
 +        {
 +            public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData)
 +            {
 +                super(nowInSec, assumeLiveData);
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                // In the internal format, a row == a super column, so that's what we want to count.
 +                if (assumeLiveData || row.hasLiveData(nowInSec))
 +                {
 +                    ++cellsCounted;
 +                    if (++cellsInCurrentPartition >= cellPerPartitionLimit)
 +                        stopInPartition();
 +                }
 +                return row;
 +            }
 +        }
 +    }
 +
 +    public static class Serializer
 +    {
 +        public void serialize(DataLimits limits, DataOutputPlus out, int version) throws IOException
 +        {
 +            out.writeByte(limits.kind().ordinal());
 +            switch (limits.kind())
 +            {
 +                case CQL_LIMIT:
 +                case CQL_PAGING_LIMIT:
 +                    CQLLimits cqlLimits = (CQLLimits)limits;
 +                    out.writeUnsignedVInt(cqlLimits.rowLimit);
 +                    out.writeUnsignedVInt(cqlLimits.perPartitionLimit);
 +                    out.writeBoolean(cqlLimits.isDistinct);
 +                    if (limits.kind() == Kind.CQL_PAGING_LIMIT)
 +                    {
 +                        CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
 +                        ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out);
 +                        out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
 +                    }
 +                    break;
 +                case THRIFT_LIMIT:
 +                case SUPER_COLUMN_COUNTING_LIMIT:
 +                    ThriftLimits thriftLimits = (ThriftLimits)limits;
 +                    out.writeUnsignedVInt(thriftLimits.partitionLimit);
 +                    out.writeUnsignedVInt(thriftLimits.cellPerPartitionLimit);
 +                    break;
 +            }
 +        }
 +
 +        public DataLimits deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            Kind kind = Kind.values()[in.readUnsignedByte()];
 +            switch (kind)
 +            {
 +                case CQL_LIMIT:
 +                case CQL_PAGING_LIMIT:
 +                    int rowLimit = (int)in.readUnsignedVInt();
 +                    int perPartitionLimit = (int)in.readUnsignedVInt();
 +                    boolean isDistinct = in.readBoolean();
 +                    if (kind == Kind.CQL_LIMIT)
 +                        return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
 +
 +                    ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
 +                    int lastRemaining = (int)in.readUnsignedVInt();
 +                    return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining);
 +                case THRIFT_LIMIT:
 +                case SUPER_COLUMN_COUNTING_LIMIT:
 +                    int partitionLimit = (int)in.readUnsignedVInt();
 +                    int cellPerPartitionLimit = (int)in.readUnsignedVInt();
 +                    return kind == Kind.THRIFT_LIMIT
 +                         ? new ThriftLimits(partitionLimit, cellPerPartitionLimit)
 +                         : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
 +            }
 +            throw new AssertionError();
 +        }
 +
 +        public long serializedSize(DataLimits limits, int version)
 +        {
 +            long size = TypeSizes.sizeof((byte)limits.kind().ordinal());
 +            switch (limits.kind())
 +            {
 +                case CQL_LIMIT:
 +                case CQL_PAGING_LIMIT:
 +                    CQLLimits cqlLimits = (CQLLimits)limits;
 +                    size += TypeSizes.sizeofUnsignedVInt(cqlLimits.rowLimit);
 +                    size += TypeSizes.sizeofUnsignedVInt(cqlLimits.perPartitionLimit);
 +                    size += TypeSizes.sizeof(cqlLimits.isDistinct);
 +                    if (limits.kind() == Kind.CQL_PAGING_LIMIT)
 +                    {
 +                        CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
 +                        size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
 +                        size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
 +                    }
 +                    break;
 +                case THRIFT_LIMIT:
 +                case SUPER_COLUMN_COUNTING_LIMIT:
 +                    ThriftLimits thriftLimits = (ThriftLimits)limits;
 +                    size += TypeSizes.sizeofUnsignedVInt(thriftLimits.partitionLimit);
 +                    size += TypeSizes.sizeofUnsignedVInt(thriftLimits.cellPerPartitionLimit);
 +                    break;
 +                default:
 +                    throw new AssertionError();
 +            }
 +            return size;
 +        }
 +    }
 +}
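
The wire format above is a kind byte followed by unsigned vints (plus the last returned key for CQL_PAGING_LIMIT). A hedged round-trip sketch, assuming the in-tree buffer helpers org.apache.cassandra.io.util.DataOutputBuffer and DataInputBuffer (they are not part of this diff):

    DataLimits limits = DataLimits.cqlLimits(100, 10);
    try (DataOutputBuffer out = new DataOutputBuffer())
    {
        DataLimits.serializer.serialize(limits, out, MessagingService.VERSION_30);
        try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
        {
            DataLimits roundTripped = DataLimits.serializer.deserialize(in, MessagingService.VERSION_30);
            assert roundTripped.count() == limits.count();
        }
    }
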

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/RowFilter.java
index 5ffe2ab,0000000..8d11038
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@@ -1,994 -1,0 +1,1009 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.filter;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.base.Objects;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.context.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkBindValueSet;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +
 +/**
 + * A filter on which rows a given query should include or exclude.
 + * <p>
 + * This corresponds to the restrictions on rows that are not handled by the query
 + * {@link ClusteringIndexFilter}. Some of the expressions of this filter may
 + * be handled by a 2ndary index, and the rest is simply filtered out from the
 + * result set (the latter can only happen if the query was using ALLOW FILTERING).
 + */
 +public abstract class RowFilter implements Iterable<RowFilter.Expression>
 +{
 +    public static final Serializer serializer = new Serializer();
 +    public static final RowFilter NONE = new CQLFilter(Collections.emptyList());
 +
 +    protected final List<Expression> expressions;
 +
 +    protected RowFilter(List<Expression> expressions)
 +    {
 +        this.expressions = expressions;
 +    }
 +
 +    public static RowFilter create()
 +    {
 +        return new CQLFilter(new ArrayList<>());
 +    }
 +
 +    public static RowFilter create(int capacity)
 +    {
 +        return new CQLFilter(new ArrayList<>(capacity));
 +    }
 +
 +    public static RowFilter forThrift(int capacity)
 +    {
 +        return new ThriftFilter(new ArrayList<>(capacity));
 +    }
 +
 +    public void add(ColumnDefinition def, Operator op, ByteBuffer value)
 +    {
 +        add(new SimpleExpression(def, op, value));
 +    }
 +
 +    public void addMapEquality(ColumnDefinition def, ByteBuffer key, Operator op, ByteBuffer value)
 +    {
 +        add(new MapEqualityExpression(def, key, op, value));
 +    }
 +
 +    public void addThriftExpression(CFMetaData metadata, ByteBuffer name, Operator op, ByteBuffer value)
 +    {
 +        assert (this instanceof ThriftFilter);
 +        add(new ThriftExpression(metadata, name, op, value));
 +    }
 +
 +    public void addCustomIndexExpression(CFMetaData cfm, IndexMetadata targetIndex, ByteBuffer value)
 +    {
 +        add(new CustomExpression(cfm, targetIndex, value));
 +    }
 +
 +    private void add(Expression expression)
 +    {
 +        expression.validate();
 +        expressions.add(expression);
 +    }
 +
 +    public List<Expression> getExpressions()
 +    {
 +        return expressions;
 +    }
 +
 +    /**
++     * Checks if some of the expressions apply to clustering or regular columns.
++     * @return {@code true} if some of the expressions apply to clustering or regular columns, {@code false} otherwise.
++     */
++    public boolean hasExpressionOnClusteringOrRegularColumns()
++    {
++        for (Expression expression : expressions)
++        {
++            ColumnDefinition column = expression.column();
++            if (column.isClusteringColumn() || column.isRegular())
++                return true;
++        }
++        return false;
++    }
++
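
One plausible wiring for this new predicate (illustrative only, not part of this diff): it is a natural source for the countPartitionsWithOnlyStaticData flag that DataLimits now takes, since a partition whose only live data is static can only match when no expression touches clustering or regular columns.

    // Hypothetical caller-side derivation of the new DataLimits flag:
    boolean countPartitionsWithOnlyStaticData = !rowFilter.hasExpressionOnClusteringOrRegularColumns();
    UnfilteredPartitionIterator limited = limits.filter(partitions, nowInSec, countPartitionsWithOnlyStaticData);
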
++    /**
 +     * Filters the provided iterator so that only the rows satisfying the expressions of this filter
 +     * are included in the resulting iterator.
 +     *
 +     * @param iter the iterator to filter
 +     * @param nowInSec the time of query in seconds.
 +     * @return the filtered iterator.
 +     */
 +    public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec);
 +
 +    /**
 +     * Whether the provided row in the provided partition satisfies this filter.
 +     *
 +     * @param metadata the table metadata.
 +     * @param partitionKey the partition key for partition to test.
 +     * @param row the row to test.
 +     * @param nowInSec the current time in seconds (to know what is live and what isn't).
 +     * @return {@code true} if {@code row} in partition {@code partitionKey} satisfies this row filter.
 +     */
 +    public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row, int nowInSec)
 +    {
 +        // We purge all tombstones as the expressions' isSatisfiedBy methods expect it
 +        Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
 +        if (purged == null)
 +            return expressions.isEmpty();
 +
 +        for (Expression e : expressions)
 +        {
 +            if (!e.isSatisfiedBy(metadata, partitionKey, purged))
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Returns true if all of the expressions within this filter that apply to the partition key are satisfied by
 +     * the given key, false otherwise.
 +     */
 +    public boolean partitionKeyRestrictionsAreSatisfiedBy(DecoratedKey key, AbstractType<?> keyValidator)
 +    {
 +        for (Expression e : expressions)
 +        {
 +            if (!e.column.isPartitionKey())
 +                continue;
 +
 +            ByteBuffer value = keyValidator instanceof CompositeType
 +                             ? ((CompositeType) keyValidator).split(key.getKey())[e.column.position()]
 +                             : key.getKey();
 +            if (!e.operator().isSatisfiedBy(e.column.type, value, e.value))
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Returns true if all of the expressions within this filter that apply to the clustering key are satisfied by
 +     * the given Clustering, false otherwise.
 +     */
 +    public boolean clusteringKeyRestrictionsAreSatisfiedBy(Clustering clustering)
 +    {
 +        for (Expression e : expressions)
 +        {
 +            if (!e.column.isClusteringColumn())
 +                continue;
 +
 +            if (!e.operator().isSatisfiedBy(e.column.type, clustering.get(e.column.position()), e.value))
 +            {
 +                return false;
 +            }
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Returns this filter but without the provided expression. This method
 +     * *assumes* that the filter contains the provided expression.
 +     */
 +    public RowFilter without(Expression expression)
 +    {
 +        assert expressions.contains(expression);
 +        if (expressions.size() == 1)
 +            return RowFilter.NONE;
 +
 +        List<Expression> newExpressions = new ArrayList<>(expressions.size() - 1);
 +        for (Expression e : expressions)
 +            if (!e.equals(expression))
 +                newExpressions.add(e);
 +
 +        return withNewExpressions(newExpressions);
 +    }
 +
 +    protected abstract RowFilter withNewExpressions(List<Expression> expressions);
 +
 +    public boolean isEmpty()
 +    {
 +        return expressions.isEmpty();
 +    }
 +
 +    public Iterator<Expression> iterator()
 +    {
 +        return expressions.iterator();
 +    }
 +
 +    private static Clustering makeCompactClustering(CFMetaData metadata, ByteBuffer name)
 +    {
 +        assert metadata.isCompactTable();
 +        if (metadata.isCompound())
 +        {
 +            List<ByteBuffer> values = CompositeType.splitName(name);
 +            return new Clustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
 +        }
 +        else
 +        {
 +            return new Clustering(name);
 +        }
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        for (int i = 0; i < expressions.size(); i++)
 +        {
 +            if (i > 0)
 +                sb.append(" AND ");
 +            sb.append(expressions.get(i));
 +        }
 +        return sb.toString();
 +    }
 +
 +    private static class CQLFilter extends RowFilter
 +    {
 +        private CQLFilter(List<Expression> expressions)
 +        {
 +            super(expressions);
 +        }
 +
 +        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
 +        {
 +            if (expressions.isEmpty())
 +                return iter;
 +
 +            final CFMetaData metadata = iter.metadata();
 +            long numberOfStaticColumnExpressions = expressions.stream().filter(e -> e.column.isStatic()).count();
 +            final boolean filterStaticColumns = numberOfStaticColumnExpressions != 0;
 +            final boolean filterNonStaticColumns = (expressions.size() - numberOfStaticColumnExpressions) > 0;
 +
 +            class IsSatisfiedFilter extends Transformation<UnfilteredRowIterator>
 +            {
 +                DecoratedKey pk;
 +                public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
 +                {
 +                    // The filter might be on static columns, so need to check static row first.
 +                    if (filterStaticColumns && applyToRow(partition.staticRow()) == null)
 +                        return null;
 +
 +                    pk = partition.partitionKey();
 +                    UnfilteredRowIterator iterator = Transformation.apply(partition, this);
 +
 +                    return (filterNonStaticColumns && !iterator.hasNext()) ? null : iterator;
 +                }
 +
 +                public Row applyToRow(Row row)
 +                {
 +                    Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
 +                    if (purged == null)
 +                        return null;
 +
 +                    for (Expression e : expressions)
 +                        if (!e.isSatisfiedBy(metadata, pk, purged))
 +                            return null;
 +                    return row;
 +                }
 +            }
 +
 +            return Transformation.apply(iter, new IsSatisfiedFilter());
 +        }
 +
 +        protected RowFilter withNewExpressions(List<Expression> expressions)
 +        {
 +            return new CQLFilter(expressions);
 +        }
 +    }
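
IsSatisfiedFilter follows a common Transformation shape: test the static row first, drop rows that fail an expression, and drop the partition entirely if filtering on non-static columns left it empty. A self-contained model of that last rule (plain Java; the data shapes are invented):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Predicate;

    // Toy version of "filter rows, then drop partitions the filter emptied".
    public class PartitionFilterModel
    {
        static List<List<Integer>> filter(List<List<Integer>> partitions, Predicate<Integer> keepRow)
        {
            List<List<Integer>> result = new ArrayList<>();
            for (List<Integer> partition : partitions)
            {
                List<Integer> kept = new ArrayList<>();
                for (Integer row : partition)
                    if (keepRow.test(row))
                        kept.add(row);
                if (!kept.isEmpty()) // mirrors returning null when nothing survived
                    result.add(kept);
            }
            return result;
        }

        public static void main(String[] args)
        {
            List<List<Integer>> partitions = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 6));
            System.out.println(filter(partitions, r -> r % 2 == 1)); // [[1, 3]]: second partition dropped
        }
    }
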
 +
 +    private static class ThriftFilter extends RowFilter
 +    {
 +        private ThriftFilter(List<Expression> expressions)
 +        {
 +            super(expressions);
 +        }
 +
 +        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
 +        {
 +            if (expressions.isEmpty())
 +                return iter;
 +
 +            class IsSatisfiedThriftFilter extends Transformation<UnfilteredRowIterator>
 +            {
 +                @Override
 +                public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
 +                {
 +                    // Thrift does not filter rows, it filters the entire partition if any of the expressions is not
 +                    // satisfied, which forces us to materialize the result (in theory we could materialize only
 +                    // what we need, which might or might not be everything, but we keep it simple since in practice
 +                    // it's never been worth it).
 +                    ImmutableBTreePartition result = ImmutableBTreePartition.create(iter);
 +                    iter.close();
 +
 +                    // The partition needs to have a row for every expression, and the expression needs to be valid.
 +                    for (Expression expr : expressions)
 +                    {
 +                        assert expr instanceof ThriftExpression;
 +                        Row row = result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes));
 +                        if (row == null || !expr.isSatisfiedBy(iter.metadata(), iter.partitionKey(), row))
 +                            return null;
 +                    }
 +                    // If we get here, it means all expressions were satisfied, so return the original result
 +                    return result.unfilteredIterator();
 +                }
 +            }
 +            return Transformation.apply(iter, new IsSatisfiedThriftFilter());
 +        }
 +
 +        protected RowFilter withNewExpressions(List<Expression> expressions)
 +        {
 +            return new ThriftFilter(expressions);
 +        }
 +    }
 +
 +    public static abstract class Expression
 +    {
 +        private static final Serializer serializer = new Serializer();
 +
 +        // Note: the order of this enum matters, it's used for serialization
 +        protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR, CUSTOM }
 +
 +        abstract Kind kind();
 +        protected final ColumnDefinition column;
 +        protected final Operator operator;
 +        protected final ByteBuffer value;
 +
 +        protected Expression(ColumnDefinition column, Operator operator, ByteBuffer value)
 +        {
 +            this.column = column;
 +            this.operator = operator;
 +            this.value = value;
 +        }
 +
 +        public boolean isCustom()
 +        {
 +            return kind() == Kind.CUSTOM;
 +        }
 +
 +        public ColumnDefinition column()
 +        {
 +            return column;
 +        }
 +
 +        public Operator operator()
 +        {
 +            return operator;
 +        }
 +
 +        /**
 +         * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code> operator.
 +         *
 +         * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code>
 +         * operator, <code>false</code> otherwise.
 +         */
 +        public boolean isContains()
 +        {
 +            return Operator.CONTAINS == operator;
 +        }
 +
 +        /**
 +         * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code> operator.
 +         *
 +         * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code>
 +         * operator, <code>false</code> otherwise.
 +         */
 +        public boolean isContainsKey()
 +        {
 +            return Operator.CONTAINS_KEY == operator;
 +        }
 +
 +        /**
 +         * If this expression is used to query an index, the value to use as
 +         * partition key for that index query.
 +         */
 +        public ByteBuffer getIndexValue()
 +        {
 +            return value;
 +        }
 +
 +        public void validate()
 +        {
 +            checkNotNull(value, "Unsupported null value for column %s", column.name);
 +            checkBindValueSet(value, "Unsupported unset value for column %s", column.name);
 +        }
 +
 +        @Deprecated
 +        public void validateForIndexing()
 +        {
 +            checkFalse(value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT,
 +                       "Index expression values may not be larger than 64K");
 +        }
 +
 +        /**
 +         * Returns whether the provided row satisfied this expression or not.
 +         *
 +         * @param partitionKey the partition key for row to check.
 +         * @param row the row to check. It should *not* contain deleted cells
 +         * (i.e. it should come from a RowIterator).
 +         * @return whether the row is satisfied by this expression.
 +         */
 +        public abstract boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row);
 +
 +        protected ByteBuffer getValue(CFMetaData metadata, DecoratedKey partitionKey, Row row)
 +        {
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    return metadata.getKeyValidator() instanceof CompositeType
 +                         ? CompositeType.extractComponent(partitionKey.getKey(), column.position())
 +                         : partitionKey.getKey();
 +                case CLUSTERING:
 +                    return row.clustering().get(column.position());
 +                default:
 +                    Cell cell = row.getCell(column);
 +                    return cell == null ? null : cell.value();
 +            }
 +        }
 +
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if (this == o)
 +                return true;
 +
 +            if (!(o instanceof Expression))
 +                return false;
 +
 +            Expression that = (Expression)o;
 +
 +            return Objects.equal(this.kind(), that.kind())
 +                && Objects.equal(this.column.name, that.column.name)
 +                && Objects.equal(this.operator, that.operator)
 +                && Objects.equal(this.value, that.value);
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            return Objects.hashCode(column.name, operator, value);
 +        }
 +
 +        private static class Serializer
 +        {
 +            public void serialize(Expression expression, DataOutputPlus out, int version) throws IOException
 +            {
 +                if (version >= MessagingService.VERSION_30)
 +                    out.writeByte(expression.kind().ordinal());
 +
 +                // Custom expressions include neither a column nor an operator, but all
 +                // other expressions do. Also, custom expressions are 3.0+ only, so
 +                // the column & operator will always be the first things written for
 +                // any pre-3.0 version
 +                if (expression.kind() == Kind.CUSTOM)
 +                {
 +                    assert version >= MessagingService.VERSION_30;
 +                    IndexMetadata.serializer.serialize(((CustomExpression)expression).targetIndex, out, version);
 +                    ByteBufferUtil.writeWithShortLength(expression.value, out);
 +                    return;
 +                }
 +
 +                ByteBufferUtil.writeWithShortLength(expression.column.name.bytes, out);
 +                expression.operator.writeTo(out);
 +
 +                switch (expression.kind())
 +                {
 +                    case SIMPLE:
 +                        ByteBufferUtil.writeWithShortLength(((SimpleExpression)expression).value, out);
 +                        break;
 +                    case MAP_EQUALITY:
 +                        MapEqualityExpression mexpr = (MapEqualityExpression)expression;
 +                        if (version < MessagingService.VERSION_30)
 +                        {
 +                            ByteBufferUtil.writeWithShortLength(mexpr.getIndexValue(), out);
 +                        }
 +                        else
 +                        {
 +                            ByteBufferUtil.writeWithShortLength(mexpr.key, out);
 +                            ByteBufferUtil.writeWithShortLength(mexpr.value, out);
 +                        }
 +                        break;
 +                    case THRIFT_DYN_EXPR:
 +                        ByteBufferUtil.writeWithShortLength(((ThriftExpression)expression).value, out);
 +                        break;
 +                }
 +            }
 +
 +            public Expression deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
 +            {
 +                Kind kind = null;
 +                ByteBuffer name;
 +                Operator operator;
 +                ColumnDefinition column;
 +
 +                if (version >= MessagingService.VERSION_30)
 +                {
 +                    kind = Kind.values()[in.readByte()];
 +                    // custom expressions (3.0+ only) do not contain a column or operator, only a value
 +                    if (kind == Kind.CUSTOM)
 +                    {
 +                        return new CustomExpression(metadata,
 +                                                    IndexMetadata.serializer.deserialize(in, version, metadata),
 +                                                    ByteBufferUtil.readWithShortLength(in));
 +                    }
 +                }
 +
 +                name = ByteBufferUtil.readWithShortLength(in);
 +                operator = Operator.readFrom(in);
 +                column = metadata.getColumnDefinition(name);
 +                if (!metadata.isCompactTable() && column == null)
 +                    throw new RuntimeException("Unknown (or dropped) column " + UTF8Type.instance.getString(name) + " during deserialization");
 +
 +                if (version < MessagingService.VERSION_30)
 +                {
 +                    if (column == null)
 +                        kind = Kind.THRIFT_DYN_EXPR;
 +                    else if (column.type instanceof MapType && operator == Operator.EQ)
 +                        kind = Kind.MAP_EQUALITY;
 +                    else
 +                        kind = Kind.SIMPLE;
 +                }
 +
 +                assert kind != null;
 +                switch (kind)
 +                {
 +                    case SIMPLE:
 +                        return new SimpleExpression(column, operator, ByteBufferUtil.readWithShortLength(in));
 +                    case MAP_EQUALITY:
 +                        ByteBuffer key, value;
 +                        if (version < MessagingService.VERSION_30)
 +                        {
 +                            ByteBuffer composite = ByteBufferUtil.readWithShortLength(in);
 +                            key = CompositeType.extractComponent(composite, 0);
 +                            value = CompositeType.extractComponent(composite, 1);
 +                        }
 +                        else
 +                        {
 +                            key = ByteBufferUtil.readWithShortLength(in);
 +                            value = ByteBufferUtil.readWithShortLength(in);
 +                        }
 +                        return new MapEqualityExpression(column, key, operator, value);
 +                    case THRIFT_DYN_EXPR:
 +                        return new ThriftExpression(metadata, name, operator, ByteBufferUtil.readWithShortLength(in));
 +                }
 +                throw new AssertionError();
 +            }
 +
 +
 +            public long serializedSize(Expression expression, int version)
 +            {
 +                // version 3.0+ includes a byte for Kind
 +                long size = version >= MessagingService.VERSION_30 ? 1 : 0;
 +
 +                // custom expressions don't include a column or operator, all other expressions do
 +                if (expression.kind() != Kind.CUSTOM)
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes)
 +                            + expression.operator.serializedSize();
 +
 +                switch (expression.kind())
 +                {
 +                    case SIMPLE:
 +                        size += ByteBufferUtil.serializedSizeWithShortLength(((SimpleExpression)expression).value);
 +                        break;
 +                    case MAP_EQUALITY:
 +                        MapEqualityExpression mexpr = (MapEqualityExpression)expression;
 +                        if (version < MessagingService.VERSION_30)
 +                            size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.getIndexValue());
 +                        else
 +                            size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key)
 +                                  + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value);
 +                        break;
 +                    case THRIFT_DYN_EXPR:
 +                        size += ByteBufferUtil.serializedSizeWithShortLength(((ThriftExpression)expression).value);
 +                        break;
 +                    case CUSTOM:
 +                        if (version >= MessagingService.VERSION_30)
 +                            size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version)
 +                                  + ByteBufferUtil.serializedSizeWithShortLength(expression.value);
 +                        break;
 +                }
 +                return size;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * An expression of the form 'column' 'op' 'value'.
 +     */
 +    private static class SimpleExpression extends Expression
 +    {
 +        public SimpleExpression(ColumnDefinition column, Operator operator, ByteBuffer value)
 +        {
 +            super(column, operator, value);
 +        }
 +
 +        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
 +        {
 +            // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
 +            // TODO: we should try to merge both code paths someday.
 +            assert value != null;
 +
 +            if (row.isStatic() != column.isStatic())
 +                return true;
 +
 +            switch (operator)
 +            {
 +                case EQ:
 +                case LT:
 +                case LTE:
 +                case GTE:
 +                case GT:
 +                    {
 +                        assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";
 +
 +                        // In order to support operators on Counter types, their value has to be extracted from the internal
 +                        // representation. See CASSANDRA-11629
 +                        if (column.type.isCounter())
 +                        {
 +                            ByteBuffer foundValue = getValue(metadata, partitionKey, row);
 +                            if (foundValue == null)
 +                                return false;
 +
 +                            ByteBuffer counterValue = LongType.instance.decompose(CounterContext.instance().total(foundValue));
 +                            return operator.isSatisfiedBy(LongType.instance, counterValue, value);
 +                        }
 +                        else
 +                        {
 +                            // Note that CQL expressions are always of the form 'x < 4', i.e. the tested value is on the left.
 +                            ByteBuffer foundValue = getValue(metadata, partitionKey, row);
 +                            return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value);
 +                        }
 +                    }
 +                case NEQ:
 +                    {
 +                        assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";
 +                        ByteBuffer foundValue = getValue(metadata, partitionKey, row);
 +                        // Note that CQL expressions are always of the form 'x < 4', i.e. the tested value is on the left.
 +                        return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value);
 +                    }
 +                case CONTAINS:
 +                    assert column.type.isCollection();
 +                    CollectionType<?> type = (CollectionType<?>)column.type;
 +                    if (column.isComplex())
 +                    {
 +                        ComplexColumnData complexData = row.getComplexColumnData(column);
 +                        if (complexData != null)
 +                        {
 +                            for (Cell cell : complexData)
 +                            {
 +                                if (type.kind == CollectionType.Kind.SET)
 +                                {
 +                                    if (type.nameComparator().compare(cell.path().get(0), value) == 0)
 +                                        return true;
 +                                }
 +                                else
 +                                {
 +                                    if (type.valueComparator().compare(cell.value(), value) == 0)
 +                                        return true;
 +                                }
 +                            }
 +                        }
 +                        return false;
 +                    }
 +                    else
 +                    {
 +                        ByteBuffer foundValue = getValue(metadata, partitionKey, row);
 +                        if (foundValue == null)
 +                            return false;
 +
 +                        switch (type.kind)
 +                        {
 +                            case LIST:
 +                                ListType<?> listType = (ListType<?>)type;
 +                                return listType.compose(foundValue).contains(listType.getElementsType().compose(value));
 +                            case SET:
 +                                SetType<?> setType = (SetType<?>)type;
 +                                return setType.compose(foundValue).contains(setType.getElementsType().compose(value));
 +                            case MAP:
 +                                MapType<?,?> mapType = (MapType<?, ?>)type;
 +                                return mapType.compose(foundValue).containsValue(mapType.getValuesType().compose(value));
 +                        }
 +                        throw new AssertionError();
 +                    }
 +                case CONTAINS_KEY:
 +                    assert column.type.isCollection() && column.type instanceof MapType;
 +                    MapType<?, ?> mapType = (MapType<?, ?>)column.type;
 +                    if (column.isComplex())
 +                    {
 +                        return row.getCell(column, CellPath.create(value)) != null;
 +                    }
 +                    else
 +                    {
 +                        ByteBuffer foundValue = getValue(metadata, partitionKey, row);
 +                        return foundValue != null && mapType.getSerializer().getSerializedValue(foundValue, value, mapType.getKeysType()) != null;
 +                    }
 +
 +                case IN:
 +                    // It wouldn't be terribly hard to support this (though doing so would imply supporting
 +                    // IN for 2ndary index) but currently we don't.
 +                    throw new AssertionError();
 +            }
 +            throw new AssertionError();
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            AbstractType<?> type = column.type;
 +            switch (operator)
 +            {
 +                case CONTAINS:
 +                    assert type instanceof CollectionType;
 +                    CollectionType<?> ct = (CollectionType<?>)type;
 +                    type = ct.kind == CollectionType.Kind.SET ? ct.nameComparator() : ct.valueComparator();
 +                    break;
 +                case CONTAINS_KEY:
 +                    assert type instanceof MapType;
 +                    type = ((MapType<?, ?>)type).nameComparator();
 +                    break;
 +                case IN:
 +                    type = ListType.getInstance(type, false);
 +                    break;
 +                default:
 +                    break;
 +            }
 +            return String.format("%s %s %s", column.name, operator, type.getString(value));
 +        }
 +
 +        @Override
 +        Kind kind()
 +        {
 +            return Kind.SIMPLE;
 +        }
 +    }
 +
 +    /**
 +     * An expression of the form 'column' ['key'] = 'value' (which is only
 +     * supported when 'column' is a map).
 +     */
 +    private static class MapEqualityExpression extends Expression
 +    {
 +        private final ByteBuffer key;
 +
 +        public MapEqualityExpression(ColumnDefinition column, ByteBuffer key, Operator operator, ByteBuffer value)
 +        {
 +            super(column, operator, value);
 +            assert column.type instanceof MapType && operator == Operator.EQ;
 +            this.key = key;
 +        }
 +
 +        @Override
 +        public void validate() throws InvalidRequestException
 +        {
 +            checkNotNull(key, "Unsupported null map key for column %s", column.name);
 +            checkBindValueSet(key, "Unsupported unset map key for column %s", column.name);
 +            checkNotNull(value, "Unsupported null map value for column %s", column.name);
 +            checkBindValueSet(value, "Unsupported unset map value for column %s", column.name);
 +        }
 +
 +        @Override
 +        public ByteBuffer getIndexValue()
 +        {
 +            return CompositeType.build(key, value);
 +        }
 +
 +        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
 +        {
 +            assert key != null;
 +            // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
 +            // TODO: we should try to merge both code paths someday.
 +            assert value != null;
 +
 +            if (row.isStatic() != column.isStatic())
 +                return true;
 +
 +            MapType<?, ?> mt = (MapType<?, ?>)column.type;
 +            if (column.isComplex())
 +            {
 +                Cell cell = row.getCell(column, CellPath.create(key));
 +                return cell != null && mt.valueComparator().compare(cell.value(), value) == 0;
 +            }
 +            else
 +            {
 +                ByteBuffer serializedMap = getValue(metadata, partitionKey, row);
 +                if (serializedMap == null)
 +                    return false;
 +
 +                ByteBuffer foundValue = mt.getSerializer().getSerializedValue(serializedMap, key, mt.getKeysType());
 +                return foundValue != null && mt.valueComparator().compare(foundValue, value) == 0;
 +            }
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            MapType<?, ?> mt = (MapType<?, ?>)column.type;
 +            return String.format("%s[%s] = %s", column.name, mt.nameComparator().getString(key), mt.valueComparator().getString(value));
 +        }
 +
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if (this == o)
 +                return true;
 +
 +            if (!(o instanceof MapEqualityExpression))
 +                return false;
 +
 +            MapEqualityExpression that = (MapEqualityExpression)o;
 +
 +            return Objects.equal(this.column.name, that.column.name)
 +                && Objects.equal(this.operator, that.operator)
 +                && Objects.equal(this.key, that.key)
 +                && Objects.equal(this.value, that.value);
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            return Objects.hashCode(column.name, operator, key, value);
 +        }
 +
 +        @Override
 +        Kind kind()
 +        {
 +            return Kind.MAP_EQUALITY;
 +        }
 +    }
 +
 +    /**
 +     * An expression of the form 'name' = 'value', but where 'name' is actually the
 +     * clustering value for a compact table. This is only for thrift.
 +     */
 +    private static class ThriftExpression extends Expression
 +    {
 +        public ThriftExpression(CFMetaData metadata, ByteBuffer name, Operator operator, ByteBuffer value)
 +        {
 +            super(makeDefinition(metadata, name), operator, value);
 +            assert metadata.isCompactTable();
 +        }
 +
 +        private static ColumnDefinition makeDefinition(CFMetaData metadata, ByteBuffer name)
 +        {
 +            ColumnDefinition def = metadata.getColumnDefinition(name);
 +            if (def != null)
 +                return def;
 +
 +            // In thrift, we actually allow expressions on non-defined columns for the sake of filtering. To accommodate
 +            // this we create a "fake" definition. This is messy but it works so is probably good enough.
 +            return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type);
 +        }
 +
 +        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
 +        {
 +            assert value != null;
 +
 +            // On thrift queries, even if the column expression is a "static" one, we'll have converted it to a "dynamic"
 +            // one in ThriftResultsMerger, so we always expect it to be a dynamic one. Further, we expect this is only
 +            // called when the row clustering does match the column (see ThriftFilter above).
 +            assert row.clustering().equals(makeCompactClustering(metadata, column.name.bytes));
 +            Cell cell = row.getCell(metadata.compactValueColumn());
 +            return cell != null && operator.isSatisfiedBy(column.type, cell.value(), value);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("%s %s %s", column.name, operator, column.type.getString(value));
 +        }
 +
 +        @Override
 +        Kind kind()
 +        {
 +            return Kind.THRIFT_DYN_EXPR;
 +        }
 +    }
 +
 +    /**
 +     * A custom index expression for use with 2i implementations which support custom syntax and which are not
 +     * necessarily linked to a single column in the base table.
 +     */
 +    public static final class CustomExpression extends Expression
 +    {
 +        private final IndexMetadata targetIndex;
 +        private final CFMetaData cfm;
 +
 +        public CustomExpression(CFMetaData cfm, IndexMetadata targetIndex, ByteBuffer value)
 +        {
 +            // The operator is not relevant, but Expression requires it, so for now we just hardcode EQ
 +            super(makeDefinition(cfm, targetIndex), Operator.EQ, value);
 +            this.targetIndex = targetIndex;
 +            this.cfm = cfm;
 +        }
 +
 +        private static ColumnDefinition makeDefinition(CFMetaData cfm, IndexMetadata index)
 +        {
 +            // Similarly to how we handle non-defined columns in thrift, we create a fake column definition to
 +            // represent the target index. This is definitely something that can be improved though.
 +            return ColumnDefinition.regularDef(cfm, ByteBuffer.wrap(index.name.getBytes()), BytesType.instance);
 +        }
 +
 +        public IndexMetadata getTargetIndex()
 +        {
 +            return targetIndex;
 +        }
 +
 +        public ByteBuffer getValue()
 +        {
 +            return value;
 +        }
 +
 +        public String toString()
 +        {
 +            return String.format("expr(%s, %s)",
 +                                 targetIndex.name,
 +                                 Keyspace.openAndGetStore(cfm)
 +                                         .indexManager
 +                                         .getIndex(targetIndex)
 +                                         .customExpressionValueType());
 +        }
 +
 +        Kind kind()
 +        {
 +            return Kind.CUSTOM;
 +        }
 +
 +        // Filtering by custom expressions isn't supported yet, so just accept any row
 +        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
 +        {
 +            return true;
 +        }
 +    }
 +
 +    public static class Serializer
 +    {
 +        public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException
 +        {
 +            out.writeBoolean(filter instanceof ThriftFilter);
 +            out.writeUnsignedVInt(filter.expressions.size());
 +            for (Expression expr : filter.expressions)
 +                Expression.serializer.serialize(expr, out, version);
 +
 +        }
 +
 +        public RowFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
 +        {
 +            boolean forThrift = in.readBoolean();
 +            int size = (int)in.readUnsignedVInt();
 +            List<Expression> expressions = new ArrayList<>(size);
 +            for (int i = 0; i < size; i++)
 +                expressions.add(Expression.serializer.deserialize(in, version, metadata));
 +
 +            return forThrift
 +                 ? new ThriftFilter(expressions)
 +                 : new CQLFilter(expressions);
 +        }
 +
 +        public long serializedSize(RowFilter filter, int version)
 +        {
 +            long size = 1 // forThrift
 +                      + TypeSizes.sizeofUnsignedVInt(filter.expressions.size());
 +            for (Expression expr : filter.expressions)
 +                size += Expression.serializer.serializedSize(expr, version);
 +            return size;
 +        }
 +    }
 +}
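
A note on the pre-3.0 MAP_EQUALITY wire format handled above: the map key and
value travel as a single composite buffer (the same one getIndexValue()
builds), so deserialization must unpack the key from component 0 and the
value from component 1. A minimal round-trip sketch of that encoding, using
only the CompositeType and ByteBufferUtil helpers the patch already relies on
(the class name is illustrative, not part of the patch):

    import java.nio.ByteBuffer;

    import org.apache.cassandra.db.marshal.CompositeType;
    import org.apache.cassandra.utils.ByteBufferUtil;

    public class MapEqualityEncodingSketch
    {
        public static void main(String[] args)
        {
            // Hypothetical key/value pair for a map column.
            ByteBuffer key = ByteBufferUtil.bytes("k");
            ByteBuffer value = ByteBufferUtil.bytes("v");

            // Pre-3.0 serialization writes the single composite produced by
            // MapEqualityExpression.getIndexValue(), i.e. build(key, value).
            ByteBuffer composite = CompositeType.build(key, value);

            // Deserialization unpacks the two distinct components.
            ByteBuffer readKey = CompositeType.extractComponent(composite.duplicate(), 0);
            ByteBuffer readValue = CompositeType.extractComponent(composite.duplicate(), 1);

            assert key.equals(readKey) && value.equals(readValue);
        }
    }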

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CacheService.java
index c51a5d1,a13a52d..d23bdb0
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@@ -439,13 -414,10 +439,13 @@@ public class CacheService implements Ca
              {
                  public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
                  {
 -                    DecoratedKey key = cfs.partitioner.decorateKey(buffer);
 -                    QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE);
 -                    ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
 -                    return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry) data);
 +                    DecoratedKey key = cfs.decorateKey(buffer);
 +                    int nowInSec = FBUtilities.nowInSeconds();
 +                    try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
 +                    {
-                         CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
++                        CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec, true), nowInSec);
 +                        return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry)toCache);
 +                    }
                  }
              });
          }
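
For context on the boolean threaded through DataLimits.filter(...) above:
passing true makes a partition whose only live data is static still count
towards the limit, which is what the row cache wants when capping a cached
partition at rowsToCache rows. A hedged sketch of the same call outside
CacheService; cacheHead() and its arguments are illustrative placeholders,
not part of the patch:

    import org.apache.cassandra.db.filter.DataLimits;
    import org.apache.cassandra.db.partitions.CachedBTreePartition;
    import org.apache.cassandra.db.partitions.CachedPartition;
    import org.apache.cassandra.db.rows.UnfilteredRowIterator;
    import org.apache.cassandra.utils.FBUtilities;

    public class RowCacheFillSketch
    {
        // rowsToCache mirrors the table's caching option consulted by
        // CacheService; iter stands in for the queryMemtableAndDisk() result.
        static CachedPartition cacheHead(UnfilteredRowIterator iter, int rowsToCache)
        {
            int nowInSec = FBUtilities.nowInSeconds();
            // 'true' = count partitions with only static data, matching the
            // new filter(iter, nowInSec, true) call in the hunk above.
            return CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache)
                                                         .filter(iter, nowInSec, true),
                                               nowInSec);
        }
    }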

