http://git-wip-us.apache.org/repos/asf/cassandra/blob/71778eec/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index 9fbe462,0000000..56a10a8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@@ -1,521 -1,0 +1,558 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
++import java.util.Collection;
 +import java.util.Collections;
 +import java.util.List;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.AbstractMarker;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.Term;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
++import org.apache.cassandra.db.composites.CompositesBuilder;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.db.marshal.CompositeType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +
 +public abstract class SingleColumnRestriction extends AbstractRestriction
 +{
 +    /**
 +     * The definition of the column to which the restriction applies.
 +     */
 +    protected final ColumnDefinition columnDef;
 +
 +    public SingleColumnRestriction(ColumnDefinition columnDef)
 +    {
 +        this.columnDef = columnDef;
 +    }
 +
-     /**
-      * Returns the definition of the column to which is associated this 
restriction.
-      * @return the definition of the column to which is associated this 
restriction
-      */
-     public ColumnDefinition getColumnDef()
++    @Override
++    public Collection<ColumnDefinition> getColumnDefs()
 +    {
-         return columnDef;
++        return Collections.singletonList(columnDef);
 +    }
 +
 +    @Override
-     public void addIndexExpressionTo(List<IndexExpression> expressions,
-                                      SecondaryIndexManager indexManager,
-                                      QueryOptions options) throws 
InvalidRequestException
++    public ColumnDefinition getFirstColumn()
 +    {
-         List<ByteBuffer> values = values(options);
-         checkTrue(values.size() == 1, "IN restrictions are not supported on 
indexed columns");
++        return columnDef;
++    }
 +
-         ByteBuffer value = validateIndexedValue(columnDef, values.get(0));
-         expressions.add(new IndexExpression(columnDef.name.bytes, 
Operator.EQ, value));
++    @Override
++    public ColumnDefinition getLastColumn()
++    {
++        return columnDef;
 +    }
 +
 +    @Override
 +    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
 +    {
 +        SecondaryIndex index = 
indexManager.getIndexForColumn(columnDef.name.bytes);
 +        return index != null && isSupportedBy(index);
 +    }
 +
++    @Override
++    public final Restriction mergeWith(Restriction otherRestriction) throws 
InvalidRequestException
++    {
++            checkFalse(otherRestriction.isMultiColumn(),
++                       "Mixing single column relations and multi column 
relations on clustering columns is not allowed");
++
++            return doMergeWith(otherRestriction);
++    }
++
++    protected abstract Restriction doMergeWith(Restriction otherRestriction) 
throws InvalidRequestException;
++
 +    /**
 +     * Check if this type of restriction is supported by the specified index.
 +     *
 +     * @param index the Secondary index
 +     * @return <code>true</code> if this type of restriction is supported by the specified index,
 +     * <code>false</code> otherwise.
 +     */
 +    protected abstract boolean isSupportedBy(SecondaryIndex index);
 +
 +    public static final class EQ extends SingleColumnRestriction
 +    {
 +        private final Term value;
 +
 +        public EQ(ColumnDefinition columnDef, Term value)
 +        {
 +            super(columnDef);
 +            this.value = value;
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return usesFunction(value, ksName, functionName);
 +        }
 +
 +        @Override
 +        public boolean isEQ()
 +        {
 +            return true;
 +        }
 +
 +        @Override
-         public List<ByteBuffer> values(QueryOptions options) throws 
InvalidRequestException
++        public void addIndexExpressionTo(List<IndexExpression> expressions,
++                                         SecondaryIndexManager indexManager,
++                                         QueryOptions options) throws 
InvalidRequestException
++        {
++            ByteBuffer buffer = validateIndexedValue(columnDef, 
value.bindAndGet(options));
++            expressions.add(new IndexExpression(columnDef.name.bytes, 
Operator.EQ, buffer));
++        }
++
++        @Override
++        public CompositesBuilder appendTo(CompositesBuilder builder, 
QueryOptions options)
 +        {
-             return Collections.singletonList(value.bindAndGet(options));
++            builder.addElementToAll(value.bindAndGet(options));
++            checkFalse(builder.containsNull(), "Invalid null value in 
condition for column %s", columnDef);
++            return builder;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("EQ(%s)", value);
 +        }
 +
 +        @Override
-         public Restriction mergeWith(Restriction otherRestriction) throws 
InvalidRequestException
++        public Restriction doMergeWith(Restriction otherRestriction) throws 
InvalidRequestException
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one 
relation if it includes an Equal", columnDef.name);
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return index.supportsOperator(Operator.EQ);
 +        }
 +    }
 +
 +    public static abstract class IN extends SingleColumnRestriction
 +    {
 +        public IN(ColumnDefinition columnDef)
 +        {
 +            super(columnDef);
 +        }
 +
 +        @Override
 +        public final boolean isIN()
 +        {
 +            return true;
 +        }
 +
 +        @Override
-         public final Restriction mergeWith(Restriction otherRestriction) 
throws InvalidRequestException
++        public final Restriction doMergeWith(Restriction otherRestriction) 
throws InvalidRequestException
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one 
relation if it includes a IN", columnDef.name);
 +        }
 +
 +        @Override
++        public CompositesBuilder appendTo(CompositesBuilder builder, 
QueryOptions options)
++        {
++            builder.addEachElementToAll(getValues(options));
++            checkFalse(builder.containsNull(), "Invalid null value in 
condition for column %s", columnDef);
++            return builder;
++        }
++
++        @Override
++        public void addIndexExpressionTo(List<IndexExpression> expressions,
++                                         SecondaryIndexManager indexManager,
++                                         QueryOptions options) throws 
InvalidRequestException
++        {
++            List<ByteBuffer> values = getValues(options);
++            checkTrue(values.size() == 1, "IN restrictions are not supported 
on indexed columns");
++
++            ByteBuffer value = validateIndexedValue(columnDef, values.get(0));
++            expressions.add(new IndexExpression(columnDef.name.bytes, 
Operator.EQ, value));
++        }
++
++        @Override
 +        protected final boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return index.supportsOperator(Operator.IN);
 +        }
++
++        protected abstract List<ByteBuffer> getValues(QueryOptions options) 
throws InvalidRequestException;
 +    }
 +
 +    public static class InWithValues extends IN
 +    {
 +        protected final List<Term> values;
 +
 +        public InWithValues(ColumnDefinition columnDef, List<Term> values)
 +        {
 +            super(columnDef);
 +            this.values = values;
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return usesFunction(values, ksName, functionName);
 +        }
 +
 +        @Override
-         public List<ByteBuffer> values(QueryOptions options) throws 
InvalidRequestException
++        protected List<ByteBuffer> getValues(QueryOptions options) throws 
InvalidRequestException
 +        {
 +            List<ByteBuffer> buffers = new ArrayList<>(values.size());
 +            for (Term value : values)
 +                buffers.add(value.bindAndGet(options));
 +            return buffers;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("IN(%s)", values);
 +        }
 +    }
 +
 +    public static class InWithMarker extends IN
 +    {
 +        protected final AbstractMarker marker;
 +
 +        public InWithMarker(ColumnDefinition columnDef, AbstractMarker marker)
 +        {
 +            super(columnDef);
 +            this.marker = marker;
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return false;
 +        }
 +
 +        @Override
-         public List<ByteBuffer> values(QueryOptions options) throws 
InvalidRequestException
++        protected List<ByteBuffer> getValues(QueryOptions options) throws 
InvalidRequestException
 +        {
 +            Term.MultiItemTerminal lval = (Term.MultiItemTerminal) 
marker.bind(options);
 +            if (lval == null)
 +                throw new InvalidRequestException("Invalid null value for IN 
restriction");
 +            return lval.getElements();
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return "IN ?";
 +        }
 +    }
 +
 +    public static class Slice extends SingleColumnRestriction
 +    {
 +        private final TermSlice slice;
 +
 +        public Slice(ColumnDefinition columnDef, Bound bound, boolean 
inclusive, Term term)
 +        {
 +            super(columnDef);
 +            slice = TermSlice.newInstance(bound, inclusive, term);
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return (slice.hasBound(Bound.START) && 
usesFunction(slice.bound(Bound.START), ksName, functionName))
 +                    || (slice.hasBound(Bound.END) && 
usesFunction(slice.bound(Bound.END), ksName, functionName));
 +        }
 +
 +        @Override
 +        public boolean isSlice()
 +        {
 +            return true;
 +        }
 +
 +        @Override
-         public List<ByteBuffer> values(QueryOptions options) throws 
InvalidRequestException
++        public CompositesBuilder appendTo(CompositesBuilder builder, 
QueryOptions options)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public boolean hasBound(Bound b)
 +        {
 +            return slice.hasBound(b);
 +        }
 +
 +        @Override
-         public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws 
InvalidRequestException
++        public CompositesBuilder appendBoundTo(CompositesBuilder builder, 
Bound bound, QueryOptions options)
 +        {
-             return 
Collections.singletonList(slice.bound(b).bindAndGet(options));
++            return 
builder.addElementToAll(slice.bound(bound).bindAndGet(options));
 +        }
 +
 +        @Override
 +        public boolean isInclusive(Bound b)
 +        {
 +            return slice.isInclusive(b);
 +        }
 +
 +        @Override
-         public Restriction mergeWith(Restriction otherRestriction)
-         throws InvalidRequestException
++        public Restriction doMergeWith(Restriction otherRestriction) throws 
InvalidRequestException
 +        {
 +            checkTrue(otherRestriction.isSlice(),
 +                      "Column \"%s\" cannot be restricted by both an equality 
and an inequality relation",
 +                      columnDef.name);
 +
 +            SingleColumnRestriction.Slice otherSlice = 
(SingleColumnRestriction.Slice) otherRestriction;
 +
 +            checkFalse(hasBound(Bound.START) && 
otherSlice.hasBound(Bound.START),
 +                       "More than one restriction was found for the start 
bound on %s", columnDef.name);
 +
 +            checkFalse(hasBound(Bound.END) && otherSlice.hasBound(Bound.END),
 +                       "More than one restriction was found for the end bound 
on %s", columnDef.name);
 +
 +            return new Slice(columnDef,  slice.merge(otherSlice.slice));
 +        }
 +
 +        @Override
 +        public void addIndexExpressionTo(List<IndexExpression> expressions,
 +                                         SecondaryIndexManager indexManager,
 +                                         QueryOptions options) throws 
InvalidRequestException
 +        {
 +            for (Bound b : Bound.values())
 +            {
 +                if (hasBound(b))
 +                {
 +                    ByteBuffer value = validateIndexedValue(columnDef, 
slice.bound(b).bindAndGet(options));
 +                    Operator op = slice.getIndexOperator(b);
 +                    // If the underlying comparator for name is reversed, we need to reverse the IndexOperator: user operations
 +                    // always refer to the "forward" sorting even if the clustering order is reversed, but the secondary index
 +                    // code uses the underlying comparator as is.
 +                    op = columnDef.isReversedType() ? op.reverse() : op;
 +                    expressions.add(new IndexExpression(columnDef.name.bytes, 
op, value));
 +                }
 +            }
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return slice.isSupportedBy(index);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("SLICE%s", slice);
 +        }
 +
 +        private Slice(ColumnDefinition columnDef, TermSlice slice)
 +        {
 +            super(columnDef);
 +            this.slice = slice;
 +        }
 +    }
 +
 +    // This holds CONTAINS, CONTAINS_KEY, and map[key] = value restrictions 
because we might want to have any combination of them.
 +    public static final class Contains extends SingleColumnRestriction
 +    {
 +        private List<Term> values = new ArrayList<>(); // for CONTAINS
 +        private List<Term> keys = new ArrayList<>(); // for CONTAINS_KEY
 +        private List<Term> entryKeys = new ArrayList<>(); // for map[key] = 
value
 +        private List<Term> entryValues = new ArrayList<>(); // for map[key] = 
value
 +
 +        public Contains(ColumnDefinition columnDef, Term t, boolean isKey)
 +        {
 +            super(columnDef);
 +            if (isKey)
 +                keys.add(t);
 +            else
 +                values.add(t);
 +        }
 +
 +        public Contains(ColumnDefinition columnDef, Term mapKey, Term 
mapValue)
 +        {
 +            super(columnDef);
 +            entryKeys.add(mapKey);
 +            entryValues.add(mapValue);
 +        }
 +
 +        @Override
-         public List<ByteBuffer> values(QueryOptions options) throws 
InvalidRequestException
++        public CompositesBuilder appendTo(CompositesBuilder builder, 
QueryOptions options)
 +        {
-             return bindAndGet(values, options);
++            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public boolean isContains()
 +        {
 +            return true;
 +        }
 +
 +        @Override
-         public Restriction mergeWith(Restriction otherRestriction) throws 
InvalidRequestException
++        public Restriction doMergeWith(Restriction otherRestriction) throws 
InvalidRequestException
 +        {
 +            checkTrue(otherRestriction.isContains(),
 +                      "Collection column %s can only be restricted by 
CONTAINS, CONTAINS KEY, or map-entry equality",
-                       getColumnDef().name);
++                      columnDef.name);
 +
-             SingleColumnRestriction.Contains newContains = new 
Contains(getColumnDef());
++            SingleColumnRestriction.Contains newContains = new 
Contains(columnDef);
 +
 +            copyKeysAndValues(this, newContains);
 +            copyKeysAndValues((Contains) otherRestriction, newContains);
 +
 +            return newContains;
 +        }
 +
 +        @Override
 +        public void addIndexExpressionTo(List<IndexExpression> expressions,
 +                                         SecondaryIndexManager indexManager,
 +                                         QueryOptions options)
 +                                         throws InvalidRequestException
 +        {
-             addExpressionsFor(expressions, values(options), 
Operator.CONTAINS);
-             addExpressionsFor(expressions, keys(options), 
Operator.CONTAINS_KEY);
++            addExpressionsFor(expressions, bindAndGet(values, options), 
Operator.CONTAINS);
++            addExpressionsFor(expressions, bindAndGet(keys, options), 
Operator.CONTAINS_KEY);
 +            addExpressionsFor(expressions, entries(options), Operator.EQ);
 +        }
 +
 +        private void addExpressionsFor(List<IndexExpression> target, 
List<ByteBuffer> values,
 +                                       Operator op) throws 
InvalidRequestException
 +        {
 +            for (ByteBuffer value : values)
 +            {
 +                validateIndexedValue(columnDef, value);
 +                target.add(new IndexExpression(columnDef.name.bytes, op, 
value));
 +            }
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            boolean supported = false;
 +
 +            if (numberOfValues() > 0)
 +                supported |= index.supportsOperator(Operator.CONTAINS);
 +
 +            if (numberOfKeys() > 0)
 +                supported |= index.supportsOperator(Operator.CONTAINS_KEY);
 +
 +            if (numberOfEntries() > 0)
 +                supported |= index.supportsOperator(Operator.EQ);
 +
 +            return supported;
 +        }
 +
 +        public int numberOfValues()
 +        {
 +            return values.size();
 +        }
 +
 +        public int numberOfKeys()
 +        {
 +            return keys.size();
 +        }
 +
 +        public int numberOfEntries()
 +        {
 +            return entryKeys.size();
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return usesFunction(values, ksName, functionName) || 
usesFunction(keys, ksName, functionName) ||
 +                   usesFunction(entryKeys, ksName, functionName) || 
usesFunction(entryValues, ksName, functionName);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("CONTAINS(values=%s, keys=%s, entryKeys=%s, 
entryValues=%s)", values, keys, entryKeys, entryValues);
 +        }
 +
 +        @Override
 +        public boolean hasBound(Bound b)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
-         public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws 
InvalidRequestException
++        public CompositesBuilder appendBoundTo(CompositesBuilder builder, 
Bound bound, QueryOptions options)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public boolean isInclusive(Bound b)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
-         private List<ByteBuffer> keys(QueryOptions options) throws 
InvalidRequestException
-         {
-             return bindAndGet(keys, options);
-         }
- 
 +        private List<ByteBuffer> entries(QueryOptions options) throws 
InvalidRequestException
 +        {
 +            List<ByteBuffer> entryBuffers = new ArrayList<>(entryKeys.size());
 +            List<ByteBuffer> keyBuffers = bindAndGet(entryKeys, options);
 +            List<ByteBuffer> valueBuffers = bindAndGet(entryValues, options);
 +            for (int i = 0; i < entryKeys.size(); i++)
 +            {
 +                if (valueBuffers.get(i) == null)
 +                    throw new InvalidRequestException("Unsupported null value 
for map-entry equality");
 +                entryBuffers.add(CompositeType.build(keyBuffers.get(i), 
valueBuffers.get(i)));
 +            }
 +            return entryBuffers;
 +        }
 +
 +        /**
 +         * Binds the query options to the specified terms and returns the 
resulting values.
 +         *
 +         * @param terms the terms
 +         * @param options the query options
 +         * @return the values resulting from binding the query options to the specified terms
 +         * @throws InvalidRequestException if a problem occurs while binding 
the query options
 +         */
 +        private static List<ByteBuffer> bindAndGet(List<Term> terms, 
QueryOptions options) throws InvalidRequestException
 +        {
 +            List<ByteBuffer> buffers = new ArrayList<>(terms.size());
 +            for (Term value : terms)
 +                buffers.add(value.bindAndGet(options));
 +            return buffers;
 +        }
 +
 +        /**
 +         * Copies the keys and values from the first <code>Contains</code> to the second one.
 +         *
 +         * @param from the <code>Contains</code> to copy from
 +         * @param to the <code>Contains</code> to copy to
 +         */
 +        private static void copyKeysAndValues(Contains from, Contains to)
 +        {
 +            to.values.addAll(from.values);
 +            to.keys.addAll(from.keys);
 +            to.entryKeys.addAll(from.entryKeys);
 +            to.entryValues.addAll(from.entryValues);
 +        }
 +
 +        private Contains(ColumnDefinition columnDef)
 +        {
 +            super(columnDef);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/71778eec/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 403bf6d,0000000..cea1699
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@@ -1,576 -1,0 +1,576 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.base.Joiner;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.Relation;
 +import org.apache.cassandra.cql3.VariableSpecifications;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +
 +/**
 + * The restrictions corresponding to the relations specified in the where-clause of a CQL query.
 + */
 +public final class StatementRestrictions
 +{
 +    /**
 +     * The Column Family meta data
 +     */
 +    public final CFMetaData cfm;
 +
 +    /**
 +     * Restrictions on partitioning columns
 +     */
 +    private PrimaryKeyRestrictions partitionKeyRestrictions;
 +
 +    /**
 +     * Restrictions on clustering columns
 +     */
 +    private PrimaryKeyRestrictions clusteringColumnsRestrictions;
 +
 +    /**
 +     * Restriction on non-primary key columns (i.e. secondary index 
restrictions)
 +     */
-     private SingleColumnRestrictions nonPrimaryKeyRestrictions;
++    private RestrictionSet nonPrimaryKeyRestrictions;
 +
 +    /**
 +     * The restrictions used to build the index expressions
 +     */
 +    private final List<Restrictions> indexRestrictions = new ArrayList<>();
 +
 +    /**
 +     * <code>true</code> if the secondary index needs to be queried, <code>false</code> otherwise
 +     */
 +    private boolean usesSecondaryIndexing;
 +
 +    /**
 +     * Specify if the query will return a range of partition keys.
 +     */
 +    private boolean isKeyRange;
 +
 +    /**
 +     * Creates a new empty <code>StatementRestrictions</code>.
 +     *
 +     * @param cfm the column family meta data
 +     * @return a new empty <code>StatementRestrictions</code>.
 +     */
 +    public static StatementRestrictions empty(CFMetaData cfm)
 +    {
 +        return new StatementRestrictions(cfm);
 +    }
 +
 +    private StatementRestrictions(CFMetaData cfm)
 +    {
 +        this.cfm = cfm;
-         this.partitionKeyRestrictions = new 
SingleColumnPrimaryKeyRestrictions(cfm.getKeyValidatorAsCType());
-         this.clusteringColumnsRestrictions = new 
SingleColumnPrimaryKeyRestrictions(cfm.comparator);
-         this.nonPrimaryKeyRestrictions = new SingleColumnRestrictions();
++        this.partitionKeyRestrictions = new 
PrimaryKeyRestrictionSet(cfm.getKeyValidatorAsCType());
++        this.clusteringColumnsRestrictions = new 
PrimaryKeyRestrictionSet(cfm.comparator);
++        this.nonPrimaryKeyRestrictions = new RestrictionSet();
 +    }
 +
 +    public StatementRestrictions(CFMetaData cfm,
 +            List<Relation> whereClause,
 +            VariableSpecifications boundNames,
 +            boolean selectsOnlyStaticColumns,
 +            boolean selectACollection) throws InvalidRequestException
 +    {
 +        this.cfm = cfm;
-         this.partitionKeyRestrictions = new 
SingleColumnPrimaryKeyRestrictions(cfm.getKeyValidatorAsCType());
-         this.clusteringColumnsRestrictions = new 
SingleColumnPrimaryKeyRestrictions(cfm.comparator);
-         this.nonPrimaryKeyRestrictions = new SingleColumnRestrictions();
++        this.partitionKeyRestrictions = new 
PrimaryKeyRestrictionSet(cfm.getKeyValidatorAsCType());
++        this.clusteringColumnsRestrictions = new 
PrimaryKeyRestrictionSet(cfm.comparator);
++        this.nonPrimaryKeyRestrictions = new RestrictionSet();
 +
 +        /*
 +         * WHERE clause. For a given entity, rules are: - EQ relations conflict with anything else (including a 2nd EQ)
 +         * - Can't have more than one LT(E) relation (resp. GT(E) relation) - IN relations are restricted to row keys
 +         * (for now) and conflict with anything else (we could allow two IN for the same entity but that doesn't seem
 +         * very useful) - The value_alias cannot be restricted in any way (we don't support wide rows with indexed value
 +         * in CQL so far)
 +         */
 +        for (Relation relation : whereClause)
 +            addRestriction(relation.toRestriction(cfm, boundNames));
 +
 +        ColumnFamilyStore cfs = 
Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
 +        SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
 +
 +        boolean hasQueriableClusteringColumnIndex = 
clusteringColumnsRestrictions.hasSupportingIndex(secondaryIndexManager);
 +        boolean hasQueriableIndex = hasQueriableClusteringColumnIndex
 +                || 
partitionKeyRestrictions.hasSupportingIndex(secondaryIndexManager)
 +                || 
nonPrimaryKeyRestrictions.hasSupportingIndex(secondaryIndexManager);
 +
 +        // At this point, the select statement is fully constructed, but we still have a few things to validate
 +        processPartitionKeyRestrictions(hasQueriableIndex);
 +
 +        // Some but not all of the partition key columns have been specified;
 +        // hence we need to turn these restrictions into index expressions.
 +        if (usesSecondaryIndexing)
 +            indexRestrictions.add(partitionKeyRestrictions);
 +
 +        checkFalse(selectsOnlyStaticColumns && 
hasClusteringColumnsRestriction(),
 +                   "Cannot restrict clustering columns when selecting only 
static columns");
 +
 +        processClusteringColumnsRestrictions(hasQueriableIndex, 
selectACollection);
 +
 +        // Covers indexes on the first clustering column (among others).
 +        if (isKeyRange && hasQueriableClusteringColumnIndex)
 +            usesSecondaryIndexing = true;
 +
 +        usesSecondaryIndexing = usesSecondaryIndexing || 
clusteringColumnsRestrictions.isContains();
 +
 +        if (usesSecondaryIndexing)
 +            indexRestrictions.add(clusteringColumnsRestrictions);
 +
 +        // Even if usesSecondaryIndexing is false at this point, we'll still 
have to use one if
 +        // there are restrictions not covered by the PK.
 +        if (!nonPrimaryKeyRestrictions.isEmpty())
 +        {
 +            usesSecondaryIndexing = true;
 +            indexRestrictions.add(nonPrimaryKeyRestrictions);
 +        }
 +
 +        if (usesSecondaryIndexing)
 +            validateSecondaryIndexSelections(selectsOnlyStaticColumns);
 +    }
 +
 +    private void addRestriction(Restriction restriction) throws 
InvalidRequestException
 +    {
 +        if (restriction.isMultiColumn())
 +            clusteringColumnsRestrictions = 
clusteringColumnsRestrictions.mergeWith(restriction);
 +        else if (restriction.isOnToken())
 +            partitionKeyRestrictions = 
partitionKeyRestrictions.mergeWith(restriction);
 +        else
 +            addSingleColumnRestriction((SingleColumnRestriction) restriction);
 +    }
 +
 +    public boolean usesFunction(String ksName, String functionName)
 +    {
 +        return  partitionKeyRestrictions.usesFunction(ksName, functionName)
 +                || clusteringColumnsRestrictions.usesFunction(ksName, 
functionName)
 +                || nonPrimaryKeyRestrictions.usesFunction(ksName, 
functionName);
 +    }
 +
 +    private void addSingleColumnRestriction(SingleColumnRestriction 
restriction) throws InvalidRequestException
 +    {
-         ColumnDefinition def = restriction.getColumnDef();
++        ColumnDefinition def = restriction.columnDef;
 +        if (def.isPartitionKey())
 +            partitionKeyRestrictions = 
partitionKeyRestrictions.mergeWith(restriction);
 +        else if (def.isClusteringColumn())
 +            clusteringColumnsRestrictions = 
clusteringColumnsRestrictions.mergeWith(restriction);
 +        else
 +            nonPrimaryKeyRestrictions = 
nonPrimaryKeyRestrictions.addRestriction(restriction);
 +    }
 +
 +    /**
 +     * Checks if the restriction on the partition key is an IN restriction.
 +     *
 +     * @return <code>true</code> if the restriction on the partition key is an 
IN restriction, <code>false</code>
 +     * otherwise.
 +     */
 +    public boolean keyIsInRelation()
 +    {
 +        return partitionKeyRestrictions.isIN();
 +    }
 +
 +    /**
 +     * Checks if the query requests a range of partition keys.
 +     *
 +     * @return <code>true</code> if the query requests a range of partition 
keys, <code>false</code> otherwise.
 +     */
 +    public boolean isKeyRange()
 +    {
 +        return this.isKeyRange;
 +    }
 +
 +    /**
 +     * Checks if the secondary index needs to be queried.
 +     *
 +     * @return <code>true</code> if the secondary index needs to be queried, 
<code>false</code> otherwise.
 +     */
 +    public boolean usesSecondaryIndexing()
 +    {
 +        return this.usesSecondaryIndexing;
 +    }
 +
 +    private void processPartitionKeyRestrictions(boolean hasQueriableIndex) 
throws InvalidRequestException
 +    {
 +        // If there is a queriable index, no special conditions are required 
on the other restrictions.
 +        // But we still need to know 2 things:
 +        // - If we don't have a queriable index, is the query ok
 +        // - Is it queriable without a 2ndary index, which is always more 
efficient
 +        // If a component of the partition key is restricted by a relation, 
all preceding
 +        // components must have an EQ. Only the last partition key component 
can be in an IN relation.
 +        if (partitionKeyRestrictions.isOnToken())
 +            isKeyRange = true;
 +
 +        if (hasPartitionKeyUnrestrictedComponents())
 +        {
 +            if (!partitionKeyRestrictions.isEmpty())
 +            {
 +                if (!hasQueriableIndex)
 +                    throw invalidRequest("Partition key parts: %s must be 
restricted as other parts are",
 +                                         Joiner.on(", 
").join(getPartitionKeyUnrestrictedComponents()));
 +            }
 +
 +            isKeyRange = true;
 +            usesSecondaryIndexing = hasQueriableIndex;
 +        }
 +    }
 +
 +    /**
 +     * Checks if the partition key has some unrestricted components.
 +     * @return <code>true</code> if the partition key has some unrestricted 
components, <code>false</code> otherwise.
 +     */
 +    private boolean hasPartitionKeyUnrestrictedComponents()
 +    {
 +        return partitionKeyRestrictions.size() <  
cfm.partitionKeyColumns().size();
 +    }
 +
 +    /**
 +     * Returns the partition key components that are not restricted.
 +     * @return the partition key components that are not restricted.
 +     */
 +    private List<ColumnIdentifier> getPartitionKeyUnrestrictedComponents()
 +    {
 +        List<ColumnDefinition> list = new 
ArrayList<>(cfm.partitionKeyColumns());
 +        list.removeAll(partitionKeyRestrictions.getColumnDefs());
 +        return ColumnDefinition.toIdentifiers(list);
 +    }
 +
 +    /**
 +     * Processes the clustering column restrictions.
 +     *
 +     * @param hasQueriableIndex <code>true</code> if some of the queried data 
are indexed, <code>false</code> otherwise
 +     * @param selectACollection <code>true</code> if the query should return 
a collection column
 +     * @throws InvalidRequestException if the request is invalid
 +     */
 +    private void processClusteringColumnsRestrictions(boolean 
hasQueriableIndex,
 +                                                      boolean 
selectACollection) throws InvalidRequestException
 +    {
 +        checkFalse(clusteringColumnsRestrictions.isIN() && selectACollection,
 +                   "Cannot restrict clustering columns by IN relations when a 
collection is selected by the query");
 +        checkFalse(clusteringColumnsRestrictions.isContains() && 
!hasQueriableIndex,
 +                   "Cannot restrict clustering columns by a CONTAINS relation 
without a secondary index");
 +
 +        if (hasClusteringColumnsRestriction())
 +        {
 +            List<ColumnDefinition> clusteringColumns = 
cfm.clusteringColumns();
 +            List<ColumnDefinition> restrictedColumns = new 
LinkedList<>(clusteringColumnsRestrictions.getColumnDefs());
 +
 +            for (int i = 0, m = restrictedColumns.size(); i < m; i++)
 +            {
 +                ColumnDefinition clusteringColumn = clusteringColumns.get(i);
 +                ColumnDefinition restrictedColumn = restrictedColumns.get(i);
 +
 +                if (!clusteringColumn.equals(restrictedColumn))
 +                {
 +                    checkTrue(hasQueriableIndex,
 +                              "PRIMARY KEY column \"%s\" cannot be restricted 
as preceding column \"%s\" is not restricted",
 +                              restrictedColumn.name,
 +                              clusteringColumn.name);
 +
 +                    usesSecondaryIndexing = true; // handle gaps and 
non-keyrange cases.
 +                    break;
 +                }
 +            }
 +        }
 +
 +        if (clusteringColumnsRestrictions.isContains())
 +            usesSecondaryIndexing = true;
 +    }
 +
 +    public List<IndexExpression> getIndexExpressions(SecondaryIndexManager 
indexManager,
 +                                                     QueryOptions options) 
throws InvalidRequestException
 +    {
 +        if (!usesSecondaryIndexing || indexRestrictions.isEmpty())
 +            return Collections.emptyList();
 +
 +        List<IndexExpression> expressions = new ArrayList<>();
 +        for (Restrictions restrictions : indexRestrictions)
 +            restrictions.addIndexExpressionTo(expressions, indexManager, 
options);
 +
 +        return expressions;
 +    }
 +
 +    /**
 +     * Returns the partition keys for which the data is requested.
 +     *
 +     * @param options the query options
 +     * @return the partition keys for which the data is requested.
 +     * @throws InvalidRequestException if the partition keys cannot be 
retrieved
 +     */
 +    public Collection<ByteBuffer> getPartitionKeys(final QueryOptions 
options) throws InvalidRequestException
 +    {
 +        return partitionKeyRestrictions.values(options);
 +    }
 +
 +    /**
 +     * Returns the specified bound of the partition key.
 +     *
 +     * @param b the boundary type
 +     * @param options the query options
 +     * @return the specified bound of the partition key
 +     * @throws InvalidRequestException if the boundary cannot be retrieved
 +     */
 +    private ByteBuffer getPartitionKeyBound(Bound b, QueryOptions options) 
throws InvalidRequestException
 +    {
 +        // Deal with unrestricted partition key components (special-casing is 
required to deal with 2i queries on the
 +        // first
 +        // component of a composite partition key).
 +        if (hasPartitionKeyUnrestrictedComponents())
 +            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +
 +        // We deal with IN queries for keys in other places, so we know 
buildBound will return only one result
 +        return partitionKeyRestrictions.bounds(b, options).get(0);
 +    }
 +
 +    /**
 +     * Returns the partition key bounds.
 +     *
 +     * @param options the query options
 +     * @return the partition key bounds
 +     * @throws InvalidRequestException if the query is invalid
 +     */
 +    public AbstractBounds<RowPosition> getPartitionKeyBounds(QueryOptions 
options) throws InvalidRequestException
 +    {
 +        IPartitioner p = StorageService.getPartitioner();
 +
 +        if (partitionKeyRestrictions.isOnToken())
 +        {
 +            return getPartitionKeyBoundsForTokenRestrictions(p, options);
 +        }
 +
 +        return getPartitionKeyBounds(p, options);
 +    }
 +
 +    private AbstractBounds<RowPosition> getPartitionKeyBounds(IPartitioner p,
 +                                                              QueryOptions 
options) throws InvalidRequestException
 +    {
 +        ByteBuffer startKeyBytes = getPartitionKeyBound(Bound.START, options);
 +        ByteBuffer finishKeyBytes = getPartitionKeyBound(Bound.END, options);
 +
 +        RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p);
 +        RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
 +
 +        if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum())
 +            return null;
 +
 +        if (partitionKeyRestrictions.isInclusive(Bound.START))
 +        {
 +            return partitionKeyRestrictions.isInclusive(Bound.END)
 +                    ? new Bounds<>(startKey, finishKey)
 +                    : new IncludingExcludingBounds<>(startKey, finishKey);
 +        }
 +
 +        return partitionKeyRestrictions.isInclusive(Bound.END)
 +                ? new Range<>(startKey, finishKey)
 +                : new ExcludingBounds<>(startKey, finishKey);
 +    }
 +
 +    private AbstractBounds<RowPosition> 
getPartitionKeyBoundsForTokenRestrictions(IPartitioner p,
 +                                                                              
    QueryOptions options)
 +                                                                              
            throws InvalidRequestException
 +    {
 +        Token startToken = getTokenBound(Bound.START, options, p);
 +        Token endToken = getTokenBound(Bound.END, options, p);
 +
 +        boolean includeStart = 
partitionKeyRestrictions.isInclusive(Bound.START);
 +        boolean includeEnd = partitionKeyRestrictions.isInclusive(Bound.END);
 +
 +        /*
 +         * If we ask SP.getRangeSlice() for (token(200), token(200)], it will 
happily return the whole ring.
 +         * However, wrapping range doesn't really make sense for CQL, and we 
want to return an empty result in that
 +         * case (CASSANDRA-5573). So special case to create a range that is 
guaranteed to be empty.
 +         *
 +         * In practice, we want to return an empty result set if either 
startToken > endToken, or both are equal but
 +         * one of the bounds is excluded (since [a, a] can contain something, 
but not (a, a], [a, a) or (a, a)).
 +         * Note though that in the case where startToken or endToken is the 
minimum token, then this special case
 +         * rule should not apply.
 +         */
 +        int cmp = startToken.compareTo(endToken);
 +        if (!startToken.isMinimum() && !endToken.isMinimum()
 +                && (cmp > 0 || (cmp == 0 && (!includeStart || !includeEnd))))
 +            return null;
 +
 +        RowPosition start = includeStart ? startToken.minKeyBound() : 
startToken.maxKeyBound();
 +        RowPosition end = includeEnd ? endToken.maxKeyBound() : 
endToken.minKeyBound();
 +
 +        return new Range<>(start, end);
 +    }
 +
 +    private Token getTokenBound(Bound b, QueryOptions options, IPartitioner 
p) throws InvalidRequestException
 +    {
 +        if (!partitionKeyRestrictions.hasBound(b))
 +            return p.getMinimumToken();
 +
 +        ByteBuffer value = partitionKeyRestrictions.bounds(b, options).get(0);
 +        checkNotNull(value, "Invalid null token value");
 +        return p.getTokenFactory().fromByteArray(value);
 +    }
 +
 +    /**
 +     * Checks if the query does not contain any restriction on the 
clustering columns.
 +     *
 +     * @return <code>true</code> if the query does not contain any 
restriction on the clustering columns,
 +     * <code>false</code> otherwise.
 +     */
 +    public boolean hasNoClusteringColumnsRestriction()
 +    {
 +        return clusteringColumnsRestrictions.isEmpty();
 +    }
 +
 +    // For non-composite slices, we don't support internally the difference 
between exclusive and
 +    // inclusive bounds, so we deal with it manually.
 +    public boolean isNonCompositeSliceWithExclusiveBounds()
 +    {
 +        return !cfm.comparator.isCompound()
 +                && clusteringColumnsRestrictions.isSlice()
 +                && (!clusteringColumnsRestrictions.isInclusive(Bound.START) 
|| !clusteringColumnsRestrictions.isInclusive(Bound.END));
 +    }
 +
 +    /**
 +     * Returns the requested clustering columns as <code>Composite</code>s.
 +     *
 +     * @param options the query options
 +     * @return the requested clustering columns as <code>Composite</code>s
 +     * @throws InvalidRequestException if the query is not valid
 +     */
 +    public List<Composite> getClusteringColumnsAsComposites(QueryOptions 
options) throws InvalidRequestException
 +    {
 +        return clusteringColumnsRestrictions.valuesAsComposites(options);
 +    }
 +
 +    /**
 +     * Returns the bounds (start or end) of the clustering columns as 
<code>Composites</code>.
 +     *
 +     * @param b the bound type
 +     * @param options the query options
 +     * @return the bounds (start or end) of the clustering columns as 
<code>Composites</code>
 +     * @throws InvalidRequestException if the request is not valid
 +     */
 +    public List<Composite> getClusteringColumnsBoundsAsComposites(Bound b,
 +                                                                  
QueryOptions options) throws InvalidRequestException
 +    {
 +        return clusteringColumnsRestrictions.boundsAsComposites(b, options);
 +    }
 +
 +    /**
 +     * Returns the bounds (start or end) of the clustering columns.
 +     *
 +     * @param b the bound type
 +     * @param options the query options
 +     * @return the bounds (start or end) of the clustering columns
 +     * @throws InvalidRequestException if the request is not valid
 +     */
 +    public List<ByteBuffer> getClusteringColumnsBounds(Bound b, QueryOptions 
options) throws InvalidRequestException
 +    {
 +        return clusteringColumnsRestrictions.bounds(b, options);
 +    }
 +
 +    /**
 +     * Checks if the bounds (start or end) of the clustering columns are 
inclusive.
 +     *
 +     * @param bound the bound type
 +     * @return <code>true</code> if the bounds (start or end) of the 
clustering columns are inclusive,
 +     * <code>false</code> otherwise
 +     */
 +    public boolean areRequestedBoundsInclusive(Bound bound)
 +    {
 +        return clusteringColumnsRestrictions.isInclusive(bound);
 +    }
 +
 +    /**
 +     * Checks if the query returns a range of columns.
 +     *
 +     * @return <code>true</code> if the query returns a range of columns, 
<code>false</code> otherwise.
 +     */
 +    public boolean isColumnRange()
 +    {
 +        // Due to CASSANDRA-5762, we always do a slice for CQL3 tables (not 
dense, composite).
 +        // Static CF (non dense but non composite) never entails a column 
slice however
 +        if (!cfm.comparator.isDense())
 +            return cfm.comparator.isCompound();
 +
 +        // Otherwise (i.e. for compact table where we don't have a row marker 
anyway and thus don't care about
 +        // CASSANDRA-5762),
 +        // it is a range query if it has at least one column alias for 
which no relation is defined or the relation is not EQ.
 +        return clusteringColumnsRestrictions.size() < 
cfm.clusteringColumns().size() || clusteringColumnsRestrictions.isSlice();
 +    }
 +
 +    /**
 +     * Checks if the query needs to use filtering.
 +     * @return <code>true</code> if the query needs to use filtering, 
<code>false</code> otherwise.
 +     */
 +    public boolean needFiltering()
 +    {
 +        int numberOfRestrictedColumns = 0;
 +        for (Restrictions restrictions : indexRestrictions)
 +            numberOfRestrictedColumns += restrictions.size();
 +
 +        return numberOfRestrictedColumns > 1
 +                || (numberOfRestrictedColumns == 0 && 
!clusteringColumnsRestrictions.isEmpty())
 +                || (numberOfRestrictedColumns != 0
 +                        && nonPrimaryKeyRestrictions.hasMultipleContains());
 +    }
 +
 +    private void validateSecondaryIndexSelections(boolean 
selectsOnlyStaticColumns) throws InvalidRequestException
 +    {
 +        checkFalse(keyIsInRelation(),
 +                   "Select on indexed columns and with IN clause for the 
PRIMARY KEY are not supported");
 +        // When the user only selects static columns, the intent is that we 
don't query the whole partition but just
 +        // the static parts. But 1) we don't have an easy way to do that with 
2i and 2) since we don't support index on
 +        // static columns
 +        // so far, 2i means that you've restricted a non static column, so 
the query is somewhat non-sensical.
 +        checkFalse(selectsOnlyStaticColumns, "Queries using 2ndary indexes 
don't support selecting only static columns");
 +    }
 +
 +    /**
 +     * Checks if the query has some restrictions on the clustering columns.
 +     *
 +     * @return <code>true</code> if the query has some restrictions on the 
clustering columns,
 +     * <code>false</code> otherwise.
 +     */
 +    private boolean hasClusteringColumnsRestriction()
 +    {
 +        return !clusteringColumnsRestrictions.isEmpty();
 +    }
 +
 +    public void reverse()
 +    {
 +        clusteringColumnsRestrictions = new 
ReversedPrimaryKeyRestrictions(clusteringColumnsRestrictions);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/71778eec/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
index cbcec07,0000000..5848c91
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@@ -1,255 -1,0 +1,274 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.List;
 +
 +import com.google.common.base.Joiner;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.Term;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.composites.CType;
 +import org.apache.cassandra.db.composites.Composite;
++import org.apache.cassandra.db.composites.CompositesBuilder;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +
 +/**
 + * <code>Restriction</code> using the token function.
 + */
 +public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
 +{
 +    /**
 +     * The definitions of the columns to which the token restriction applies.
 +     */
 +    protected final List<ColumnDefinition> columnDefs;
 +
 +    /**
 +     * Creates a new <code>TokenRestriction</code> that applies to the 
specified columns.
 +     *
 +     * @param ctype the composite type
 +     * @param columnDefs the definitions of the columns to which the token 
restriction applies
 +     */
 +    public TokenRestriction(CType ctype, List<ColumnDefinition> columnDefs)
 +    {
 +        super(ctype);
 +        this.columnDefs = columnDefs;
 +    }
 +
 +    @Override
 +    public  boolean isOnToken()
 +    {
 +        return true;
 +    }
 +
 +    @Override
 +    public Collection<ColumnDefinition> getColumnDefs()
 +    {
 +        return columnDefs;
 +    }
 +
 +    @Override
++    public ColumnDefinition getFirstColumn()
++    {
++        return columnDefs.get(0);
++    }
++
++    @Override
++    public ColumnDefinition getLastColumn()
++    {
++        return columnDefs.get(columnDefs.size() - 1);
++    }
++
++    @Override
 +    public boolean hasSupportingIndex(SecondaryIndexManager 
secondaryIndexManager)
 +    {
 +        return false;
 +    }
 +
 +    @Override
 +    public final void addIndexExpressionTo(List<IndexExpression> expressions,
 +                                     SecondaryIndexManager indexManager,
 +                                     QueryOptions options)
 +    {
 +        throw new UnsupportedOperationException("Index expression cannot be 
created for token restriction");
 +    }
 +
 +    @Override
++    public CompositesBuilder appendTo(CompositesBuilder builder, QueryOptions 
options)
++    {
++        throw new UnsupportedOperationException();
++    }
++
++    @Override
 +    public List<Composite> valuesAsComposites(QueryOptions options) throws 
InvalidRequestException
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public List<Composite> boundsAsComposites(Bound bound, QueryOptions 
options) throws InvalidRequestException
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    /**
 +     * Returns the column names as a comma separated <code>String</code>.
 +     *
 +     * @return the column names as a comma separated <code>String</code>.
 +     */
 +    protected final String getColumnNamesAsString()
 +    {
 +        return Joiner.on(", 
").join(ColumnDefinition.toIdentifiers(columnDefs));
 +    }
 +
 +    @Override
 +    public final PrimaryKeyRestrictions mergeWith(Restriction 
otherRestriction) throws InvalidRequestException
 +    {
 +        if (!otherRestriction.isOnToken())
 +            return new TokenFilter(toPrimaryKeyRestriction(otherRestriction), 
this);
 +
 +        return doMergeWith((TokenRestriction) otherRestriction);
 +    }
 +
 +    /**
 +     * Merges this restriction with the specified 
<code>TokenRestriction</code>.
 +     * @param otherRestriction the <code>TokenRestriction</code> to merge 
with.
 +     */
 +    protected abstract PrimaryKeyRestrictions doMergeWith(TokenRestriction 
otherRestriction) throws InvalidRequestException;
 +
 +    /**
 +     * Converts the specified restriction into a 
<code>PrimaryKeyRestrictions</code>.
 +     *
 +     * @param restriction the restriction to convert
 +     * @return a <code>PrimaryKeyRestrictions</code>
 +     * @throws InvalidRequestException if a problem occurs while converting 
the restriction
 +     */
 +    private PrimaryKeyRestrictions toPrimaryKeyRestriction(Restriction 
restriction) throws InvalidRequestException
 +    {
 +        if (restriction instanceof PrimaryKeyRestrictions)
 +            return (PrimaryKeyRestrictions) restriction;
 +
-         return new 
SingleColumnPrimaryKeyRestrictions(ctype).mergeWith(restriction);
++        return new PrimaryKeyRestrictionSet(ctype).mergeWith(restriction);
 +    }
 +
 +    public static final class EQ extends TokenRestriction
 +    {
 +        private final Term value;
 +
 +        public EQ(CType ctype, List<ColumnDefinition> columnDefs, Term value)
 +        {
 +            super(ctype, columnDefs);
 +            this.value = value;
 +        }
 +
 +        @Override
 +        public boolean isEQ()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return usesFunction(value, ksName, functionName);
 +        }
 +
 +        @Override
 +        protected PrimaryKeyRestrictions doMergeWith(TokenRestriction 
otherRestriction) throws InvalidRequestException
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one 
relation if it includes an Equal",
 +                                 Joiner.on(", 
").join(ColumnDefinition.toIdentifiers(columnDefs)));
 +        }
 +
 +        @Override
 +        public List<ByteBuffer> values(QueryOptions options) throws 
InvalidRequestException
 +        {
 +            return Collections.singletonList(value.bindAndGet(options));
 +        }
 +    }
 +
 +    public static class Slice extends TokenRestriction
 +    {
 +        private final TermSlice slice;
 +
 +        public Slice(CType ctype, List<ColumnDefinition> columnDefs, Bound 
bound, boolean inclusive, Term term)
 +        {
 +            super(ctype, columnDefs);
 +            slice = TermSlice.newInstance(bound, inclusive, term);
 +        }
 +
 +        @Override
 +        public boolean isSlice()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public List<ByteBuffer> values(QueryOptions options) throws 
InvalidRequestException
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public boolean hasBound(Bound b)
 +        {
 +            return slice.hasBound(b);
 +        }
 +
 +        @Override
 +        public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws 
InvalidRequestException
 +        {
 +            return 
Collections.singletonList(slice.bound(b).bindAndGet(options));
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return (slice.hasBound(Bound.START) && 
usesFunction(slice.bound(Bound.START), ksName, functionName))
 +                    || (slice.hasBound(Bound.END) && 
usesFunction(slice.bound(Bound.END), ksName, functionName));
 +        }
 +
 +        @Override
 +        public boolean isInclusive(Bound b)
 +        {
 +            return slice.isInclusive(b);
 +        }
 +
 +        @Override
 +        protected PrimaryKeyRestrictions doMergeWith(TokenRestriction 
otherRestriction)
 +        throws InvalidRequestException
 +        {
 +            if (!otherRestriction.isSlice())
 +                throw invalidRequest("Columns \"%s\" cannot be restricted by 
both an equality and an inequality relation",
 +                                     getColumnNamesAsString());
 +
 +            TokenRestriction.Slice otherSlice = (TokenRestriction.Slice) 
otherRestriction;
 +
 +            if (hasBound(Bound.START) && otherSlice.hasBound(Bound.START))
 +                throw invalidRequest("More than one restriction was found for 
the start bound on %s",
 +                                     getColumnNamesAsString());
 +
 +            if (hasBound(Bound.END) && otherSlice.hasBound(Bound.END))
 +                throw invalidRequest("More than one restriction was found for 
the end bound on %s",
 +                                     getColumnNamesAsString());
 +
 +            return new Slice(ctype, columnDefs,  
slice.merge(otherSlice.slice));
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("SLICE%s", slice);
 +        }
 +
 +        private Slice(CType ctype, List<ColumnDefinition> columnDefs, 
TermSlice slice)
 +        {
 +            super(ctype, columnDefs);
 +            this.slice = slice;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/71778eec/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 8347ef5,60558b4..683ed49
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -22,17 -22,14 +22,19 @@@ import java.util.*
  
  import com.google.common.base.Function;
  import com.google.common.collect.Iterables;
++import com.google.common.collect.Lists;
  
  import org.apache.cassandra.auth.Permission;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.restrictions.Restriction;
 +import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction;
 +import org.apache.cassandra.cql3.selection.Selection;
  import org.apache.cassandra.db.*;
--import org.apache.cassandra.db.composites.CBuilder;
  import org.apache.cassandra.db.composites.Composite;
++import org.apache.cassandra.db.composites.Composites;
++import org.apache.cassandra.db.composites.CompositesBuilder;
  import org.apache.cassandra.db.filter.ColumnSlice;
  import org.apache.cassandra.db.filter.SliceQueryFilter;
  import org.apache.cassandra.db.marshal.BooleanType;
@@@ -44,6 -41,6 +46,12 @@@ import org.apache.cassandra.thrift.Thri
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.Pair;
  
++import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
++
++import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
++
++import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
++
  /*
   * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and 
DELETE.
   */
@@@ -287,38 -287,38 +295,23 @@@ public abstract class ModificationState
      public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
      throws InvalidRequestException
      {
--        CBuilder keyBuilder = cfm.getKeyValidatorAsCType().builder();
--        List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
++        CompositesBuilder keyBuilder = new 
CompositesBuilder(cfm.getKeyValidatorAsCType());
          for (ColumnDefinition def : cfm.partitionKeyColumns())
          {
--            Restriction r = processedKeys.get(def.name);
--            if (r == null)
--                throw new InvalidRequestException(String.format("Missing 
mandatory PRIMARY KEY part %s", def.name));
--
--            List<ByteBuffer> values = r.values(options);
++            Restriction r = checkNotNull(processedKeys.get(def.name), 
"Missing mandatory PRIMARY KEY part %s", def.name);
++            r.appendTo(keyBuilder, options);
++        }
  
--            if (keyBuilder.remainingCount() == 1)
--            {
--                for (ByteBuffer val : values)
--                {
--                    if (val == null)
--                        throw new 
InvalidRequestException(String.format("Invalid null value for partition key 
part %s", def.name));
--                    ByteBuffer key = keyBuilder.buildWith(val).toByteBuffer();
--                    ThriftValidation.validateKey(cfm, key);
--                    keys.add(key);
--                }
--            }
--            else
++        return Lists.transform(keyBuilder.build(), new Function<Composite, 
ByteBuffer>()
++        {
++            @Override
++            public ByteBuffer apply(Composite composite)
              {
--                if (values.size() != 1)
--                    throw new InvalidRequestException("IN is only supported 
on the last column of the partition key");
--                ByteBuffer val = values.get(0);
--                if (val == null)
--                    throw new InvalidRequestException(String.format("Invalid 
null value for partition key part %s", def.name));
--                keyBuilder.add(val);
++                ByteBuffer byteBuffer = composite.toByteBuffer();
++                ThriftValidation.validateKey(cfm, byteBuffer);
++                return byteBuffer;
              }
--        }
--        return keys;
++        });
      }
  
      public Composite createClusteringPrefix(QueryOptions options)
@@@ -359,7 -359,7 +352,7 @@@
      private Composite createClusteringPrefixBuilderInternal(QueryOptions 
options)
      throws InvalidRequestException
      {
--        CBuilder builder = cfm.comparator.prefixBuilder();
++        CompositesBuilder builder = new CompositesBuilder(cfm.comparator);
          ColumnDefinition firstEmptyKey = null;
          for (ColumnDefinition def : cfm.clusteringColumns())
          {
@@@ -367,24 -367,24 +360,19 @@@
              if (r == null)
              {
                  firstEmptyKey = def;
--                if (requireFullClusteringKey() && !cfm.comparator.isDense() 
&& cfm.comparator.isCompound())
--                    throw new InvalidRequestException(String.format("Missing 
mandatory PRIMARY KEY part %s", def.name));
++                checkFalse(requireFullClusteringKey() && 
!cfm.comparator.isDense() && cfm.comparator.isCompound(), 
++                           "Missing mandatory PRIMARY KEY part %s", def.name);
              }
              else if (firstEmptyKey != null)
              {
--                throw new InvalidRequestException(String.format("Missing 
PRIMARY KEY part %s since %s is set", firstEmptyKey.name, def.name));
++                throw invalidRequest("Missing PRIMARY KEY part %s since %s is 
set", firstEmptyKey.name, def.name);
              }
              else
              {
--                List<ByteBuffer> values = r.values(options);
--                assert values.size() == 1; // We only allow IN for row keys 
so far
--                ByteBuffer val = values.get(0);
--                if (val == null)
--                    throw new InvalidRequestException(String.format("Invalid 
null value for clustering key part %s", def.name));
--                builder.add(val);
++                r.appendTo(builder, options);
              }
          }
--        return builder.build();
++        return builder.build().get(0); // We only allow IN for row keys so far
      }
  
      protected ColumnDefinition getFirstEmptyKey()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/71778eec/src/java/org/apache/cassandra/db/composites/CompositesBuilder.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/composites/CompositesBuilder.java
index 9a32dcc,0000000..48bc802
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/composites/CompositesBuilder.java
+++ b/src/java/org/apache/cassandra/db/composites/CompositesBuilder.java
@@@ -1,271 -1,0 +1,292 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.composites;
 +
 +import java.nio.ByteBuffer;
- import java.util.ArrayList;
- import java.util.Comparator;
- import java.util.LinkedHashSet;
- import java.util.List;
- import java.util.Set;
- import java.util.TreeSet;
++import java.util.*;
 +
 +import org.apache.cassandra.db.composites.Composite.EOC;
 +
 +import static java.util.Collections.singletonList;
 +
 +/**
 + * Builder that allows building multiple composites at the same time.
 + */
 +public final class CompositesBuilder
 +{
 +    /**
-      * The builder used to build the <code>Composite</code>s.
++     * The composite type.
 +     */
-     private final CBuilder builder;
- 
-     /**
-      * The comparator used to sort the returned <code>Composite</code>s.
-      */
-     private final Comparator<Composite> comparator;
++    private final CType ctype;
 +
 +    /**
 +     * The elements of the composites
 +     */
 +    private final List<List<ByteBuffer>> elementsList = new ArrayList<>();
 +
 +    /**
-      * The number of elements that still can be added.
++     * The number of elements that have been added.
 +     */
-     private int remaining;
++    private int size;
 +
 +    /**
 +     * <code>true</code> if the composites have been built, 
<code>false</code> otherwise.
 +     */
 +    private boolean built;
 +
 +    /**
 +     * <code>true</code> if the composites contain some <code>null</code> 
elements.
 +     */
 +    private boolean containsNull;
 +
-     public CompositesBuilder(CBuilder builder, Comparator<Composite> 
comparator)
++    /**
++     * <code>true</code> if an empty collection has been added.
++     */
++    private boolean hasMissingElements;
++
++    public CompositesBuilder(CType ctype)
 +    {
-         this.builder = builder;
-         this.comparator = comparator;
-         this.remaining = builder.remainingCount();
++        this.ctype = ctype;
 +    }
 +
 +    /**
 +     * Adds the specified element to all the composites.
 +     * <p>
 +     * If this builder contains 2 composites: A-B and A-C a call to this 
method to add D will result in the composites:
 +     * A-B-D and A-C-D.
 +     * </p>
 +     *
 +     * @param value the value of the next element
 +     * @return this <code>CompositesBuilder</code>
 +     */
 +    public CompositesBuilder addElementToAll(ByteBuffer value)
 +    {
 +        checkUpdateable();
 +
 +        if (isEmpty())
 +            elementsList.add(new ArrayList<ByteBuffer>());
 +
 +        for (int i = 0, m = elementsList.size(); i < m; i++)
 +        {
 +            if (value == null)
 +                containsNull = true;
 +
 +            elementsList.get(i).add(value);
 +        }
-         remaining--;
++        size++;
 +        return this;
 +    }
 +
 +    /**
 +     * Adds individually each of the specified elements to the end of all of 
the existing composites.
 +     * <p>
 +     * If this builder contains 2 composites: A-B and A-C a call to this 
method to add D and E will result in the 4
 +     * composites: A-B-D, A-B-E, A-C-D and A-C-E.
 +     * </p>
 +     *
 +     * @param values the elements to add
 +     * @return this <code>CompositesBuilder</code>
 +     */
 +    public CompositesBuilder addEachElementToAll(List<ByteBuffer> values)
 +    {
 +        checkUpdateable();
 +
 +        if (isEmpty())
 +            elementsList.add(new ArrayList<ByteBuffer>());
 +
-         for (int i = 0, m = elementsList.size(); i < m; i++)
++        if (values.isEmpty())
 +        {
-             List<ByteBuffer> oldComposite = elementsList.remove(0);
- 
-             for (int j = 0, n = values.size(); j < n; j++)
++            hasMissingElements = true;
++        }
++        else
++        {
++            for (int i = 0, m = elementsList.size(); i < m; i++)
 +            {
-                 List<ByteBuffer> newComposite = new ArrayList<>(oldComposite);
-                 elementsList.add(newComposite);
++                List<ByteBuffer> oldComposite = elementsList.remove(0);
 +
-                 ByteBuffer value = values.get(j);
++                for (int j = 0, n = values.size(); j < n; j++)
++                {
++                    List<ByteBuffer> newComposite = new 
ArrayList<>(oldComposite);
++                    elementsList.add(newComposite);
 +
-                 if (value == null)
-                     containsNull = true;
++                    ByteBuffer value = values.get(j);
 +
-                 newComposite.add(values.get(j));
++                    if (value == null)
++                        containsNull = true;
++
++                    newComposite.add(values.get(j));
++                }
 +            }
 +        }
- 
-         remaining--;
++        size++;
 +        return this;
 +    }
 +
 +
 +    /**
 +     * Adds individually each of the specified list of elements to the end of 
all of the existing composites.
 +     * <p>
 +     * If this builder contains 2 composites: A-B and A-C a call to this 
method to add [[D, E], [F, G]] will result in the 4
 +     * composites: A-B-D-E, A-B-F-G, A-C-D-E and A-C-F-G.
 +     * </p>
 +     *
 +     * @param values the elements to add
 +     * @return this <code>CompositesBuilder</code>
 +     */
 +    public CompositesBuilder addAllElementsToAll(List<List<ByteBuffer>> 
values)
 +    {
-         assert !values.isEmpty();
 +        checkUpdateable();
 +
 +        if (isEmpty())
 +            elementsList.add(new ArrayList<ByteBuffer>());
 +
-         for (int i = 0, m = elementsList.size(); i < m; i++)
++        if (values.isEmpty())
 +        {
-             List<ByteBuffer> oldComposite = elementsList.remove(0);
- 
-             for (int j = 0, n = values.size(); j < n; j++)
++            hasMissingElements = true;
++        }
++        else
++        {
++            for (int i = 0, m = elementsList.size(); i < m; i++)
 +            {
-                 List<ByteBuffer> newComposite = new ArrayList<>(oldComposite);
-                 elementsList.add(newComposite);
++                List<ByteBuffer> oldComposite = elementsList.remove(0);
++
++                for (int j = 0, n = values.size(); j < n; j++)
++                {
++                    List<ByteBuffer> newComposite = new 
ArrayList<>(oldComposite);
++                    elementsList.add(newComposite);
 +
-                 List<ByteBuffer> value = values.get(j);
++                    List<ByteBuffer> value = values.get(j);
 +
-                 if (value.contains(null))
-                     containsNull = true;
++                    if (value.isEmpty())
++                        hasMissingElements = true;
 +
-                 newComposite.addAll(value);
++                    if (value.contains(null))
++                        containsNull = true;
++
++                    newComposite.addAll(value);
++                }
 +            }
++            size += values.get(0).size();
 +        }
- 
-         remaining -= values.get(0).size();
 +        return this;
 +    }
 +
 +    /**
 +     * Returns the number of elements that can be added to the composites.
 +     *
 +     * @return the number of elements that can be added to the composites.
 +     */
 +    public int remainingCount()
 +    {
-         return remaining;
++        return ctype.size() - size;
 +    }
 +
 +    /**
 +     * Checks if some elements can still be added to the composites.
 +     *
 +     * @return <code>true</code> if it is possible to add more elements to 
the composites, <code>false</code> otherwise.
 +     */
 +    public boolean hasRemaining()
 +    {
-         return remaining > 0;
++        return remainingCount() > 0;
 +    }
 +
 +    /**
 +     * Checks if this builder is empty.
 +     *
 +     * @return <code>true</code> if this builder is empty, <code>false</code> 
otherwise.
 +     */
 +    public boolean isEmpty()
 +    {
 +        return elementsList.isEmpty();
 +    }
 +
 +    /**
 +     * Checks if the composites contain null elements.
 +     *
 +     * @return <code>true</code> if the composites contain <code>null</code> 
elements, <code>false</code> otherwise.
 +     */
 +    public boolean containsNull()
 +    {
 +        return containsNull;
 +    }
 +
 +    /**
++     * Checks if an empty list of values has been added.
++     * @return <code>true</code> if the composites have some missing 
elements, <code>false</code> otherwise.
++     */
++    public boolean hasMissingElements()
++    {
++        return hasMissingElements;
++    }
++
++    /**
 +     * Builds the <code>Composites</code>.
 +     *
 +     * @return the composites
 +     */
 +    public List<Composite> build()
 +    {
 +        return buildWithEOC(EOC.NONE);
 +    }
 +
 +    /**
 +     * Builds the <code>Composites</code> with the specified EOC.
 +     *
 +     * @return the composites
 +     */
 +    public List<Composite> buildWithEOC(EOC eoc)
 +    {
 +        built = true;
 +
++        if (hasMissingElements)
++            return Collections.emptyList();
++
++        CBuilder builder = ctype.builder();
++
 +        if (elementsList.isEmpty())
 +            return singletonList(builder.build().withEOC(eoc));
 +
 +        // Use a Set to sort if needed and eliminate duplicates
 +        Set<Composite> set = newSet();
 +
 +        for (int i = 0, m = elementsList.size(); i < m; i++)
 +        {
 +            List<ByteBuffer> elements = elementsList.get(i);
 +            set.add(builder.buildWith(elements).withEOC(eoc));
 +        }
 +
 +        return new ArrayList<>(set);
 +    }
 +
 +    /**
 +     * Returns a new <code>Set</code> instance that will be used to eliminate 
duplicates and sort the results.
 +     *
 +     * @return a new <code>Set</code> instance.
 +     */
 +    private Set<Composite> newSet()
 +    {
-         return comparator == null ? new LinkedHashSet<Composite>() : new 
TreeSet<Composite>(comparator);
++        return new TreeSet<>(ctype);
 +    }
 +
 +    private void checkUpdateable()
 +    {
 +        if (!hasRemaining() || built)
 +            throw new IllegalStateException("this CompositesBuilder cannot be 
updated anymore");
 +    }
 +}

Reply via email to