http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/Tuples.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Tuples.java 
b/src/java/org/apache/cassandra/cql3/Tuples.java
index 19e20c2..ba9ddb6 100644
--- a/src/java/org/apache/cassandra/cql3/Tuples.java
+++ b/src/java/org/apache/cassandra/cql3/Tuples.java
@@ -136,6 +136,20 @@ public class Tuples
             }
         }
 
+        @Override
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            List<AbstractType<?>> types = new ArrayList<>(elements.size());
+            for (Term.Raw term : elements)
+            {
+                AbstractType<?> type = term.getExactTypeIfKnown(keyspace);
+                if (type == null)
+                    return null;
+                types.add(type);
+            }
+            return new TupleType(types);
+        }
+
         public String getText()
         {
             return 
elements.stream().map(Term.Raw::getText).collect(Collectors.joining(", ", "(", 
")"));
@@ -326,6 +340,11 @@ public class Tuples
             return new ColumnSpecification(receivers.get(0).ksName, 
receivers.get(0).cfName, identifier, type);
         }
 
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            return null;
+        }
+
         public AbstractMarker prepare(String keyspace, List<? extends 
ColumnSpecification> receivers) throws InvalidRequestException
         {
             return new Tuples.Marker(bindIndex, makeReceiver(receivers));
@@ -365,6 +384,11 @@ public class Tuples
             return new ColumnSpecification(receivers.get(0).ksName, 
receivers.get(0).cfName, identifier, ListType.getInstance(type, false));
         }
 
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            return null;
+        }
+
         public AbstractMarker prepare(String keyspace, List<? extends 
ColumnSpecification> receivers) throws InvalidRequestException
         {
             return new InMarker(bindIndex, makeInReceiver(receivers));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/TypeCast.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/TypeCast.java 
b/src/java/org/apache/cassandra/cql3/TypeCast.java
index 890b34f..7b2f306 100644
--- a/src/java/org/apache/cassandra/cql3/TypeCast.java
+++ b/src/java/org/apache/cassandra/cql3/TypeCast.java
@@ -58,6 +58,11 @@ public class TypeCast extends Term.Raw
             return AssignmentTestable.TestResult.NOT_ASSIGNABLE;
     }
 
+    public AbstractType<?> getExactTypeIfKnown(String keyspace)
+    {
+        return type.prepare(keyspace).getType();
+    }
+
     public String getText()
     {
         return "(" + type + ")" + term;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/UserTypes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UserTypes.java 
b/src/java/org/apache/cassandra/cql3/UserTypes.java
index 3a54216..41b8eed 100644
--- a/src/java/org/apache/cassandra/cql3/UserTypes.java
+++ b/src/java/org/apache/cassandra/cql3/UserTypes.java
@@ -23,9 +23,7 @@ import java.util.*;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.marshal.TupleType;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -44,15 +42,15 @@ public abstract class UserTypes
         UserType ut = (UserType)column.type;
         return new ColumnSpecification(column.ksName,
                                        column.cfName,
-                                       new ColumnIdentifier(column.name + "." 
+ UTF8Type.instance.compose(ut.fieldName(field)), true),
+                                       new ColumnIdentifier(column.name + "." 
+ ut.fieldName(field), true),
                                        ut.fieldType(field));
     }
 
     public static class Literal extends Term.Raw
     {
-        public final Map<ColumnIdentifier, Term.Raw> entries;
+        public final Map<FieldIdentifier, Term.Raw> entries;
 
-        public Literal(Map<ColumnIdentifier, Term.Raw> entries)
+        public Literal(Map<FieldIdentifier, Term.Raw> entries)
         {
             this.entries = entries;
         }
@@ -67,7 +65,7 @@ public abstract class UserTypes
             int foundValues = 0;
             for (int i = 0; i < ut.size(); i++)
             {
-                ColumnIdentifier field = new ColumnIdentifier(ut.fieldName(i), 
UTF8Type.instance);
+                FieldIdentifier field = ut.fieldName(i);
                 Term.Raw raw = entries.get(field);
                 if (raw == null)
                     raw = Constants.NULL_LITERAL;
@@ -83,9 +81,9 @@ public abstract class UserTypes
             if (foundValues != entries.size())
             {
                 // We had some field that are not part of the type
-                for (ColumnIdentifier id : entries.keySet())
+                for (FieldIdentifier id : entries.keySet())
                 {
-                    if (!ut.fieldNames().contains(id.bytes))
+                    if (!ut.fieldNames().contains(id))
                         throw new 
InvalidRequestException(String.format("Unknown field '%s' in value of user 
defined type %s", id, ut.getNameAsString()));
                 }
             }
@@ -102,7 +100,7 @@ public abstract class UserTypes
             UserType ut = (UserType)receiver.type;
             for (int i = 0; i < ut.size(); i++)
             {
-                ColumnIdentifier field = new ColumnIdentifier(ut.fieldName(i), 
UTF8Type.instance);
+                FieldIdentifier field = ut.fieldName(i);
                 Term.Raw value = entries.get(field);
                 if (value == null)
                     continue;
@@ -129,14 +127,19 @@ public abstract class UserTypes
             }
         }
 
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            return null;
+        }
+
         public String getText()
         {
             StringBuilder sb = new StringBuilder();
             sb.append("{");
-            Iterator<Map.Entry<ColumnIdentifier, Term.Raw>> iter = 
entries.entrySet().iterator();
+            Iterator<Map.Entry<FieldIdentifier, Term.Raw>> iter = 
entries.entrySet().iterator();
             while (iter.hasNext())
             {
-                Map.Entry<ColumnIdentifier, Term.Raw> entry = iter.next();
+                Map.Entry<FieldIdentifier, Term.Raw> entry = iter.next();
                 sb.append(entry.getKey()).append(": 
").append(entry.getValue().getText());
                 if (iter.hasNext())
                     sb.append(", ");
@@ -294,10 +297,11 @@ public abstract class UserTypes
                 if (value == null)
                     return;
 
-                Iterator<ByteBuffer> fieldNameIter = 
userTypeValue.type.fieldNames().iterator();
+                Iterator<FieldIdentifier> fieldNameIter = 
userTypeValue.type.fieldNames().iterator();
                 for (ByteBuffer buffer : userTypeValue.elements)
                 {
-                    ByteBuffer fieldName = fieldNameIter.next();
+                    assert fieldNameIter.hasNext();
+                    FieldIdentifier fieldName = fieldNameIter.next();
                     if (buffer == null)
                         continue;
 
@@ -318,9 +322,9 @@ public abstract class UserTypes
 
     public static class SetterByField extends Operation
     {
-        private final ColumnIdentifier field;
+        private final FieldIdentifier field;
 
-        public SetterByField(ColumnDefinition column, ColumnIdentifier field, 
Term t)
+        public SetterByField(ColumnDefinition column, FieldIdentifier field, 
Term t)
         {
             super(column, t);
             this.field = field;
@@ -335,7 +339,7 @@ public abstract class UserTypes
             if (value == UNSET_VALUE)
                 return;
 
-            CellPath fieldPath = ((UserType) 
column.type).cellPathForField(field.bytes);
+            CellPath fieldPath = ((UserType) 
column.type).cellPathForField(field);
             if (value == null)
                 params.addTombstone(column, fieldPath);
             else
@@ -345,9 +349,9 @@ public abstract class UserTypes
 
     public static class DeleterByField extends Operation
     {
-        private final ColumnIdentifier field;
+        private final FieldIdentifier field;
 
-        public DeleterByField(ColumnDefinition column, ColumnIdentifier field)
+        public DeleterByField(ColumnDefinition column, FieldIdentifier field)
         {
             super(column, null);
             this.field = field;
@@ -358,7 +362,7 @@ public abstract class UserTypes
             // we should not get here for frozen UDTs
             assert column.type.isMultiCell() : "Attempted to delete a single 
field from a frozen UDT";
 
-            CellPath fieldPath = ((UserType) 
column.type).cellPathForField(field.bytes);
+            CellPath fieldPath = ((UserType) 
column.type).cellPathForField(field);
             params.addTombstone(column, fieldPath);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java 
b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 0fe86e4..3905c83 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -189,6 +189,16 @@ public class FunctionCall extends Term.NonTerminal
             }
         }
 
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            // We could implement this, but the method is only used in 
selection clause, where FunctionCall is not used 
+            // (we use a Selectable.WithFunction instead). And if that method 
is later used in other places, better to
+            // let that future patch make sure this can be implemented 
properly (note in particular we don't have access
+            // to the receiver type, which FunctionResolver.get() takes) 
rather than provide an implementation that may
+            // not work in all cases.
+            throw new UnsupportedOperationException();
+        }
+
         public String getText()
         {
             return name + 
terms.stream().map(Term.Raw::getText).collect(Collectors.joining(", ", "(", 
")"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
index 868f752..498cf0f 100644
--- a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
@@ -24,7 +24,9 @@ import java.util.List;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.cql3.statements.RequestValidations;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
@@ -36,7 +38,7 @@ abstract class AbstractFunctionSelector<T extends Function> 
extends Selector
      * The list used to pass the function arguments is recycled to avoid the 
cost of instantiating a new list
      * with each function call.
      */
-    protected final List<ByteBuffer> args;
+    private final List<ByteBuffer> args;
     protected final List<Selector> argSelectors;
 
     public static Factory newFactory(final Function fun, final 
SelectorFactories factories) throws InvalidRequestException
@@ -80,10 +82,10 @@ abstract class AbstractFunctionSelector<T extends Function> 
extends Selector
                 factories.addFunctionsTo(functions);
             }
 
-            public Selector newInstance() throws InvalidRequestException
+            public Selector newInstance(QueryOptions options) throws 
InvalidRequestException
             {
-                return fun.isAggregate() ? new AggregateFunctionSelector(fun, 
factories.newInstances())
-                                         : new ScalarFunctionSelector(fun, 
factories.newInstances());
+                return fun.isAggregate() ? new AggregateFunctionSelector(fun, 
factories.newInstances(options))
+                                         : new ScalarFunctionSelector(fun, 
factories.newInstances(options));
             }
 
             public boolean isWritetimeSelectorFactory()
@@ -110,6 +112,19 @@ abstract class AbstractFunctionSelector<T extends 
Function> extends Selector
         this.args = Arrays.asList(new ByteBuffer[argSelectors.size()]);
     }
 
+    // Sets a given arg value. We should use that instead of directly setting 
the args list for the
+    // sake of validation.
+    protected void setArg(int i, ByteBuffer value) throws 
InvalidRequestException
+    {
+        RequestValidations.checkBindValueSet(value, "Invalid unset value for 
argument in call to function %s", fun.name().name);
+        args.set(i, value);
+    }
+
+    protected List<ByteBuffer> args()
+    {
+        return args;
+    }
+
     public AbstractType<?> getType()
     {
         return fun.returnType();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
index 27a8294..d768665 100644
--- 
a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
+++ 
b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
@@ -41,10 +41,10 @@ final class AggregateFunctionSelector extends 
AbstractFunctionSelector<Aggregate
         {
             Selector s = argSelectors.get(i);
             s.addInput(protocolVersion, rs);
-            args.set(i, s.getOutput(protocolVersion));
+            setArg(i, s.getOutput(protocolVersion));
             s.reset();
         }
-        this.aggregate.addInput(protocolVersion, args);
+        this.aggregate.addInput(protocolVersion, args());
     }
 
     public ByteBuffer getOutput(int protocolVersion) throws 
InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
index 965a01a..55ff50f 100644
--- a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.selection;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -38,9 +39,7 @@ final class FieldSelector extends Selector
         {
             protected String getColumnName()
             {
-                return String.format("%s.%s",
-                                     factory.getColumnName(),
-                                     
UTF8Type.instance.getString(type.fieldName(field)));
+                return String.format("%s.%s", factory.getColumnName(), 
type.fieldName(field));
             }
 
             protected AbstractType<?> getReturnType()
@@ -53,9 +52,9 @@ final class FieldSelector extends Selector
                 factory.addColumnMapping(mapping, resultsColumn);
             }
 
-            public Selector newInstance() throws InvalidRequestException
+            public Selector newInstance(QueryOptions options) throws 
InvalidRequestException
             {
-                return new FieldSelector(type, field, factory.newInstance());
+                return new FieldSelector(type, field, 
factory.newInstance(options));
             }
 
             public boolean isAggregateSelectorFactory()
@@ -92,7 +91,7 @@ final class FieldSelector extends Selector
     @Override
     public String toString()
     {
-        return String.format("%s.%s", selected, 
UTF8Type.instance.getString(type.fieldName(field)));
+        return String.format("%s.%s", selected, type.fieldName(field));
     }
 
     private FieldSelector(UserType type, int field, Selector selected)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
index bb56bb8..50175c1 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
@@ -54,14 +54,14 @@ final class ScalarFunctionSelector extends 
AbstractFunctionSelector<ScalarFuncti
         for (int i = 0, m = argSelectors.size(); i < m; i++)
         {
             Selector s = argSelectors.get(i);
-            args.set(i, s.getOutput(protocolVersion));
+            setArg(i, s.getOutput(protocolVersion));
             s.reset();
         }
-        return fun.execute(protocolVersion, args);
+        return fun.execute(protocolVersion, args());
     }
 
     ScalarFunctionSelector(Function fun, List<Selector> argSelectors)
     {
         super((ScalarFunction) fun, argSelectors);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/Selectable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selectable.java 
b/src/java/org/apache/cassandra/cql3/selection/Selectable.java
index faf3f2d..1f1f07b 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selectable.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selectable.java
@@ -21,22 +21,40 @@ package org.apache.cassandra.cql3.selection;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.CQL3Type;
-import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
-public abstract class Selectable
+public interface Selectable extends AssignmentTestable
 {
-    public abstract Selector.Factory newSelectorFactory(CFMetaData cfm, 
List<ColumnDefinition> defs);
+    public Selector.Factory newSelectorFactory(CFMetaData cfm, AbstractType<?> 
expectedType, List<ColumnDefinition> defs, VariableSpecifications boundNames);
+
+    /**
+     * The type of the {@code Selectable} if it can be inferred.
+     *
+     * @param keyspace the keyspace on which the statement for which this is a
+     * {@code Selectable} is on.
+     * @return the type of this {@code Selectable} if inferrable, or {@code 
null}
+     * otherwise (for instance, the type isn't inferable for a bind marker. 
Even for
+     * literals, the exact type is not inferrable since they are valid for many
+     * different types and so this will return {@code null} too).
+     */
+    public AbstractType<?> getExactTypeIfKnown(String keyspace);
+
+    // Term.Raw overrides this since some literals can be WEAKLY_ASSIGNABLE
+    default public TestResult testAssignment(String keyspace, 
ColumnSpecification receiver)
+    {
+        AbstractType<?> type = getExactTypeIfKnown(keyspace);
+        return type == null ? TestResult.NOT_ASSIGNABLE : 
type.testAssignment(keyspace, receiver);
+    }
 
-    protected static int addAndGetIndex(ColumnDefinition def, 
List<ColumnDefinition> l)
+    default int addAndGetIndex(ColumnDefinition def, List<ColumnDefinition> l)
     {
         int idx = l.indexOf(def);
         if (idx < 0)
@@ -47,57 +65,160 @@ public abstract class Selectable
         return idx;
     }
 
-    public static interface Raw
+    public static abstract class Raw
     {
-        public Selectable prepare(CFMetaData cfm);
+        public abstract Selectable prepare(CFMetaData cfm);
 
         /**
          * Returns true if any processing is performed on the selected column.
          **/
-        public boolean processesSelection();
+        public boolean processesSelection()
+        {
+            // ColumnIdentifier is the only case that returns false and 
overrides this
+            return true;
+        }
     }
 
-    public static class WritetimeOrTTL extends Selectable
+    public static class WithTerm implements Selectable
     {
-        public final ColumnIdentifier id;
+        /**
+         * The names given to unnamed bind markers found in selection. In 
selection clause, we often don't have a good
+         * name for bind markers, typically if you have:
+         *   SELECT (int)? FROM foo;
+         * there isn't a good name for that marker. So we give the same name 
to all the markers. Note that we could try
+         * to differentiate the names by using some increasing number in the 
name (so [selection_1], [selection_2], ...)
+         * but it's actually not trivial to do in the current code and it's 
not really more helpful since if users want
+         * to bind by position (which they will have to in this case), they 
can do so at the driver level directly. And
+         * so we don't bother.
+         * Note that users should really be using named bind markers if they 
want to be able to bind by names.
+         */
+        private static final ColumnIdentifier bindMarkerNameInSelection = new 
ColumnIdentifier("[selection]", true);
+
+        private final Term.Raw rawTerm;
+
+        public WithTerm(Term.Raw rawTerm)
+        {
+            this.rawTerm = rawTerm;
+        }
+
+        @Override
+        public TestResult testAssignment(String keyspace, ColumnSpecification 
receiver)
+        {
+            return rawTerm.testAssignment(keyspace, receiver);
+        }
+
+        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
AbstractType<?> expectedType, List<ColumnDefinition> defs, 
VariableSpecifications boundNames) throws InvalidRequestException
+        {
+            /*
+             * expectedType will be null if we have no constraint on what the 
type should be. For instance, if this term is a bind marker:
+             *   - it will be null if we do "SELECT ? FROM foo"
+             *   - it won't be null (and be LongType) if we do "SELECT 
bigintAsBlob(?) FROM foo" because the function constrains it.
+             *
+             * In the first case, we have to error out: we need to infer the 
type of the metadata of a SELECT at preparation time, which we can't
+             * here (users will have to do "SELECT (varint)? FROM foo" for 
instance).
+             * But in the 2nd case, we're fine and can use the expectedType to 
"prepare" the bind marker/collect the bound type.
+             *
+             * Further, the term might not be a bind marker, in which case we 
sometimes can default to some most-general type. For instance, in
+             *   SELECT 3 FROM foo
+             * we'll just default the type to 'varint' as that's the most 
generic type for the literal '3' (this is mostly for convenience, the query
+             * is not terribly useful in practice and use can force the type 
as for the bind marker case through "SELECT (int)3 FROM foo").
+             * But note that not all literals can have such default type. For 
instance, there is no way to infer the type of a UDT literal in a vacuum,
+             * and so we simply error out if we have something like:
+             *   SELECT { foo: 'bar' } FROM foo
+             *
+             * Lastly, note that if the term is a terminal literal, we don't 
have to check its compatibility with 'expectedType' as any incompatibility
+             * would have been found at preparation time.
+             */
+            AbstractType<?> type = getExactTypeIfKnown(cfm.ksName);
+            if (type == null)
+            {
+                type = expectedType;
+                if (type == null)
+                    throw new InvalidRequestException("Cannot infer type for 
term " + this + " in selection clause (try using a cast to force a type)");
+            }
+
+            // The fact we default the name to "[selection]" unconditionally 
means that any bind marker in a
+            // selection will have this name. Which isn't terribly helpful, 
but it's unclear how to provide
+            // something a lot more helpful and in practice users can bind 
those markers by position or, even better,
+            // use named bind markers.
+            Term term = rawTerm.prepare(cfm.ksName, new 
ColumnSpecification(cfm.ksName, cfm.cfName, bindMarkerNameInSelection, type));
+            term.collectMarkerSpecification(boundNames);
+            return TermSelector.newFactory(rawTerm.getText(), term, type);
+        }
+
+        @Override
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            return rawTerm.getExactTypeIfKnown(keyspace);
+        }
+ 
+        @Override
+        public String toString()
+        {
+            return rawTerm.toString();
+        }
+
+        public static class Raw extends Selectable.Raw
+        {
+            private final Term.Raw term;
+
+            public Raw(Term.Raw term)
+            {
+                this.term = term;
+            }
+
+            public Selectable prepare(CFMetaData cfm)
+            {
+                return new WithTerm(term);
+            }
+        }
+    }
+
+    public static class WritetimeOrTTL implements Selectable
+    {
+        public final ColumnDefinition column;
         public final boolean isWritetime;
 
-        public WritetimeOrTTL(ColumnIdentifier id, boolean isWritetime)
+        public WritetimeOrTTL(ColumnDefinition column, boolean isWritetime)
         {
-            this.id = id;
+            this.column = column;
             this.isWritetime = isWritetime;
         }
 
         @Override
         public String toString()
         {
-            return (isWritetime ? "writetime" : "ttl") + "(" + id + ")";
+            return (isWritetime ? "writetime" : "ttl") + "(" + column.name + 
")";
         }
 
         public Selector.Factory newSelectorFactory(CFMetaData cfm,
-                                                   List<ColumnDefinition> defs)
+                                                   AbstractType<?> 
expectedType,
+                                                   List<ColumnDefinition> defs,
+                                                   VariableSpecifications 
boundNames)
         {
-            ColumnDefinition def = cfm.getColumnDefinition(id);
-            if (def == null)
-                throw new InvalidRequestException(String.format("Undefined 
name %s in selection clause", id));
-            if (def.isPrimaryKeyColumn())
+            if (column.isPrimaryKeyColumn())
                 throw new InvalidRequestException(
                         String.format("Cannot use selection function %s on 
PRIMARY KEY part %s",
                                       isWritetime ? "writeTime" : "ttl",
-                                      def.name));
-            if (def.type.isCollection())
+                                      column.name));
+            if (column.type.isCollection())
                 throw new InvalidRequestException(String.format("Cannot use 
selection function %s on collections",
                                                                 isWritetime ? 
"writeTime" : "ttl"));
 
-            return WritetimeOrTTLSelector.newFactory(def, addAndGetIndex(def, 
defs), isWritetime);
+            return WritetimeOrTTLSelector.newFactory(column, 
addAndGetIndex(column, defs), isWritetime);
+        }
+
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            return isWritetime ? LongType.instance : Int32Type.instance;
         }
 
-        public static class Raw implements Selectable.Raw
+        public static class Raw extends Selectable.Raw
         {
-            private final ColumnIdentifier.Raw id;
+            private final ColumnDefinition.Raw id;
             private final boolean isWritetime;
 
-            public Raw(ColumnIdentifier.Raw id, boolean isWritetime)
+            public Raw(ColumnDefinition.Raw id, boolean isWritetime)
             {
                 this.id = id;
                 this.isWritetime = isWritetime;
@@ -107,59 +228,42 @@ public abstract class Selectable
             {
                 return new WritetimeOrTTL(id.prepare(cfm), isWritetime);
             }
-
-            public boolean processesSelection()
-            {
-                return true;
-            }
         }
     }
 
-    public static class WithFunction extends Selectable
+    public static class WithFunction implements Selectable
     {
-        public final FunctionName functionName;
+        public final Function function;
         public final List<Selectable> args;
 
-        public WithFunction(FunctionName functionName, List<Selectable> args)
+        public WithFunction(Function function, List<Selectable> args)
         {
-            this.functionName = functionName;
+            this.function = function;
             this.args = args;
         }
 
         @Override
         public String toString()
         {
-            return new StrBuilder().append(functionName)
+            return new StrBuilder().append(function.name())
                                    .append("(")
                                    .appendWithSeparators(args, ", ")
                                    .append(")")
                                    .toString();
         }
 
-        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
List<ColumnDefinition> defs)
+        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
AbstractType<?> expectedType, List<ColumnDefinition> defs, 
VariableSpecifications boundNames)
         {
-            SelectorFactories factories  =
-                    
SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, cfm, defs);
-
-            // We need to circumvent the normal function lookup process for 
toJson() because instances of the function
-            // are not pre-declared (because it can accept any type of 
argument).
-            Function fun;
-            if (functionName.equalsNativeFunction(ToJsonFct.NAME))
-                fun = ToJsonFct.getInstance(factories.getReturnTypes());
-            else
-                fun = FunctionResolver.get(cfm.ksName, functionName, 
factories.newInstances(), cfm.ksName, cfm.cfName, null);
-
-            if (fun == null)
-                throw new InvalidRequestException(String.format("Unknown 
function '%s'", functionName));
-
-            if (fun.returnType() == null)
-                throw new InvalidRequestException(String.format("Unknown 
function %s called in selection clause",
-                                                                functionName));
+            SelectorFactories factories = 
SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, 
function.argTypes(), cfm, defs, boundNames);
+            return AbstractFunctionSelector.newFactory(function, factories);
+        }
 
-            return AbstractFunctionSelector.newFactory(fun, factories);
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            return function.returnType();
         }
 
-        public static class Raw implements Selectable.Raw
+        public static class Raw extends Selectable.Raw
         {
             private final FunctionName functionName;
             private final List<Selectable.Raw> args;
@@ -176,22 +280,79 @@ public abstract class Selectable
                                Collections.emptyList());
             }
 
-            public WithFunction prepare(CFMetaData cfm)
+            public Selectable prepare(CFMetaData cfm)
             {
                 List<Selectable> preparedArgs = new ArrayList<>(args.size());
                 for (Selectable.Raw arg : args)
                     preparedArgs.add(arg.prepare(cfm));
-                return new WithFunction(functionName, preparedArgs);
-            }
 
-            public boolean processesSelection()
-            {
-                return true;
+                FunctionName name = functionName;
+                // We need to circumvent the normal function lookup process 
for toJson() because instances of the function
+                // are not pre-declared (because it can accept any type of 
argument). We also have to wait until we have the
+                // selector factories of the argument so we can access their 
final type.
+                if (functionName.equalsNativeFunction(ToJsonFct.NAME))
+                {
+                    return new WithToJSonFunction(preparedArgs);
+                }
+                // Also, COUNT(x) is equivalent to COUNT(*) for any non-null 
term x (since count(x) doesn't care about its argument outside of a check for 
nullness) and
+                // for backward compatibility we want to support COUNT(1), but 
we actually have a COUNT(x) method for every existing (simple) input type so 
currently COUNT(1)
+                // will throw as ambiguous (since 1 works for any type). So we 
have to special case COUNT.
+                else if 
(functionName.equalsNativeFunction(FunctionName.nativeFunction("count"))
+                        && preparedArgs.size() == 1
+                        && (preparedArgs.get(0) instanceof WithTerm)
+                        && (((WithTerm)preparedArgs.get(0)).rawTerm instanceof 
Constants.Literal))
+                {
+                    // Note that 'null' isn't a Constants.Literal
+                    name = AggregateFcts.countRowsFunction.name();
+                    preparedArgs = Collections.emptyList();
+                }
+
+                Function fun = FunctionResolver.get(cfm.ksName, name, 
preparedArgs, cfm.ksName, cfm.cfName, null);
+
+                if (fun == null)
+                    throw new InvalidRequestException(String.format("Unknown 
function '%s'", functionName));
+
+                if (fun.returnType() == null)
+                    throw new InvalidRequestException(String.format("Unknown 
function %s called in selection clause", functionName));
+
+                return new WithFunction(fun, preparedArgs);
             }
         }
     }
 
-    public static class WithCast extends Selectable
+    public static class WithToJSonFunction implements Selectable
+    {
+        public final List<Selectable> args;
+
+        private WithToJSonFunction(List<Selectable> args)
+        {
+            this.args = args;
+        }
+
+        @Override
+        public String toString()
+        {
+            return new StrBuilder().append(ToJsonFct.NAME)
+                                   .append("(")
+                                   .appendWithSeparators(args, ", ")
+                                   .append(")")
+                                   .toString();
+        }
+
+        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
AbstractType<?> expectedType, List<ColumnDefinition> defs, 
VariableSpecifications boundNames)
+        {
+            SelectorFactories factories = 
SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, null, cfm, 
defs, boundNames);
+            Function fun = ToJsonFct.getInstance(factories.getReturnTypes());
+            return AbstractFunctionSelector.newFactory(fun, factories);
+        }
+
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            return UTF8Type.instance;
+        }
+    }
+
+    public static class WithCast implements Selectable
     {
         private final CQL3Type type;
         private final Selectable arg;
@@ -208,21 +369,19 @@ public abstract class Selectable
             return String.format("cast(%s as %s)", arg, 
type.toString().toLowerCase());
         }
 
-        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
List<ColumnDefinition> defs)
+        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
AbstractType<?> expectedType, List<ColumnDefinition> defs, 
VariableSpecifications boundNames)
         {
-            SelectorFactories factories  =
-                    
SelectorFactories.createFactoriesAndCollectColumnDefinitions(Collections.singletonList(arg),
 cfm, defs);
+            List<Selectable> args = Collections.singletonList(arg);
+            SelectorFactories factories = 
SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, null, cfm, 
defs, boundNames);
 
             Selector.Factory factory = factories.get(0);
 
             // If the user is trying to cast a type on its own type we simply 
ignore it.
             if (type.getType().equals(factory.getReturnType()))
-            {
                 return factory;
-            }
 
             FunctionName name = 
FunctionName.nativeFunction(CastFcts.getFunctionName(type));
-            Function fun = FunctionResolver.get(cfm.ksName, name, 
factories.newInstances(), cfm.ksName, cfm.cfName, null);
+            Function fun = FunctionResolver.get(cfm.ksName, name, args, 
cfm.ksName, cfm.cfName, null);
 
             if (fun == null)
             {
@@ -233,7 +392,12 @@ public abstract class Selectable
             return AbstractFunctionSelector.newFactory(fun, factories);
         }
 
-        public static class Raw implements Selectable.Raw
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            return type.getType();
+        }
+
+        public static class Raw extends Selectable.Raw
         {
             private final CQL3Type type;
             private final Selectable.Raw arg;
@@ -248,20 +412,15 @@ public abstract class Selectable
             {
                 return new WithCast(arg.prepare(cfm), type);
             }
-
-            public boolean processesSelection()
-            {
-                return true;
-            }
         }
     }
 
-    public static class WithFieldSelection extends Selectable
+    public static class WithFieldSelection implements Selectable
     {
         public final Selectable selected;
-        public final ColumnIdentifier field;
+        public final FieldIdentifier field;
 
-        public WithFieldSelection(Selectable selected, ColumnIdentifier field)
+        public WithFieldSelection(Selectable selected, FieldIdentifier field)
         {
             this.selected = selected;
             this.field = field;
@@ -273,10 +432,10 @@ public abstract class Selectable
             return String.format("%s.%s", selected, field);
         }
 
-        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
List<ColumnDefinition> defs)
+        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
AbstractType<?> expectedType, List<ColumnDefinition> defs, 
VariableSpecifications boundNames)
         {
-            Selector.Factory factory = selected.newSelectorFactory(cfm, defs);
-            AbstractType<?> type = factory.newInstance().getType();
+            Selector.Factory factory = selected.newSelectorFactory(cfm, null, 
defs, boundNames);
+            AbstractType<?> type = factory.getColumnSpecification(cfm).type;
             if (!type.isUDT())
             {
                 throw new InvalidRequestException(
@@ -286,7 +445,7 @@ public abstract class Selectable
             }
 
             UserType ut = (UserType) type;
-            int fieldIndex = ((UserType) type).fieldPosition(field);
+            int fieldIndex = ut.fieldPosition(field);
             if (fieldIndex == -1)
             {
                 throw new InvalidRequestException(String.format("%s of type %s 
has no field %s",
@@ -296,12 +455,26 @@ public abstract class Selectable
             return FieldSelector.newFactory(ut, fieldIndex, factory);
         }
 
-        public static class Raw implements Selectable.Raw
+        public AbstractType<?> getExactTypeIfKnown(String keyspace)
+        {
+            AbstractType<?> selectedType = 
selected.getExactTypeIfKnown(keyspace);
+            if (selectedType == null || !(selectedType instanceof UserType))
+                return null;
+
+            UserType ut = (UserType) selectedType;
+            int fieldIndex = ut.fieldPosition(field);
+            if (fieldIndex == -1)
+                return null;
+
+            return ut.fieldType(fieldIndex);
+        }
+
+        public static class Raw extends Selectable.Raw
         {
             private final Selectable.Raw selected;
-            private final ColumnIdentifier.Raw field;
+            private final FieldIdentifier field;
 
-            public Raw(Selectable.Raw selected, ColumnIdentifier.Raw field)
+            public Raw(Selectable.Raw selected, FieldIdentifier field)
             {
                 this.selected = selected;
                 this.field = field;
@@ -309,12 +482,7 @@ public abstract class Selectable
 
             public WithFieldSelection prepare(CFMetaData cfm)
             {
-                return new WithFieldSelection(selected.prepare(cfm), 
field.prepare(cfm));
-            }
-
-            public boolean processesSelection()
-            {
-                return true;
+                return new WithFieldSelection(selected.prepare(cfm), field);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java 
b/src/java/org/apache/cassandra/cql3/selection/Selection.java
index 664fd4f..2a11d27 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@ -169,12 +169,12 @@ public abstract class Selection
         return false;
     }
 
-    public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> 
rawSelectors) throws InvalidRequestException
+    public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> 
rawSelectors, VariableSpecifications boundNames) throws InvalidRequestException
     {
         List<ColumnDefinition> defs = new ArrayList<>();
 
         SelectorFactories factories =
-                
SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors,
 cfm), cfm, defs);
+                
SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors,
 cfm), null, cfm, defs, boundNames);
         SelectionColumnMapping mapping = collectColumnMappings(cfm, 
rawSelectors, factories);
 
         return (processesSelection(rawSelectors) || rawSelectors.size() != 
defs.size())
@@ -221,7 +221,7 @@ public abstract class Selection
         return selectionColumns;
     }
 
-    protected abstract Selectors newSelectors() throws InvalidRequestException;
+    protected abstract Selectors newSelectors(QueryOptions options) throws 
InvalidRequestException;
 
     /**
      * @return the list of CQL3 columns value this SelectionClause needs.
@@ -239,9 +239,9 @@ public abstract class Selection
         return columnMapping;
     }
 
-    public ResultSetBuilder resultSetBuilder(boolean isJons) throws 
InvalidRequestException
+    public ResultSetBuilder resultSetBuilder(QueryOptions options, boolean 
isJons) throws InvalidRequestException
     {
-        return new ResultSetBuilder(isJons);
+        return new ResultSetBuilder(options, isJons);
     }
 
     public abstract boolean isAggregate();
@@ -287,6 +287,7 @@ public abstract class Selection
     public class ResultSetBuilder
     {
         private final ResultSet resultSet;
+        private final int protocolVersion;
 
         /**
          * As multiple thread can access a <code>Selection</code> instance 
each <code>ResultSetBuilder</code> will use
@@ -308,10 +309,11 @@ public abstract class Selection
 
         private final boolean isJson;
 
-        private ResultSetBuilder(boolean isJson) throws InvalidRequestException
+        private ResultSetBuilder(QueryOptions options, boolean isJson) throws 
InvalidRequestException
         {
             this.resultSet = new ResultSet(getResultMetadata(isJson).copy(), 
new ArrayList<List<ByteBuffer>>());
-            this.selectors = newSelectors();
+            this.protocolVersion = options.getProtocolVersion();
+            this.selectors = newSelectors(options);
             this.timestamps = collectTimestamps ? new long[columns.size()] : 
null;
             this.ttls = collectTTLs ? new int[columns.size()] : null;
             this.isJson = isJson;
@@ -361,36 +363,36 @@ public abstract class Selection
                  : c.value();
         }
 
-        public void newRow(int protocolVersion) throws InvalidRequestException
+        public void newRow() throws InvalidRequestException
         {
             if (current != null)
             {
                 selectors.addInputRow(protocolVersion, this);
                 if (!selectors.isAggregate())
                 {
-                    resultSet.addRow(getOutputRow(protocolVersion));
+                    resultSet.addRow(getOutputRow());
                     selectors.reset();
                 }
             }
             current = new ArrayList<>(columns.size());
         }
 
-        public ResultSet build(int protocolVersion) throws 
InvalidRequestException
+        public ResultSet build() throws InvalidRequestException
         {
             if (current != null)
             {
                 selectors.addInputRow(protocolVersion, this);
-                resultSet.addRow(getOutputRow(protocolVersion));
+                resultSet.addRow(getOutputRow());
                 selectors.reset();
                 current = null;
             }
 
             if (resultSet.isEmpty() && selectors.isAggregate())
-                resultSet.addRow(getOutputRow(protocolVersion));
+                resultSet.addRow(getOutputRow());
             return resultSet;
         }
 
-        private List<ByteBuffer> getOutputRow(int protocolVersion)
+        private List<ByteBuffer> getOutputRow()
         {
             List<ByteBuffer> outputRow = 
selectors.getOutputRow(protocolVersion);
             return isJson ? rowToJson(outputRow, protocolVersion, metadata)
@@ -415,7 +417,7 @@ public abstract class Selection
         public void reset();
     }
 
-    // Special cased selection for when no function is used (this save some 
allocations).
+    // Special cased selection for when only columns are selected.
     private static class SimpleSelection extends Selection
     {
         private final boolean isWildcard;
@@ -450,7 +452,7 @@ public abstract class Selection
             return false;
         }
 
-        protected Selectors newSelectors()
+        protected Selectors newSelectors(QueryOptions options)
         {
             return new Selectors()
             {
@@ -531,11 +533,11 @@ public abstract class Selection
             return factories.doesAggregation();
         }
 
-        protected Selectors newSelectors() throws InvalidRequestException
+        protected Selectors newSelectors(final QueryOptions options) throws 
InvalidRequestException
         {
             return new Selectors()
             {
-                private final List<Selector> selectors = 
factories.newInstances();
+                private final List<Selector> selectors = 
factories.newInstances(options);
 
                 public void reset()
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/Selector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selector.java 
b/src/java/org/apache/cassandra/cql3/selection/Selector.java
index 87ab14c..c85dcd1 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java
@@ -21,13 +21,12 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.cql3.AssignmentTestable;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.ReversedType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
 /**
@@ -36,7 +35,7 @@ import 
org.apache.cassandra.exceptions.InvalidRequestException;
  * <p>Since the introduction of aggregation, <code>Selector</code>s cannot be 
called anymore by multiple threads 
  * as they have an internal state.</p>
  */
-public abstract class Selector implements AssignmentTestable
+public abstract class Selector
 {
     /**
      * A factory for <code>Selector</code> instances.
@@ -58,16 +57,19 @@ public abstract class Selector implements AssignmentTestable
         {
             return new ColumnSpecification(cfm.ksName,
                                            cfm.cfName,
-                                           
ColumnIdentifier.getInterned(getColumnName(), true),
+                                           new 
ColumnIdentifier(getColumnName(), true), // note that the name is not 
necessarily
+                                                                               
         // a true column name so we shouldn't intern it
                                            getReturnType());
         }
 
         /**
          * Creates a new <code>Selector</code> instance.
          *
+         * @param options the options of the query for which the instance is 
created (some selectors
+         * depend on the bound values in particular).
          * @return a new <code>Selector</code> instance
          */
-        public abstract Selector newInstance() throws InvalidRequestException;
+        public abstract Selector newInstance(QueryOptions options) throws 
InvalidRequestException;
 
         /**
          * Checks if this factory creates selectors instances that creates 
aggregates.
@@ -183,24 +185,4 @@ public abstract class Selector implements 
AssignmentTestable
      * Reset the internal state of this <code>Selector</code>.
      */
     public abstract void reset();
-
-    public final AssignmentTestable.TestResult testAssignment(String keyspace, 
ColumnSpecification receiver)
-    {
-        // We should ignore the fact that the output type is frozen in our 
comparison as functions do not support
-        // frozen types for arguments
-        AbstractType<?> receiverType = receiver.type;
-        if (getType().isFreezable() && !getType().isMultiCell())
-            receiverType = receiverType.freeze();
-
-        if (getType().isReversed())
-            receiverType = ReversedType.getInstance(receiverType);
-
-        if (receiverType.equals(getType()))
-            return AssignmentTestable.TestResult.EXACT_MATCH;
-
-        if (receiverType.isValueCompatibleWith(getType()))
-            return AssignmentTestable.TestResult.WEAKLY_ASSIGNABLE;
-
-        return AssignmentTestable.TestResult.NOT_ASSIGNABLE;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java 
b/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
index 97a1198..41bf193 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
@@ -23,6 +23,8 @@ import com.google.common.collect.Lists;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.VariableSpecifications;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.selection.Selector.Factory;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -57,29 +59,40 @@ final class SelectorFactories implements 
Iterable<Selector.Factory>
      * Creates a new <code>SelectorFactories</code> instance and collect the 
column definitions.
      *
      * @param selectables the <code>Selectable</code>s for which the factories 
must be created
+     * @param expectedTypes the returned types expected for each of the {@code 
selectables}, if there
+     * are any such expectations, or {@code null} otherwise. This will be 
{@code null} when called on
+     * the top-level selectables, but may not be for selectables nested within 
a function for instance
+     * (as the argument selectable will be expected to be of the type expected 
by the function).
      * @param cfm the Column Family Definition
      * @param defs the collector parameter for the column definitions
+     * @param boundNames the collector for the specification of bound markers 
in the selection
      * @return a new <code>SelectorFactories</code> instance
      * @throws InvalidRequestException if a problem occurs while creating the 
factories
      */
     public static SelectorFactories 
createFactoriesAndCollectColumnDefinitions(List<Selectable> selectables,
+                                                                               
List<AbstractType<?>> expectedTypes,
                                                                                
CFMetaData cfm,
-                                                                               
List<ColumnDefinition> defs)
+                                                                               
List<ColumnDefinition> defs,
+                                                                               
VariableSpecifications boundNames)
                                                                                
throws InvalidRequestException
     {
-        return new SelectorFactories(selectables, cfm, defs);
+        return new SelectorFactories(selectables, expectedTypes, cfm, defs, 
boundNames);
     }
 
     private SelectorFactories(List<Selectable> selectables,
+                              List<AbstractType<?>> expectedTypes,
                               CFMetaData cfm,
-                              List<ColumnDefinition> defs)
+                              List<ColumnDefinition> defs,
+                              VariableSpecifications boundNames)
                               throws InvalidRequestException
     {
         factories = new ArrayList<>(selectables.size());
 
-        for (Selectable selectable : selectables)
+        for (int i = 0; i < selectables.size(); i++)
         {
-            Factory factory = selectable.newSelectorFactory(cfm, defs);
+            Selectable selectable = selectables.get(i);
+            AbstractType<?> expectedType = expectedTypes == null ? null : 
expectedTypes.get(i);
+            Factory factory = selectable.newSelectorFactory(cfm, expectedType, 
defs, boundNames);
             containsWritetimeFactory |= factory.isWritetimeSelectorFactory();
             containsTTLFactory |= factory.isTTLSelectorFactory();
             if (factory.isAggregateSelectorFactory())
@@ -148,15 +161,15 @@ final class SelectorFactories implements 
Iterable<Selector.Factory>
 
     /**
      * Creates a list of new <code>Selector</code> instances.
+     *
+     * @param options the query options for the query being executed.
      * @return a list of new <code>Selector</code> instances.
      */
-    public List<Selector> newInstances() throws InvalidRequestException
+    public List<Selector> newInstances(QueryOptions options) throws 
InvalidRequestException
     {
         List<Selector> selectors = new ArrayList<>(factories.size());
         for (Selector.Factory factory : factories)
-        {
-            selectors.add(factory.newInstance());
-        }
+            selectors.add(factory.newInstance(options));
         return selectors;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
index e4040fa..e14cd5c 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -55,7 +56,7 @@ public final class SimpleSelector extends Selector
             }
 
             @Override
-            public Selector newInstance()
+            public Selector newInstance(QueryOptions options)
             {
                 return new SimpleSelector(def.name.toString(), idx, def.type);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/TermSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/TermSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/TermSelector.java
new file mode 100644
index 0000000..5aa4522
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/selection/TermSelector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.selection;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.AssignmentTestable;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * Selector representing a simple term (literals or bound variables).
+ * <p>
+ * Note that we know the term does not include function calls for instance 
(this is actually enforced by the parser), those
+ * being dealt with by their own Selector.
+ */
+public class TermSelector extends Selector
+{
+    private final ByteBuffer value;
+    private final AbstractType<?> type;
+
+    public static Factory newFactory(final String name, final Term term, final 
AbstractType<?> type)
+    {
+        return new Factory()
+        {
+            protected String getColumnName()
+            {
+                return name;
+            }
+
+            protected AbstractType<?> getReturnType()
+            {
+                return type;
+            }
+
+            protected void addColumnMapping(SelectionColumnMapping mapping, 
ColumnSpecification resultColumn)
+            {
+               mapping.addMapping(resultColumn, (ColumnDefinition)null);
+            }
+
+            public Selector newInstance(QueryOptions options)
+            {
+                return new TermSelector(term.bindAndGet(options), type);
+            }
+        };
+    }
+
+    private TermSelector(ByteBuffer value, AbstractType<?> type)
+    {
+        this.value = value;
+        this.type = type;
+    }
+
+    public void addInput(int protocolVersion, Selection.ResultSetBuilder rs) 
throws InvalidRequestException
+    {
+    }
+
+    public ByteBuffer getOutput(int protocolVersion) throws 
InvalidRequestException
+    {
+        return value;
+    }
+
+    public AbstractType<?> getType()
+    {
+        return type;
+    }
+
+    public void reset()
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
index 131827f..78380d7 100644
--- a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.selection;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -54,7 +55,7 @@ final class WritetimeOrTTLSelector extends Selector
                mapping.addMapping(resultsColumn, def);
             }
 
-            public Selector newInstance()
+            public Selector newInstance(QueryOptions options)
             {
                 return new WritetimeOrTTLSelector(def.name.toString(), idx, 
isWritetime);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index ee2b623..afe2776 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -53,14 +53,14 @@ public class AlterTableStatement extends SchemaAlteringStatement
 
     public final Type oType;
     private final TableAttributes attrs;
-    private final Map<ColumnIdentifier.Raw, ColumnIdentifier.Raw> renames;
+    private final Map<ColumnDefinition.Raw, ColumnDefinition.Raw> renames;
     private final List<AlterTableStatementColumn> colNameList;
 
     public AlterTableStatement(CFName name,
                                Type type,
                                List<AlterTableStatementColumn> colDataList,
                                TableAttributes attrs,
-                               Map<ColumnIdentifier.Raw, ColumnIdentifier.Raw> 
renames)
+                               Map<ColumnDefinition.Raw, ColumnDefinition.Raw> 
renames)
     {
         super(name);
         this.oType = type;
@@ -91,7 +91,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
         CQL3Type.Raw dataType = null;
         boolean isStatic = false;
         CQL3Type validator = null;
-        ColumnIdentifier.Raw rawColumnName = null;
+        ColumnDefinition.Raw rawColumnName = null;
 
         List<ViewDefinition> viewUpdates = null;
         Iterable<ViewDefinition> views = View.findAll(keyspace(), 
columnFamily());
@@ -101,11 +101,10 @@ public class AlterTableStatement extends SchemaAlteringStatement
             case ADD:
                 for (AlterTableStatementColumn colData : colNameList)
                 {
-                    columnName = null;
                     rawColumnName = colData.getColumnName();
                     if (rawColumnName != null)
                     {
-                        columnName = rawColumnName.prepare(cfm);
+                        columnName = rawColumnName.getIdentifier(cfm);
                         def =  cfm.getColumnDefinition(columnName);
                         dataType = colData.getColumnType();
                         isStatic = colData.getStaticType();
@@ -191,7 +190,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 rawColumnName = colNameList.get(0).getColumnName();
                 if (rawColumnName != null)
                 {
-                    columnName = rawColumnName.prepare(cfm);
+                    columnName = rawColumnName.getIdentifier(cfm);
                     def = cfm.getColumnDefinition(columnName);
                     dataType = colNameList.get(0).getColumnType();
                     validator = dataType == null ? null : 
dataType.prepare(keyspace());
@@ -234,7 +233,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                     rawColumnName = colData.getColumnName();
                     if (rawColumnName != null)
                     {
-                        columnName = rawColumnName.prepare(cfm);
+                        columnName = rawColumnName.getIdentifier(cfm);
                         def = cfm.getColumnDefinition(columnName);
                     }
                     assert columnName != null;
@@ -322,10 +321,10 @@ public class AlterTableStatement extends SchemaAlteringStatement
 
                 break;
             case RENAME:
-                for (Map.Entry<ColumnIdentifier.Raw, ColumnIdentifier.Raw> 
entry : renames.entrySet())
+                for (Map.Entry<ColumnDefinition.Raw, ColumnDefinition.Raw> 
entry : renames.entrySet())
                 {
-                    ColumnIdentifier from = entry.getKey().prepare(cfm);
-                    ColumnIdentifier to = entry.getValue().prepare(cfm);
+                    ColumnIdentifier from = entry.getKey().getIdentifier(cfm);
+                    ColumnIdentifier to = entry.getValue().getIdentifier(cfm);
                     cfm.renameColumn(from, to);
 
                     // If the view includes a renamed column, it must be 
renamed in the view table and the definition.
@@ -334,8 +333,8 @@ public class AlterTableStatement extends SchemaAlteringStatement
                         if (!view.includes(from)) continue;
 
                         ViewDefinition viewCopy = view.copy();
-                        ColumnIdentifier viewFrom = 
entry.getKey().prepare(viewCopy.metadata);
-                        ColumnIdentifier viewTo = 
entry.getValue().prepare(viewCopy.metadata);
+                        ColumnIdentifier viewFrom = 
entry.getKey().getIdentifier(viewCopy.metadata);
+                        ColumnIdentifier viewTo = 
entry.getValue().getIdentifier(viewCopy.metadata);
                         viewCopy.renameColumn(viewFrom, viewTo);
 
                         if (viewUpdates == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/statements/AlterTableStatementColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatementColumn.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatementColumn.java
index a5d7de7..813effe 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatementColumn.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatementColumn.java
@@ -17,37 +17,45 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 
-public class AlterTableStatementColumn {
+public class AlterTableStatementColumn
+{
     private final CQL3Type.Raw dataType;
-    private final ColumnIdentifier.Raw colName;
+    private final ColumnDefinition.Raw colName;
     private final Boolean isStatic;
 
-    public AlterTableStatementColumn(ColumnIdentifier.Raw colName, 
CQL3Type.Raw dataType, boolean isStatic) {
+    public AlterTableStatementColumn(ColumnDefinition.Raw colName, 
CQL3Type.Raw dataType, boolean isStatic)
+    {
         this.dataType = dataType;
         this.colName = colName;
         this.isStatic = isStatic;
     }
 
-    public AlterTableStatementColumn(ColumnIdentifier.Raw colName, 
CQL3Type.Raw dataType) {
+    public AlterTableStatementColumn(ColumnDefinition.Raw colName, 
CQL3Type.Raw dataType)
+    {
         this(colName, dataType,false );
     }
 
-    public AlterTableStatementColumn(ColumnIdentifier.Raw colName) {
+    public AlterTableStatementColumn(ColumnDefinition.Raw colName)
+    {
         this(colName, null, false);
     }
 
-    public CQL3Type.Raw getColumnType() {
+    public CQL3Type.Raw getColumnType()
+    {
         return dataType;
     }
 
-    public ColumnIdentifier.Raw getColumnName() {
+    public ColumnDefinition.Raw getColumnName()
+    {
         return colName;
     }
 
-    public Boolean getStaticType() {
+    public Boolean getStaticType()
+    {
         return isStatic;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
index 4ed726a..64bccf5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
@@ -51,17 +51,17 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
 
     protected abstract UserType makeUpdatedType(UserType toUpdate, 
KeyspaceMetadata ksm) throws InvalidRequestException;
 
-    public static AlterTypeStatement addition(UTName name, ColumnIdentifier 
fieldName, CQL3Type.Raw type)
+    public static AlterTypeStatement addition(UTName name, FieldIdentifier 
fieldName, CQL3Type.Raw type)
     {
         return new AddOrAlter(name, true, fieldName, type);
     }
 
-    public static AlterTypeStatement alter(UTName name, ColumnIdentifier 
fieldName, CQL3Type.Raw type)
+    public static AlterTypeStatement alter(UTName name, FieldIdentifier 
fieldName, CQL3Type.Raw type)
     {
         return new AddOrAlter(name, false, fieldName, type);
     }
 
-    public static AlterTypeStatement renames(UTName name, 
Map<ColumnIdentifier, ColumnIdentifier> renames)
+    public static AlterTypeStatement renames(UTName name, Map<FieldIdentifier, 
FieldIdentifier> renames)
     {
         return new Renames(name, renames);
     }
@@ -137,14 +137,6 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
         return new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, 
Event.SchemaChange.Target.TYPE, keyspace(), name.getStringTypeName());
     }
 
-    private static int getIdxOfField(UserType type, ColumnIdentifier field)
-    {
-        for (int i = 0; i < type.size(); i++)
-            if (field.bytes.equals(type.fieldName(i)))
-                return i;
-        return -1;
-    }
-
     private boolean updateDefinition(CFMetaData cfm, ColumnDefinition def, 
String keyspace, ByteBuffer toReplace, UserType updated)
     {
         AbstractType<?> t = updateWith(def.type, keyspace, toReplace, updated);
@@ -247,10 +239,10 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
     private static class AddOrAlter extends AlterTypeStatement
     {
         private final boolean isAdd;
-        private final ColumnIdentifier fieldName;
+        private final FieldIdentifier fieldName;
         private final CQL3Type.Raw type;
 
-        public AddOrAlter(UTName name, boolean isAdd, ColumnIdentifier 
fieldName, CQL3Type.Raw type)
+        public AddOrAlter(UTName name, boolean isAdd, FieldIdentifier 
fieldName, CQL3Type.Raw type)
         {
             super(name);
             this.isAdd = isAdd;
@@ -260,12 +252,12 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
 
         private UserType doAdd(UserType toUpdate) throws 
InvalidRequestException
         {
-            if (getIdxOfField(toUpdate, fieldName) >= 0)
+            if (toUpdate.fieldPosition(fieldName) >= 0)
                 throw new InvalidRequestException(String.format("Cannot add 
new field %s to type %s: a field of the same name already exists", fieldName, 
name));
 
-            List<ByteBuffer> newNames = new ArrayList<>(toUpdate.size() + 1);
+            List<FieldIdentifier> newNames = new ArrayList<>(toUpdate.size() + 
1);
             newNames.addAll(toUpdate.fieldNames());
-            newNames.add(fieldName.bytes);
+            newNames.add(fieldName);
 
             AbstractType<?> addType = type.prepare(keyspace()).getType();
             if (addType.referencesUserType(toUpdate.getNameAsString()))
@@ -282,7 +274,7 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
         {
             checkTypeNotUsedByAggregate(ksm);
 
-            int idx = getIdxOfField(toUpdate, fieldName);
+            int idx = toUpdate.fieldPosition(fieldName);
             if (idx < 0)
                 throw new InvalidRequestException(String.format("Unknown field 
%s in type %s", fieldName, name));
 
@@ -290,7 +282,7 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
             if (!type.prepare(keyspace()).getType().isCompatibleWith(previous))
                 throw new InvalidRequestException(String.format("Type %s is 
incompatible with previous type %s of field %s in user type %s", type, 
previous.asCQL3Type(), fieldName, name));
 
-            List<ByteBuffer> newNames = new ArrayList<>(toUpdate.fieldNames());
+            List<FieldIdentifier> newNames = new 
ArrayList<>(toUpdate.fieldNames());
             List<AbstractType<?>> newTypes = new 
ArrayList<>(toUpdate.fieldTypes());
             newTypes.set(idx, type.prepare(keyspace()).getType());
 
@@ -305,9 +297,9 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
 
     private static class Renames extends AlterTypeStatement
     {
-        private final Map<ColumnIdentifier, ColumnIdentifier> renames;
+        private final Map<FieldIdentifier, FieldIdentifier> renames;
 
-        public Renames(UTName name, Map<ColumnIdentifier, ColumnIdentifier> 
renames)
+        public Renames(UTName name, Map<FieldIdentifier, FieldIdentifier> 
renames)
         {
             super(name);
             this.renames = renames;
@@ -317,17 +309,17 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
         {
             checkTypeNotUsedByAggregate(ksm);
 
-            List<ByteBuffer> newNames = new ArrayList<>(toUpdate.fieldNames());
+            List<FieldIdentifier> newNames = new 
ArrayList<>(toUpdate.fieldNames());
             List<AbstractType<?>> newTypes = new 
ArrayList<>(toUpdate.fieldTypes());
 
-            for (Map.Entry<ColumnIdentifier, ColumnIdentifier> entry : 
renames.entrySet())
+            for (Map.Entry<FieldIdentifier, FieldIdentifier> entry : 
renames.entrySet())
             {
-                ColumnIdentifier from = entry.getKey();
-                ColumnIdentifier to = entry.getValue();
-                int idx = getIdxOfField(toUpdate, from);
+                FieldIdentifier from = entry.getKey();
+                FieldIdentifier to = entry.getValue();
+                int idx = toUpdate.fieldPosition(from);
                 if (idx < 0)
                     throw new InvalidRequestException(String.format("Unknown 
field %s in type %s", from, name));
-                newNames.set(idx, to.bytes);
+                newNames.set(idx, to);
             }
 
             UserType updated = new UserType(toUpdate.keyspace, toUpdate.name, 
newNames, newTypes, toUpdate.isMultiCell());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
index 3268296..6f4331b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.transport.Event;
 public class CreateTypeStatement extends SchemaAlteringStatement
 {
     private final UTName name;
-    private final List<ColumnIdentifier> columnNames = new ArrayList<>();
+    private final List<FieldIdentifier> columnNames = new ArrayList<>();
     private final List<CQL3Type.Raw> columnTypes = new ArrayList<>();
     private final boolean ifNotExists;
 
@@ -55,7 +55,7 @@ public class CreateTypeStatement extends SchemaAlteringStatement
             name.setKeyspace(state.getKeyspace());
     }
 
-    public void addDefinition(ColumnIdentifier name, CQL3Type.Raw type)
+    public void addDefinition(FieldIdentifier name, CQL3Type.Raw type)
     {
         columnNames.add(name);
         columnTypes.add(type);
@@ -88,13 +88,11 @@ public class CreateTypeStatement extends SchemaAlteringStatement
     {
         for (int i = 0; i < type.size() - 1; i++)
         {
-            ByteBuffer fieldName = type.fieldName(i);
+            FieldIdentifier fieldName = type.fieldName(i);
             for (int j = i+1; j < type.size(); j++)
             {
                 if (fieldName.equals(type.fieldName(j)))
-                    throw new InvalidRequestException(String.format("Duplicate 
field name %s in type %s",
-                                                                    
UTF8Type.instance.getString(fieldName),
-                                                                    
UTF8Type.instance.getString(type.name)));
+                    throw new InvalidRequestException(String.format("Duplicate 
field name %s in type %s", fieldName, type.name));
             }
         }
     }
@@ -102,7 +100,7 @@ public class CreateTypeStatement extends SchemaAlteringStatement
     public void addToRawBuilder(Types.RawBuilder builder) throws 
InvalidRequestException
     {
         builder.add(name.getStringTypeName(),
-                    
columnNames.stream().map(ColumnIdentifier::toString).collect(Collectors.toList()),
+                    
columnNames.stream().map(FieldIdentifier::toString).collect(Collectors.toList()),
                     
columnTypes.stream().map(CQL3Type.Raw::toString).collect(Collectors.toList()));
     }
 
@@ -114,15 +112,11 @@ public class CreateTypeStatement extends SchemaAlteringStatement
 
     public UserType createType() throws InvalidRequestException
     {
-        List<ByteBuffer> names = new ArrayList<>(columnNames.size());
-        for (ColumnIdentifier name : columnNames)
-            names.add(name.bytes);
-
         List<AbstractType<?>> types = new ArrayList<>(columnTypes.size());
         for (CQL3Type.Raw type : columnTypes)
             types.add(type.prepare(keyspace()).getType());
 
-        return new UserType(name.getKeyspace(), name.getUserTypeName(), names, 
types, true);
+        return new UserType(name.getKeyspace(), name.getUserTypeName(), 
columnNames, types, true);
     }
 
     public Event.SchemaChange announceMigration(boolean isLocalOnly) throws 
InvalidRequestException, ConfigurationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed00607/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index dbb2b9b..71e248a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -51,8 +51,8 @@ public class CreateViewStatement extends SchemaAlteringStatement
     private final CFName baseName;
     private final List<RawSelector> selectClause;
     private final WhereClause whereClause;
-    private final List<ColumnIdentifier.Raw> partitionKeys;
-    private final List<ColumnIdentifier.Raw> clusteringKeys;
+    private final List<ColumnDefinition.Raw> partitionKeys;
+    private final List<ColumnDefinition.Raw> clusteringKeys;
     public final CFProperties properties = new CFProperties();
     private final boolean ifNotExists;
 
@@ -60,8 +60,8 @@ public class CreateViewStatement extends SchemaAlteringStatement
                                CFName baseName,
                                List<RawSelector> selectClause,
                                WhereClause whereClause,
-                               List<ColumnIdentifier.Raw> partitionKeys,
-                               List<ColumnIdentifier.Raw> clusteringKeys,
+                               List<ColumnDefinition.Raw> partitionKeys,
+                               List<ColumnDefinition.Raw> clusteringKeys,
                                boolean ifNotExists)
     {
         super(viewName);
@@ -159,30 +159,26 @@ public class CreateViewStatement extends SchemaAlteringStatement
                 throw new InvalidRequestException("Cannot use function when 
defining a materialized view");
             if (selectable instanceof Selectable.WritetimeOrTTL.Raw)
                 throw new InvalidRequestException("Cannot use function when 
defining a materialized view");
-            ColumnIdentifier identifier = (ColumnIdentifier) 
selectable.prepare(cfm);
             if (selector.alias != null)
-                throw new InvalidRequestException(String.format("Cannot alias 
column '%s' as '%s' when defining a materialized view", identifier.toString(), 
selector.alias.toString()));
+                throw new InvalidRequestException("Cannot use alias when 
defining a materialized view");
 
-            ColumnDefinition cdef = cfm.getColumnDefinition(identifier);
+            Selectable s = selectable.prepare(cfm);
+            if (s instanceof Term.Raw)
+                throw new InvalidRequestException("Cannot use terms in 
selection when defining a materialized view");
 
-            if (cdef == null)
-                throw new InvalidRequestException("Unknown column name 
detected in CREATE MATERIALIZED VIEW statement : "+identifier);
-
-            included.add(identifier);
+            ColumnDefinition cdef = (ColumnDefinition)s;
+            included.add(cdef.name);
         }
 
-        Set<ColumnIdentifier.Raw> targetPrimaryKeys = new HashSet<>();
-        for (ColumnIdentifier.Raw identifier : Iterables.concat(partitionKeys, 
clusteringKeys))
+        Set<ColumnDefinition.Raw> targetPrimaryKeys = new HashSet<>();
+        for (ColumnDefinition.Raw identifier : Iterables.concat(partitionKeys, 
clusteringKeys))
         {
             if (!targetPrimaryKeys.add(identifier))
                 throw new InvalidRequestException("Duplicate entry found in 
PRIMARY KEY: "+identifier);
 
-            ColumnDefinition cdef = 
cfm.getColumnDefinition(identifier.prepare(cfm));
-
-            if (cdef == null)
-                throw new InvalidRequestException("Unknown column name 
detected in CREATE MATERIALIZED VIEW statement : "+identifier);
+            ColumnDefinition cdef = identifier.prepare(cfm);
 
-            if 
(cfm.getColumnDefinition(identifier.prepare(cfm)).type.isMultiCell())
+            if (cdef.type.isMultiCell())
                 throw new InvalidRequestException(String.format("Cannot use 
MultiCell column '%s' in PRIMARY KEY of materialized view", identifier));
 
             if (cdef.isStatic())
@@ -190,7 +186,7 @@ public class CreateViewStatement extends SchemaAlteringStatement
         }
 
         // build the select statement
-        Map<ColumnIdentifier.Raw, Boolean> orderings = Collections.emptyMap();
+        Map<ColumnDefinition.Raw, Boolean> orderings = Collections.emptyMap();
         SelectStatement.Parameters parameters = new 
SelectStatement.Parameters(orderings, false, true, false);
         SelectStatement.RawStatement rawSelect = new 
SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, 
null, null);
 
@@ -226,10 +222,10 @@ public class CreateViewStatement extends SchemaAlteringStatement
 
         // This is only used as an intermediate state; this is to catch 
whether multiple non-PK columns are used
         boolean hasNonPKColumn = false;
-        for (ColumnIdentifier.Raw raw : partitionKeys)
+        for (ColumnDefinition.Raw raw : partitionKeys)
             hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, 
hasNonPKColumn, raw, targetPartitionKeys, restrictions);
 
-        for (ColumnIdentifier.Raw raw : clusteringKeys)
+        for (ColumnDefinition.Raw raw : clusteringKeys)
             hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, 
hasNonPKColumn, raw, targetClusteringColumns, restrictions);
 
         // We need to include all of the primary key columns from the base 
table in order to make sure that we do not
@@ -307,25 +303,24 @@ public class CreateViewStatement extends SchemaAlteringStatement
     private static boolean getColumnIdentifier(CFMetaData cfm,
                                                Set<ColumnIdentifier> basePK,
                                                boolean hasNonPKColumn,
-                                               ColumnIdentifier.Raw raw,
+                                               ColumnDefinition.Raw raw,
                                                List<ColumnIdentifier> columns,
                                                StatementRestrictions 
restrictions)
     {
-        ColumnIdentifier identifier = raw.prepare(cfm);
-        ColumnDefinition def = cfm.getColumnDefinition(identifier);
+        ColumnDefinition def = raw.prepare(cfm);
 
-        boolean isPk = basePK.contains(identifier);
+        boolean isPk = basePK.contains(def.name);
         if (!isPk && hasNonPKColumn)
-            throw new InvalidRequestException(String.format("Cannot include 
more than one non-primary key column '%s' in materialized view partition key", 
identifier));
+            throw new InvalidRequestException(String.format("Cannot include 
more than one non-primary key column '%s' in materialized view partition key", 
def.name));
 
         // We don't need to include the "IS NOT NULL" filter on a 
non-composite partition key
         // because we will never allow a single partition key to be NULL
-        boolean isSinglePartitionKey = 
cfm.getColumnDefinition(identifier).isPartitionKey()
+        boolean isSinglePartitionKey = def.isPartitionKey()
                                        && cfm.partitionKeyColumns().size() == 
1;
         if (!isSinglePartitionKey && !restrictions.isRestricted(def))
-            throw new InvalidRequestException(String.format("Primary key 
column '%s' is required to be filtered by 'IS NOT NULL'", identifier));
+            throw new InvalidRequestException(String.format("Primary key 
column '%s' is required to be filtered by 'IS NOT NULL'", def.name));
 
-        columns.add(identifier);
+        columns.add(def.name);
         return !isPk;
     }
 }

Reply via email to