http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
new file mode 100644
index 0000000..13e173f
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.FunctionResource;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.*;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.Functions.FunctionsDiff;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+import static java.util.stream.Collectors.toList;
+
+public final class CreateFunctionStatement extends AlterSchemaStatement
+{
+    private final String functionName;
+    private final List<ColumnIdentifier> argumentNames;
+    private final List<CQL3Type.Raw> rawArgumentTypes;
+    private final CQL3Type.Raw rawReturnType;
+    private final boolean calledOnNullInput;
+    private final String language;
+    private final String body;
+    private final boolean orReplace;
+    private final boolean ifNotExists;
+
+    public CreateFunctionStatement(String keyspaceName,
+                                   String functionName,
+                                   List<ColumnIdentifier> argumentNames,
+                                   List<CQL3Type.Raw> rawArgumentTypes,
+                                   CQL3Type.Raw rawReturnType,
+                                   boolean calledOnNullInput,
+                                   String language,
+                                   String body,
+                                   boolean orReplace,
+                                   boolean ifNotExists)
+    {
+        super(keyspaceName);
+        this.functionName = functionName;
+        this.argumentNames = argumentNames;
+        this.rawArgumentTypes = rawArgumentTypes;
+        this.rawReturnType = rawReturnType;
+        this.calledOnNullInput = calledOnNullInput;
+        this.language = language;
+        this.body = body;
+        this.orReplace = orReplace;
+        this.ifNotExists = ifNotExists;
+    }
+
+    // TODO: replace affected aggregates !!
+    public Keyspaces apply(Keyspaces schema)
+    {
+        if (ifNotExists && orReplace)
+            throw ire("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' 
directives");
+
+        UDFunction.assertUdfsEnabled(language);
+
+        if (new HashSet<>(argumentNames).size() != argumentNames.size())
+            throw ire("Duplicate argument names for given function %s with 
argument names %s", functionName, argumentNames);
+
+        rawArgumentTypes.stream()
+                        .filter(CQL3Type.Raw::isFrozen)
+                        .findFirst()
+                        .ifPresent(t -> { throw ire("Argument '%s' cannot be 
frozen; remove frozen<> modifier from '%s'", t, t); });
+
+        if (rawReturnType.isFrozen())
+            throw ire("Return type '%s' cannot be frozen; remove frozen<> 
modifier from '%s'", rawReturnType, rawReturnType);
+
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+        if (null == keyspace)
+            throw ire("Keyspace '%s' doesn't exist", keyspaceName);
+
+        List<AbstractType<?>> argumentTypes =
+            rawArgumentTypes.stream()
+                            .map(t -> t.prepare(keyspaceName, 
keyspace.types).getType())
+                            .collect(toList());
+        AbstractType<?> returnType = rawReturnType.prepare(keyspaceName, 
keyspace.types).getType();
+
+        UDFunction function =
+            UDFunction.create(new FunctionName(keyspaceName, functionName),
+                              argumentNames,
+                              argumentTypes,
+                              returnType,
+                              calledOnNullInput,
+                              language,
+                              body);
+
+        Function existingFunction = keyspace.functions.find(function.name(), 
argumentTypes).orElse(null);
+        if (null != existingFunction)
+        {
+            if (existingFunction.isAggregate())
+                throw ire("Function '%s' cannot replace an aggregate", 
functionName);
+
+            if (ifNotExists)
+                return schema;
+
+            if (!orReplace)
+                throw ire("Function '%s' already exists", functionName);
+
+            if (calledOnNullInput != ((UDFunction) 
existingFunction).isCalledOnNullInput())
+            {
+                throw ire("Function '%s' must have %s directive",
+                          functionName,
+                          calledOnNullInput ? "CALLED ON NULL INPUT" : 
"RETURNS NULL ON NULL INPUT");
+            }
+
+            if (!returnType.isCompatibleWith(existingFunction.returnType()))
+            {
+                throw ire("Cannot replace function '%s', the new return type 
%s is not compatible with the return type %s of existing function",
+                          functionName,
+                          returnType.asCQL3Type(),
+                          existingFunction.returnType().asCQL3Type());
+            }
+
+            // TODO: update dependent aggregates
+        }
+
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.withAddedOrUpdated(function)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        assert diff.altered.size() == 1;
+        FunctionsDiff<UDFunction> udfsDiff = diff.altered.get(0).udfs;
+
+        assert udfsDiff.created.size() + udfsDiff.altered.size() == 1;
+        boolean created = !udfsDiff.created.isEmpty();
+
+        return new SchemaChange(created ? Change.CREATED : Change.UPDATED,
+                                Target.FUNCTION,
+                                keyspaceName,
+                                functionName,
+                                
rawArgumentTypes.stream().map(CQL3Type.Raw::toString).collect(toList()));
+    }
+
+    public void authorize(ClientState client)
+    {
+        FunctionName name = new FunctionName(keyspaceName, functionName);
+
+        if (Schema.instance.findFunction(name, 
Lists.transform(rawArgumentTypes, t -> 
t.prepare(keyspaceName).getType())).isPresent() && orReplace)
+            client.ensurePermission(Permission.ALTER, 
FunctionResource.functionFromCql(keyspaceName, functionName, rawArgumentTypes));
+        else
+            client.ensurePermission(Permission.CREATE, 
FunctionResource.keyspace(keyspaceName));
+    }
+
+    @Override
+    Set<IResource> createdResources(KeyspacesDiff diff)
+    {
+        assert diff.altered.size() == 1;
+        FunctionsDiff<UDFunction> udfsDiff = diff.altered.get(0).udfs;
+
+        assert udfsDiff.created.size() + udfsDiff.altered.size() == 1;
+
+        return udfsDiff.created.isEmpty()
+             ? ImmutableSet.of()
+             : ImmutableSet.of(FunctionResource.functionFromCql(keyspaceName, 
functionName, rawArgumentTypes));
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_FUNCTION, 
keyspaceName, functionName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final FunctionName name;
+        private final List<ColumnIdentifier> argumentNames;
+        private final List<CQL3Type.Raw> rawArgumentTypes;
+        private final CQL3Type.Raw rawReturnType;
+        private final boolean calledOnNullInput;
+        private final String language;
+        private final String body;
+        private final boolean orReplace;
+        private final boolean ifNotExists;
+
+        public Raw(FunctionName name,
+                   List<ColumnIdentifier> argumentNames,
+                   List<CQL3Type.Raw> rawArgumentTypes,
+                   CQL3Type.Raw rawReturnType,
+                   boolean calledOnNullInput,
+                   String language,
+                   String body,
+                   boolean orReplace,
+                   boolean ifNotExists)
+        {
+            this.name = name;
+            this.argumentNames = argumentNames;
+            this.rawArgumentTypes = rawArgumentTypes;
+            this.rawReturnType = rawReturnType;
+            this.calledOnNullInput = calledOnNullInput;
+            this.language = language;
+            this.body = body;
+            this.orReplace = orReplace;
+            this.ifNotExists = ifNotExists;
+        }
+
+        public CreateFunctionStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.keyspace : 
state.getKeyspace();
+
+            return new CreateFunctionStatement(keyspaceName,
+                                               name.name,
+                                               argumentNames,
+                                               rawArgumentTypes,
+                                               rawReturnType,
+                                               calledOnNullInput,
+                                               language,
+                                               body,
+                                               orReplace,
+                                               ifNotExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
new file mode 100644
index 0000000..0065a4c
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.*;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QualifiedName;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget.Type;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Iterables.tryFind;
+
+public final class CreateIndexStatement extends AlterSchemaStatement
+{
+    private final String indexName;
+    private final String tableName;
+    private final List<IndexTarget.Raw> rawIndexTargets;
+    private final IndexAttributes attrs;
+    private final boolean ifNotExists;
+
+    public CreateIndexStatement(String keyspaceName,
+                                String tableName,
+                                String indexName,
+                                List<IndexTarget.Raw> rawIndexTargets,
+                                IndexAttributes attrs,
+                                boolean ifNotExists)
+    {
+        super(keyspaceName);
+        this.tableName = tableName;
+        this.indexName = indexName;
+        this.rawIndexTargets = rawIndexTargets;
+        this.attrs = attrs;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        attrs.validate();
+
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+        if (null == keyspace)
+            throw ire("Keyspace '%s' doesn't exist", keyspaceName);
+
+        TableMetadata table = keyspace.getTableOrViewNullable(tableName);
+        if (null == table)
+            throw ire("Table '%s' doesn't exist", tableName);
+
+        if (null != indexName && keyspace.hasIndex(indexName))
+        {
+            if (ifNotExists)
+                return schema;
+
+            throw ire("Index '%s' already exists", indexName);
+        }
+
+        if (table.isCounter())
+            throw ire("Secondary indexes on counter tables aren't supported");
+
+        if (table.isView())
+            throw ire("Secondary indexes on materialized views aren't 
supported");
+
+        List<IndexTarget> indexTargets = 
Lists.newArrayList(transform(rawIndexTargets, t -> t.prepare(table)));
+
+        if (indexTargets.isEmpty() && !attrs.isCustom)
+            throw ire("Only CUSTOM indexes can be created without specifying a 
target column");
+
+        if (indexTargets.size() > 1)
+        {
+            if (!attrs.isCustom)
+                throw ire("Only CUSTOM indexes support multiple columns");
+
+            Set<ColumnIdentifier> columns = new HashSet<>();
+            for (IndexTarget target : indexTargets)
+                if (!columns.add(target.column))
+                    throw ire("Duplicate column '%s' in index target list", 
target.column);
+        }
+
+        indexTargets.forEach(t -> validateIndexTarget(table, t));
+
+        String name = null == indexName ? generateIndexName(keyspace, 
indexTargets) : indexName;
+
+        IndexMetadata.Kind kind = attrs.isCustom ? IndexMetadata.Kind.CUSTOM : 
IndexMetadata.Kind.COMPOSITES;
+
+        Map<String, String> options = attrs.isCustom ? attrs.getOptions() : 
Collections.emptyMap();
+
+        IndexMetadata index = IndexMetadata.fromIndexTargets(indexTargets, 
name, kind, options);
+
+        // check to disallow creation of an index which duplicates an existing 
one in all but name
+        IndexMetadata equalIndex = tryFind(table.indexes, i -> 
i.equalsWithoutName(index)).orNull();
+        if (null != equalIndex)
+        {
+            if (ifNotExists)
+                return schema;
+
+            throw ire("Index %s is a duplicate of existing index %s", 
index.name, equalIndex.name);
+        }
+
+        TableMetadata newTable = table.withSwapped(table.indexes.with(index));
+        newTable.validate();
+
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.withSwapped(newTable)));
+    }
+
+    private void validateIndexTarget(TableMetadata table, IndexTarget target)
+    {
+        ColumnMetadata column = table.getColumn(target.column);
+
+        if (null == column)
+            throw ire("Column '%s' doesn't exist", target.column);
+
+        if (column.type.referencesDuration())
+        {
+            if (column.type.isCollection())
+                throw ire("Secondary indexes are not supported on collections 
containing durations");
+
+            if (column.type.isTuple())
+                throw ire("Secondary indexes are not supported on tuples 
containing durations");
+
+            if (column.type.isUDT())
+                throw  ire("Secondary indexes are not supported on UDTs 
containing durations");
+
+            throw ire("Secondary indexes are not supported on duration 
columns");
+        }
+
+        if (column.isPartitionKey() && table.partitionKeyColumns().size() == 1)
+            throw ire("Cannot create secondary index on the only partition key 
column %s", column);
+
+        if (column.type.isFrozenCollection() && target.type != Type.FULL)
+            throw ire("Cannot create %s() index on frozen column %s. Frozen 
collections only support full() indexes", target.type, column);
+
+        if (!column.type.isFrozenCollection() && target.type == Type.FULL)
+            throw ire("full() indexes can only be created on frozen 
collections");
+
+        if (!column.type.isCollection() && target.type != Type.SIMPLE)
+            throw ire("Cannot create %s() index on %s. Non-collection columns 
only support simple indexes", target.type, column);
+
+        if (!(column.type instanceof MapType && column.type.isMultiCell()) && 
(target.type == Type.KEYS || target.type == Type.KEYS_AND_VALUES))
+            throw ire("Cannot create index on %s of column %s with non-map 
type", target.type, column);
+
+        if (column.type.isUDT() && column.type.isMultiCell())
+            throw ire("Cannot create index on non-frozen UDT column %s", 
column);
+    }
+
+    private String generateIndexName(KeyspaceMetadata keyspace, 
List<IndexTarget> targets)
+    {
+        String baseName = targets.size() == 1
+                        ? IndexMetadata.generateDefaultIndexName(tableName, 
targets.get(0).column)
+                        : IndexMetadata.generateDefaultIndexName(tableName);
+        return keyspace.findAvailableIndexName(baseName);
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, 
tableName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureTablePermission(keyspaceName, tableName, 
Permission.ALTER);
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_INDEX, 
keyspaceName, indexName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final QualifiedName tableName;
+        private final QualifiedName indexName;
+        private final List<IndexTarget.Raw> rawIndexTargets;
+        private final IndexAttributes attrs;
+        private final boolean ifNotExists;
+
+        public Raw(QualifiedName tableName,
+                   QualifiedName indexName,
+                   List<IndexTarget.Raw> rawIndexTargets,
+                   IndexAttributes attrs,
+                   boolean ifNotExists)
+        {
+            this.tableName = tableName;
+            this.indexName = indexName;
+            this.rawIndexTargets = rawIndexTargets;
+            this.attrs = attrs;
+            this.ifNotExists = ifNotExists;
+        }
+
+        public CreateIndexStatement prepare(ClientState state)
+        {
+            String keyspaceName = tableName.hasKeyspace()
+                                ? tableName.getKeyspace()
+                                : indexName.hasKeyspace() ? 
indexName.getKeyspace() : state.getKeyspace();
+
+            if (tableName.hasKeyspace() && 
!keyspaceName.equals(tableName.getKeyspace()))
+                throw ire("Keyspace name '%s' doesn't match table name '%s'", 
keyspaceName, tableName);
+
+            if (indexName.hasKeyspace() && 
!keyspaceName.equals(indexName.getKeyspace()))
+                throw ire("Keyspace name '%s' doesn't match index name '%s'", 
keyspaceName, tableName);
+
+            return new CreateIndexStatement(keyspaceName, tableName.getName(), 
indexName.getName(), rawIndexTargets, attrs, ifNotExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
new file mode 100644
index 0000000..ecd19ed
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.*;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams.Option;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+
+public final class CreateKeyspaceStatement extends AlterSchemaStatement
+{
+    private final KeyspaceAttributes attrs;
+    private final boolean ifNotExists;
+
+    public CreateKeyspaceStatement(String keyspaceName, KeyspaceAttributes 
attrs, boolean ifNotExists)
+    {
+        super(keyspaceName);
+        this.attrs = attrs;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        attrs.validate();
+
+        if (!attrs.hasOption(Option.REPLICATION))
+            throw ire("Missing mandatory option '%s'", Option.REPLICATION);
+
+        if (schema.containsKeyspace(keyspaceName))
+        {
+            if (ifNotExists)
+                return schema;
+
+            throw new AlreadyExistsException(keyspaceName);
+        }
+
+        KeyspaceMetadata keyspace = KeyspaceMetadata.create(keyspaceName, 
attrs.asNewKeyspaceParams());
+
+        if (keyspace.params.replication.klass.equals(LocalStrategy.class))
+            throw ire("Unable to use given strategy class: LocalStrategy is 
reserved for internal use.");
+
+        keyspace.params.validate(keyspaceName);
+
+        return schema.withAddedOrUpdated(keyspace);
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.CREATED, keyspaceName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureAllKeyspacesPermission(Permission.CREATE);
+    }
+
+    @Override
+    Set<IResource> createdResources(KeyspacesDiff diff)
+    {
+        return ImmutableSet.of(DataResource.keyspace(keyspaceName), 
FunctionResource.keyspace(keyspaceName));
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_KEYSPACE, 
keyspaceName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        public final String keyspaceName;
+        private final KeyspaceAttributes attrs;
+        private final boolean ifNotExists;
+
+        public Raw(String keyspaceName, KeyspaceAttributes attrs, boolean 
ifNotExists)
+        {
+            this.keyspaceName = keyspaceName;
+            this.attrs = attrs;
+            this.ifNotExists = ifNotExists;
+        }
+
+        public CreateKeyspaceStatement prepare(ClientState state)
+        {
+            return new CreateKeyspaceStatement(keyspaceName, attrs, 
ifNotExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
new file mode 100644
index 0000000..ff26f0d
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.*;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.DataResource;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+import static java.util.Comparator.comparing;
+
+import static com.google.common.collect.Iterables.concat;
+
+public final class CreateTableStatement extends AlterSchemaStatement
+{
+    private final String tableName;
+
+    private final Map<ColumnIdentifier, CQL3Type.Raw> rawColumns;
+    private final Set<ColumnIdentifier> staticColumns;
+    private final List<ColumnIdentifier> partitionKeyColumns;
+    private final List<ColumnIdentifier> clusteringColumns;
+
+    private final LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder;
+    private final TableAttributes attrs;
+
+    private final boolean ifNotExists;
+
+    public CreateTableStatement(String keyspaceName,
+                                String tableName,
+
+                                Map<ColumnIdentifier, CQL3Type.Raw> rawColumns,
+                                Set<ColumnIdentifier> staticColumns,
+                                List<ColumnIdentifier> partitionKeyColumns,
+                                List<ColumnIdentifier> clusteringColumns,
+
+                                LinkedHashMap<ColumnIdentifier, Boolean> 
clusteringOrder,
+                                TableAttributes attrs,
+
+                                boolean ifNotExists)
+    {
+        super(keyspaceName);
+        this.tableName = tableName;
+
+        this.rawColumns = rawColumns;
+        this.staticColumns = staticColumns;
+        this.partitionKeyColumns = partitionKeyColumns;
+        this.clusteringColumns = clusteringColumns;
+
+        this.clusteringOrder = clusteringOrder;
+        this.attrs = attrs;
+
+        this.ifNotExists = ifNotExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+        if (null == keyspace)
+            throw ire("Keyspace '%s' doesn't exist", keyspaceName);
+
+        if (keyspace.hasTable(tableName))
+        {
+            if (ifNotExists)
+                return schema;
+
+            throw new AlreadyExistsException(keyspaceName, tableName);
+        }
+
+        TableMetadata table = builder(keyspace.types).build();
+        table.validate();
+
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.with(table)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.CREATED, Target.TABLE, keyspaceName, 
tableName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureKeyspacePermission(keyspaceName, Permission.CREATE);
+    }
+
+    @Override
+    Set<IResource> createdResources(KeyspacesDiff diff)
+    {
+        return ImmutableSet.of(DataResource.table(keyspaceName, tableName));
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_TABLE, 
keyspaceName, tableName);
+    }
+
+    public TableMetadata.Builder builder(Types types)
+    {
+        attrs.validate();
+        TableParams params = attrs.asNewTableParams();
+
+        // use a TreeMap to preserve ordering across JDK versions (see 
CASSANDRA-9492) - important for stable unit tests
+        Map<ColumnIdentifier, CQL3Type> columns = new TreeMap<>(comparing(o -> 
o.bytes));
+        rawColumns.forEach((column, type) -> columns.put(column, 
type.prepare(keyspaceName, types)));
+
+        // check for nested non-frozen UDTs or collections in a non-frozen UDT
+        columns.forEach((column, type) ->
+        {
+            if (type.isUDT() && type.getType().isMultiCell())
+            {
+                ((UserType) type.getType()).fieldTypes().forEach(field ->
+                {
+                    if (field.isMultiCell())
+                        throw ire("Non-frozen UDTs with nested non-frozen 
collections are not supported");
+                });
+            }
+        });
+
+        /*
+         * Deal with PRIMARY KEY columns
+         */
+
+        HashSet<ColumnIdentifier> primaryKeyColumns = new HashSet<>();
+        concat(partitionKeyColumns, clusteringColumns).forEach(column ->
+        {
+            CQL3Type type = columns.get(column);
+            if (null == type)
+                throw ire("Unknown column '%s' referenced in PRIMARY KEY for 
table '%s'", column, tableName);
+
+            if (!primaryKeyColumns.add(column))
+                throw ire("Duplicate column '%s' in PRIMARY KEY clause for 
table '%s'", column, tableName);
+
+            if (type.getType().isMultiCell())
+            {
+                if (type.isCollection())
+                    throw ire("Invalid non-frozen collection type %s for 
PRIMARY KEY column '%s'", type, column);
+                else
+                    throw ire("Invalid non-frozen user-defined type %s for 
PRIMARY KEY column '%s'", type, column);
+            }
+
+            if (type.getType().isCounter())
+                throw ire("counter type is not supported for PRIMARY KEY 
column '%s'", column);
+
+            if (type.getType().referencesDuration())
+                throw ire("duration type is not supported for PRIMARY KEY 
column '%s'", column);
+
+            if (staticColumns.contains(column))
+                throw ire("Static column '%s' cannot be part of the PRIMARY 
KEY", column);
+        });
+
+        List<AbstractType<?>> partitionKeyTypes = new ArrayList<>();
+        List<AbstractType<?>> clusteringTypes = new ArrayList<>();
+
+        partitionKeyColumns.forEach(column ->
+        {
+            CQL3Type type = columns.remove(column);
+            partitionKeyTypes.add(type.getType());
+        });
+
+        clusteringColumns.forEach(column ->
+        {
+            CQL3Type type = columns.remove(column);
+            boolean reverse = !clusteringOrder.getOrDefault(column, true);
+            clusteringTypes.add(reverse ? 
ReversedType.getInstance(type.getType()) : type.getType());
+        });
+
+        // If we give a clustering order, we must explicitly do so for all 
aliases and in the order of the PK
+        // This wasn't previously enforced because of a bug in the 
implementation
+        if (!clusteringOrder.isEmpty() && !clusteringColumns.equals(new 
ArrayList<>(clusteringOrder.keySet())))
+            throw ire("Clustering key columns must exactly match columns in 
CLUSTERING ORDER BY directive");
+
+        // Static columns only make sense if we have at least one clustering 
column. Otherwise everything is static anyway
+        if (clusteringColumns.isEmpty() && !staticColumns.isEmpty())
+            throw ire("Static columns are only useful (and thus allowed) if 
the table has at least one clustering column");
+
+        /*
+         * Counter table validation
+         */
+
+        boolean hasCounters = 
rawColumns.values().stream().anyMatch(CQL3Type.Raw::isCounter);
+        if (hasCounters)
+        {
+            // We've handled anything that is not a PRIMARY KEY so columns 
only contains NON-PK columns. So
+            // if it's a counter table, make sure we don't have non-counter 
types
+            if (columns.values().stream().anyMatch(t -> 
!t.getType().isCounter()))
+                throw ire("Cannot mix counter and non counter columns in the 
same table");
+
+            if (params.defaultTimeToLive > 0)
+                throw ire("Cannot set %s on a table with counters", 
TableParams.Option.DEFAULT_TIME_TO_LIVE);
+        }
+
+        /*
+         * Create the builder
+         */
+
+        TableMetadata.Builder builder = TableMetadata.builder(keyspaceName, 
tableName);
+
+        if (attrs.hasProperty(TableAttributes.ID))
+            builder.id(attrs.getId());
+
+        builder.isCounter(hasCounters)
+               .params(params);
+
+        for (int i = 0; i < partitionKeyColumns.size(); i++)
+            builder.addPartitionKeyColumn(partitionKeyColumns.get(i), 
partitionKeyTypes.get(i));
+
+        for (int i = 0; i < clusteringColumns.size(); i++)
+            builder.addClusteringColumn(clusteringColumns.get(i), 
clusteringTypes.get(i));
+
+        columns.forEach((column, type) ->
+        {
+            if (staticColumns.contains(column))
+                builder.addStaticColumn(column, type.getType());
+            else
+                builder.addRegularColumn(column, type.getType());
+        });
+
+        return builder;
+    }
+
+    public static TableMetadata.Builder parse(String cql, String keyspace)
+    {
+        return CQLFragmentParser.parseAny(CqlParser::createTableStatement, 
cql, "CREATE TABLE")
+                                .keyspace(keyspace)
+                                .prepare(null) // works around a messy 
ClientState/QueryProcessor class init deadlock
+                                .builder(Types.none());
+    }
+
+    public final static class Raw extends CQLStatement.Raw
+    {
+        private final QualifiedName name;
+        private final boolean ifNotExists;
+
+        private final Map<ColumnIdentifier, CQL3Type.Raw> rawColumns = new 
HashMap<>();
+        private final Set<ColumnIdentifier> staticColumns = new HashSet<>();
+        private final List<ColumnIdentifier> clusteringColumns = new 
ArrayList<>();
+
+        private List<ColumnIdentifier> partitionKeyColumns;
+
+        private final LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder 
= new LinkedHashMap<>();
+        public final TableAttributes attrs = new TableAttributes();
+
+        public Raw(QualifiedName name, boolean ifNotExists)
+        {
+            this.name = name;
+            this.ifNotExists = ifNotExists;
+        }
+
+        public CreateTableStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : 
state.getKeyspace();
+
+            if (null == partitionKeyColumns)
+                throw ire("No PRIMARY KEY specifed for table '%s' (exactly one 
required)", name);
+
+            return new CreateTableStatement(keyspaceName,
+                                            name.getName(),
+
+                                            rawColumns,
+                                            staticColumns,
+                                            partitionKeyColumns,
+                                            clusteringColumns,
+
+                                            clusteringOrder,
+                                            attrs,
+
+                                            ifNotExists);
+        }
+
+        public String keyspace()
+        {
+            return name.getKeyspace();
+        }
+
+        public Raw keyspace(String keyspace)
+        {
+            name.setKeyspace(keyspace, true);
+            return this;
+        }
+
+        public String table()
+        {
+            return name.getName();
+        }
+
+        public void addColumn(ColumnIdentifier column, CQL3Type.Raw type, 
boolean isStatic)
+        {
+            if (null != rawColumns.put(column, type))
+                throw ire("Duplicate column '%s' declaration for table '%s'", 
column, name);
+
+            if (isStatic)
+                staticColumns.add(column);
+        }
+
+        public void setPartitionKeyColumn(ColumnIdentifier column)
+        {
+            setPartitionKeyColumns(Collections.singletonList(column));
+        }
+
+        public void setPartitionKeyColumns(List<ColumnIdentifier> columns)
+        {
+            if (null != partitionKeyColumns)
+                throw ire("Multiple PRIMARY KEY specified for table '%s' 
(exactly one required)", name);
+
+            partitionKeyColumns = columns;
+        }
+
+        public void markClusteringColumn(ColumnIdentifier column)
+        {
+            clusteringColumns.add(column);
+        }
+
+        public void extendClusteringOrder(ColumnIdentifier column, boolean 
ascending)
+        {
+            if (null != clusteringOrder.put(column, ascending))
+                throw ire("Duplicate column '%s' in CLUSTERING ORDER BY clause 
for table '%s'", column, name);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java
new file mode 100644
index 0000000..cb6d14e
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QualifiedName;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.triggers.TriggerExecutor;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+public final class CreateTriggerStatement extends AlterSchemaStatement
+{
+    private final String tableName;
+    private final String triggerName;
+    private final String triggerClass;
+    private final boolean ifNotExists;
+
+    public CreateTriggerStatement(String keyspaceName, String tableName, 
String triggerName, String triggerClass, boolean ifNotExists)
+    {
+        super(keyspaceName);
+        this.tableName = tableName;
+        this.triggerName = triggerName;
+        this.triggerClass = triggerClass;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+        if (null == keyspace)
+            throw ire("Keyspace '%s' doesn't exist", keyspaceName);
+
+        TableMetadata table = keyspace.getTableOrViewNullable(tableName);
+        if (null == table)
+            throw ire("Table '%s' doesn't exist", tableName);
+
+        if (table.isView())
+            throw ire("Cannot CREATE TRIGGER for a materialized view");
+
+        TriggerMetadata existingTrigger = 
table.triggers.get(triggerName).orElse(null);
+        if (null != existingTrigger)
+        {
+            if (ifNotExists)
+                return schema;
+
+            throw ire("Trigger '%s' already exists", triggerName);
+        }
+
+        try
+        {
+            TriggerExecutor.instance.loadTriggerInstance(triggerClass);
+        }
+        catch (Exception e)
+        {
+            throw ire("Trigger class '%s' couldn't be loaded", triggerClass);
+        }
+
+        TableMetadata newTable = 
table.withSwapped(table.triggers.with(TriggerMetadata.create(triggerName, 
triggerClass)));
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.withSwapped(newTable)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, 
tableName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureIsSuperuser("Only superusers are allowed to perform 
CREATE TRIGGER queries");
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_TRIGGER, 
keyspaceName, triggerName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final QualifiedName tableName;
+        private final String triggerName;
+        private final String triggerClass;
+        private final boolean ifNotExists;
+
+        public Raw(QualifiedName tableName, String triggerName, String 
triggerClass, boolean ifNotExists)
+        {
+            this.tableName = tableName;
+            this.triggerName = triggerName;
+            this.triggerClass = triggerClass;
+            this.ifNotExists = ifNotExists;
+        }
+
+        public CreateTriggerStatement prepare(ClientState state)
+        {
+            String keyspaceName = tableName.hasKeyspace() ? 
tableName.getKeyspace() : state.getKeyspace();
+            return new CreateTriggerStatement(keyspaceName, 
tableName.getName(), triggerName, triggerClass, ifNotExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java
new file mode 100644
index 0000000..c328eb7
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.*;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.UTName;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.schema.Types;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+import static java.util.stream.Collectors.toList;
+
+public final class CreateTypeStatement extends AlterSchemaStatement
+{
+    private final String typeName;
+    private final List<FieldIdentifier> fieldNames;
+    private final List<CQL3Type.Raw> rawFieldTypes;
+    private final boolean ifNotExists;
+
+    public CreateTypeStatement(String keyspaceName,
+                               String typeName,
+                               List<FieldIdentifier> fieldNames,
+                               List<CQL3Type.Raw> rawFieldTypes,
+                               boolean ifNotExists)
+    {
+        super(keyspaceName);
+        this.typeName = typeName;
+        this.fieldNames = fieldNames;
+        this.rawFieldTypes = rawFieldTypes;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+        if (null == keyspace)
+            throw ire("Keyspace '%s' doesn't exist", keyspaceName);
+
+        UserType existingType = keyspace.types.getNullable(bytes(typeName));
+        if (null != existingType)
+        {
+            if (ifNotExists)
+                return schema;
+
+            throw ire("A user type with name '%s' already exists", typeName);
+        }
+
+        Set<FieldIdentifier> usedNames = new HashSet<>();
+        for (FieldIdentifier name : fieldNames)
+            if (!usedNames.add(name))
+                throw ire("Duplicate field name '%s' in type '%s'", name, 
typeName);
+
+        for (CQL3Type.Raw type : rawFieldTypes)
+        {
+            if (type.isCounter())
+                throw ire("A user type cannot contain counters");
+
+            if (type.isUDT() && !type.isFrozen())
+                throw ire("A user type cannot contain non-frozen UDTs");
+        }
+
+        List<AbstractType<?>> fieldTypes =
+            rawFieldTypes.stream()
+                         .map(t -> t.prepare(keyspaceName, 
keyspace.types).getType())
+                         .collect(toList());
+
+        UserType udt = new UserType(keyspaceName, bytes(typeName), fieldNames, 
fieldTypes, true);
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.types.with(udt)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.CREATED, Target.TYPE, keyspaceName, 
typeName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureKeyspacePermission(keyspaceName, Permission.CREATE);
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_TYPE, 
keyspaceName, typeName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final UTName name;
+        private final boolean ifNotExists;
+
+        private final List<FieldIdentifier> fieldNames = new ArrayList<>();
+        private final List<CQL3Type.Raw> rawFieldTypes = new ArrayList<>();
+
+        public Raw(UTName name, boolean ifNotExists)
+        {
+            this.name = name;
+            this.ifNotExists = ifNotExists;
+        }
+
+        public CreateTypeStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : 
state.getKeyspace();
+            return new CreateTypeStatement(keyspaceName, 
name.getStringTypeName(), fieldNames, rawFieldTypes, ifNotExists);
+        }
+
+        public void addField(FieldIdentifier name, CQL3Type.Raw type)
+        {
+            fieldNames.add(name);
+            rawFieldTypes.add(type);
+        }
+
+        public void addToRawBuilder(Types.RawBuilder builder)
+        {
+            builder.add(name.getStringTypeName(),
+                        
fieldNames.stream().map(FieldIdentifier::toString).collect(toList()),
+                        
rawFieldTypes.stream().map(CQL3Type.Raw::toString).collect(toList()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
new file mode 100644
index 0000000..f97b0fe
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.*;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
+import org.apache.cassandra.cql3.selection.RawSelector;
+import org.apache.cassandra.cql3.selection.Selectable;
+import org.apache.cassandra.cql3.statements.StatementType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+import static java.lang.String.join;
+
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.transform;
+
+public final class CreateViewStatement extends AlterSchemaStatement
+{
+    private final String tableName;
+    private final String viewName;
+
+    private final List<RawSelector> rawColumns;
+    private final List<ColumnIdentifier> partitionKeyColumns;
+    private final List<ColumnIdentifier> clusteringColumns;
+
+    private final WhereClause whereClause;
+
+    private final LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder;
+    private final TableAttributes attrs;
+
+    private final boolean ifNotExists;
+
+    public CreateViewStatement(String keyspaceName,
+                               String tableName,
+                               String viewName,
+
+                               List<RawSelector> rawColumns,
+                               List<ColumnIdentifier> partitionKeyColumns,
+                               List<ColumnIdentifier> clusteringColumns,
+
+                               WhereClause whereClause,
+
+                               LinkedHashMap<ColumnIdentifier, Boolean> 
clusteringOrder,
+                               TableAttributes attrs,
+
+                               boolean ifNotExists)
+    {
+        super(keyspaceName);
+        this.tableName = tableName;
+        this.viewName = viewName;
+
+        this.rawColumns = rawColumns;
+        this.partitionKeyColumns = partitionKeyColumns;
+        this.clusteringColumns = clusteringColumns;
+
+        this.whereClause = whereClause;
+
+        this.clusteringOrder = clusteringOrder;
+        this.attrs = attrs;
+
+        this.ifNotExists = ifNotExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        if (!DatabaseDescriptor.enableMaterializedViews())
+            throw ire("Materialized views are disabled. Enable in 
cassandra.yaml to use.");
+
+        /*
+         * Basic dependency validations
+         */
+
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+        if (null == keyspace)
+            throw ire("Keyspace '%s' doesn't exist", keyspaceName);
+
+        TableMetadata table = keyspace.tables.getNullable(tableName);
+        if (null == table)
+            throw ire("Base table '%s' doesn't exist", tableName);
+
+        if (keyspace.hasTable(viewName))
+            throw ire("Cannot create materialized view '%s' - a table with the 
same name already exists", viewName);
+
+        if (keyspace.hasView(viewName))
+        {
+            if (ifNotExists)
+                return schema;
+
+            throw new AlreadyExistsException(keyspaceName, viewName);
+        }
+
+        /*
+         * Base table validation
+         */
+
+        if (table.isCounter())
+            throw ire("Materialized views are not supported on counter 
tables");
+
+        if (table.isView())
+            throw ire("Materialized views cannot be created against other 
materialized views");
+
+        if (table.params.gcGraceSeconds == 0)
+        {
+            throw ire("Cannot create materialized view '%s' for base table " +
+                      "'%s' with gc_grace_seconds of 0, since this value is " +
+                      "used to TTL undelivered updates. Setting 
gc_grace_seconds" +
+                      " too low might cause undelivered updates to expire " +
+                      "before being replayed.",
+                      viewName, tableName);
+        }
+
+        /*
+         * Process SELECT clause
+         */
+
+        Set<ColumnIdentifier> selectedColumns = new HashSet<>();
+
+        if (rawColumns.isEmpty()) // SELECT *
+            table.columns().forEach(c -> selectedColumns.add(c.name));
+
+        rawColumns.forEach(selector ->
+        {
+            if (null != selector.alias)
+                throw ire("Cannot use aliases when defining a materialized 
view (got %s)", selector);
+
+            if (!(selector.selectable instanceof Selectable.RawIdentifier))
+                throw ire("Can only select columns by name when defining a 
materialized view (got %s)", selector.selectable);
+
+            // will throw IRE if the column doesn't exist in the base table
+            ColumnMetadata column = (ColumnMetadata) 
selector.selectable.prepare(table);
+
+            selectedColumns.add(column.name);
+        });
+
+        selectedColumns.stream()
+                       .map(table::getColumn)
+                       .filter(ColumnMetadata::isStatic)
+                       .findAny()
+                       .ifPresent(c -> { throw ire("Cannot include static 
column '%s' in materialized view '%s'", c, viewName); });
+
+        /*
+         * Process PRIMARY KEY columns and CLUSTERING ORDER BY clause
+         */
+
+        if (partitionKeyColumns.isEmpty())
+            throw ire("Must provide at least one partition key column for 
materialized view '%s'", viewName);
+
+        HashSet<ColumnIdentifier> primaryKeyColumns = new HashSet<>();
+
+        concat(partitionKeyColumns, clusteringColumns).forEach(name ->
+        {
+            ColumnMetadata column = table.getColumn(name);
+            if (null == column || !selectedColumns.contains(name))
+                throw ire("Unknown column '%s' referenced in PRIMARY KEY for 
materialized view '%s'", name, viewName);
+
+            if (!primaryKeyColumns.add(name))
+                throw ire("Duplicate column '%s' in PRIMARY KEY clause for 
materialized view '%s'", name, viewName);
+
+            AbstractType<?> type = column.type;
+
+            if (type.isMultiCell())
+            {
+                if (type.isCollection())
+                    throw ire("Invalid non-frozen collection type '%s' for 
PRIMARY KEY column '%s'", type, name);
+                else
+                    throw ire("Invalid non-frozen user-defined type '%s' for 
PRIMARY KEY column '%s'", type, name);
+            }
+
+            if (type.isCounter())
+                throw ire("counter type is not supported for PRIMARY KEY 
column '%s'", name);
+
+            if (type.referencesDuration())
+                throw ire("duration type is not supported for PRIMARY KEY 
column '%s'", name);
+        });
+
+        // If we give a clustering order, we must explicitly do so for all 
aliases and in the order of the PK
+        if (!clusteringOrder.isEmpty() && !clusteringColumns.equals(new 
ArrayList<>(clusteringOrder.keySet())))
+            throw ire("Clustering key columns must exactly match columns in 
CLUSTERING ORDER BY directive");
+
+        /*
+         * We need to include all of the primary key columns from the base 
table in order to make sure that we do not
+         * overwrite values in the view. We cannot support "collapsing" the 
base table into a smaller number of rows in
+         * the view because if we need to generate a tombstone, we have no way 
of knowing which value is currently being
+         * used in the view and whether or not to generate a tombstone. In 
order to not surprise our users, we require
+         * that they include all of the columns. We provide them with a list 
of all of the columns left to include.
+         */
+        List<ColumnIdentifier> missingPrimaryKeyColumns =
+            Lists.newArrayList(filter(transform(table.primaryKeyColumns(), c 
-> c.name), c -> !primaryKeyColumns.contains(c)));
+
+        if (!missingPrimaryKeyColumns.isEmpty())
+        {
+            throw ire("Cannot create materialized view '%s' without primary 
key columns %s from base table '%s'",
+                      viewName, join(", ", transform(missingPrimaryKeyColumns, 
ColumnIdentifier::toString)), tableName);
+        }
+
+        Set<ColumnIdentifier> regularBaseTableColumnsInViewPrimaryKey = new 
HashSet<>(primaryKeyColumns);
+        transform(table.primaryKeyColumns(), c -> 
c.name).forEach(regularBaseTableColumnsInViewPrimaryKey::remove);
+        if (regularBaseTableColumnsInViewPrimaryKey.size() > 1)
+        {
+            throw ire("Cannot include more than one non-primary key column in 
materialized view primary key (got %s)",
+                      join(", ", 
transform(regularBaseTableColumnsInViewPrimaryKey, 
ColumnIdentifier::toString)));
+        }
+
+        /*
+         * Process WHERE clause
+         */
+
+        if (whereClause.containsCustomExpressions())
+            throw ire("WHERE clause for materialized view '%s' cannot contain 
custom index expressions", viewName);
+
+        StatementRestrictions restrictions =
+            new StatementRestrictions(StatementType.SELECT,
+                                      table,
+                                      whereClause,
+                                      VariableSpecifications.empty(),
+                                      false,
+                                      false,
+                                      true,
+                                      true);
+
+        List<ColumnIdentifier> nonRestrictedPrimaryKeyColumns =
+            Lists.newArrayList(filter(primaryKeyColumns, name -> 
!restrictions.isRestricted(table.getColumn(name))));
+
+        if (!nonRestrictedPrimaryKeyColumns.isEmpty())
+        {
+            throw ire("Primary key columns %s must be restricted with 'IS NOT 
NULL' or otherwise",
+                      join(", ", transform(nonRestrictedPrimaryKeyColumns, 
ColumnIdentifier::toString)));
+        }
+
+        // See CASSANDRA-13798
+        Set<ColumnMetadata> restrictedNonPrimaryKeyColumns = 
restrictions.nonPKRestrictedColumns(false);
+        if (!restrictedNonPrimaryKeyColumns.isEmpty() && 
!Boolean.getBoolean("cassandra.mv.allow_filtering_nonkey_columns_unsafe"))
+        {
+            throw ire("Non-primary key columns can only be restricted with 'IS 
NOT NULL' (got: %s restricted illegally)",
+                      join(",", transform(restrictedNonPrimaryKeyColumns, 
ColumnMetadata::toString)));
+        }
+
+        /*
+         * Validate WITH params
+         */
+
+        attrs.validate();
+
+        if (attrs.hasOption(TableParams.Option.DEFAULT_TIME_TO_LIVE))
+        {
+            throw ire("Cannot set default_time_to_live for a materialized 
view. " +
+                      "Data in a materialized view always expire at the same 
time than " +
+                      "the corresponding data in the parent table.");
+        }
+
+        /*
+         * Build the thing
+         */
+
+        TableMetadata.Builder builder = TableMetadata.builder(keyspaceName, 
viewName);
+
+        if (attrs.hasProperty(TableAttributes.ID))
+            builder.id(attrs.getId());
+
+        builder.params(attrs.asNewTableParams())
+               .kind(TableMetadata.Kind.VIEW);
+
+        partitionKeyColumns.forEach(name -> 
builder.addPartitionKeyColumn(name, getType(table, name)));
+        clusteringColumns.forEach(name -> builder.addClusteringColumn(name, 
getType(table, name)));
+
+        selectedColumns.stream()
+                       .filter(name -> !primaryKeyColumns.contains(name))
+                       .forEach(name -> builder.addRegularColumn(name, 
getType(table, name)));
+
+        ViewMetadata view = new ViewMetadata(table.id, table.name, 
rawColumns.isEmpty(), whereClause, builder.build());
+        view.metadata.validate();
+
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.views.with(view)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.CREATED, Target.TABLE, keyspaceName, 
viewName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureTablePermission(keyspaceName, tableName, 
Permission.ALTER);
+    }
+
+    private AbstractType<?> getType(TableMetadata table, ColumnIdentifier name)
+    {
+        AbstractType<?> type = table.getColumn(name).type;
+        boolean reverse = !clusteringOrder.getOrDefault(name, true);
+
+        if (type.isReversed() && !reverse)
+            return ((ReversedType) type).baseType;
+        else if (!type.isReversed() && reverse)
+            return ReversedType.getInstance(type);
+        else
+            return type;
+    }
+
+    @Override
+    Set<String> clientWarnings(KeyspacesDiff diff)
+    {
+        return ImmutableSet.of("Materialized views are experimental and are 
not recommended for production use.");
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_VIEW, 
keyspaceName, viewName);
+    }
+
+    public final static class Raw extends CQLStatement.Raw
+    {
+        private final QualifiedName tableName;
+        private final QualifiedName viewName;
+        private final boolean ifNotExists;
+
+        private final List<RawSelector> rawColumns;
+        private final List<ColumnIdentifier> clusteringColumns = new 
ArrayList<>();
+        private List<ColumnIdentifier> partitionKeyColumns;
+
+        private final WhereClause whereClause;
+
+        private final LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder 
= new LinkedHashMap<>();
+        public final TableAttributes attrs = new TableAttributes();
+
+        public Raw(QualifiedName tableName, QualifiedName viewName, 
List<RawSelector> rawColumns, WhereClause whereClause, boolean ifNotExists)
+        {
+            this.tableName = tableName;
+            this.viewName = viewName;
+            this.rawColumns = rawColumns;
+            this.whereClause = whereClause;
+            this.ifNotExists = ifNotExists;
+        }
+
+        public CreateViewStatement prepare(ClientState state)
+        {
+            String keyspaceName = viewName.hasKeyspace() ? 
viewName.getKeyspace() : state.getKeyspace();
+
+            if (tableName.hasKeyspace() && 
!keyspaceName.equals(tableName.getKeyspace()))
+                throw ire("Cannot create a materialized view on a table in a 
different keyspace");
+
+            if (!bindVariables.isEmpty())
+                throw ire("Bind variables are not allowed in CREATE 
MATERIALIZED VIEW statements");
+
+            if (null == partitionKeyColumns)
+                throw ire("No PRIMARY KEY specifed for view '%s' (exactly one 
required)", viewName);
+
+            return new CreateViewStatement(keyspaceName,
+                                           tableName.getName(),
+                                           viewName.getName(),
+
+                                           rawColumns,
+                                           partitionKeyColumns,
+                                           clusteringColumns,
+
+                                           whereClause,
+
+                                           clusteringOrder,
+                                           attrs,
+
+                                           ifNotExists);
+        }
+
+        public void setPartitionKeyColumns(List<ColumnIdentifier> columns)
+        {
+            partitionKeyColumns = columns;
+        }
+
+        public void markClusteringColumn(ColumnIdentifier column)
+        {
+            clusteringColumns.add(column);
+        }
+
+        public void extendClusteringOrder(ColumnIdentifier column, boolean 
ascending)
+        {
+            if (null != clusteringOrder.put(column, ascending))
+                throw ire("Duplicate column '%s' in CLUSTERING ORDER BY clause 
for view '%s'", column, viewName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
new file mode 100644
index 0000000..564f267
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.FunctionResource;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+
+import static java.lang.String.format;
+import static java.lang.String.join;
+import static java.util.stream.Collectors.toList;
+
+import static com.google.common.collect.Iterables.transform;
+
+public final class DropAggregateStatement extends AlterSchemaStatement
+{
+    private final String aggregateName;
+    private final List<CQL3Type.Raw> arguments;
+    private final boolean argumentsSpeficied;
+    private final boolean ifExists;
+
+    public DropAggregateStatement(String keyspaceName,
+                                  String aggregateName,
+                                  List<CQL3Type.Raw> arguments,
+                                  boolean argumentsSpeficied,
+                                  boolean ifExists)
+    {
+        super(keyspaceName);
+        this.aggregateName = aggregateName;
+        this.arguments = arguments;
+        this.argumentsSpeficied = argumentsSpeficied;
+        this.ifExists = ifExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        String name =
+            argumentsSpeficied
+          ? format("%s.%s(%s)", keyspaceName, aggregateName, join(", ", 
transform(arguments, CQL3Type.Raw::toString)))
+          : format("%s.%s", keyspaceName, aggregateName);
+
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+        if (null == keyspace)
+        {
+            if (ifExists)
+                return schema;
+
+            throw ire("Aggregate '%s' doesn't exist", name);
+        }
+
+        Collection<Function> aggregates = keyspace.functions.get(new 
FunctionName(keyspaceName, aggregateName));
+        if (aggregates.size() > 1 && !argumentsSpeficied)
+        {
+            throw ire("'DROP AGGREGATE %s' matches multiple function 
definitions; " +
+                      "specify the argument types by issuing a statement like 
" +
+                      "'DROP AGGREGATE %s (type, type, ...)'. You can use 
cqlsh " +
+                      "'DESCRIBE AGGREGATE %s' command to find all overloads",
+                      aggregateName, aggregateName, aggregateName);
+        }
+
+        arguments.stream()
+                 .filter(CQL3Type.Raw::isFrozen)
+                 .findFirst()
+                 .ifPresent(t -> { throw ire("Argument '%s' cannot be frozen; 
remove frozen<> modifier from '%s'", t, t); });
+
+        List<AbstractType<?>> argumentTypes = 
prepareArgumentTypes(keyspace.types);
+
+        Predicate<Function> filter = Functions.Filter.UDA;
+        if (argumentsSpeficied)
+            filter = filter.and(f -> Functions.typesMatch(f.argTypes(), 
argumentTypes));
+
+        Function aggregate = 
aggregates.stream().filter(filter).findAny().orElse(null);
+        if (null == aggregate)
+        {
+            if (ifExists)
+                return schema;
+
+            throw ire("Aggregate '%s' doesn't exist", name);
+        }
+
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.without(aggregate)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        Functions dropped = diff.altered.get(0).udas.dropped;
+        assert dropped.size() == 1;
+        return SchemaChange.forAggregate(Change.DROPPED, (UDAggregate) 
dropped.iterator().next());
+    }
+
+    public void authorize(ClientState client)
+    {
+        KeyspaceMetadata keyspace = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
+        if (null == keyspace)
+            return;
+
+        Stream<Function> functions = keyspace.functions.get(new 
FunctionName(keyspaceName, aggregateName)).stream();
+        if (argumentsSpeficied)
+            functions = functions.filter(f -> 
Functions.typesMatch(f.argTypes(), prepareArgumentTypes(keyspace.types)));
+
+        functions.forEach(f -> client.ensurePermission(Permission.DROP, 
FunctionResource.function(f)));
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_AGGREGATE, 
keyspaceName, aggregateName);
+    }
+
+    private List<AbstractType<?>> prepareArgumentTypes(Types types)
+    {
+        return arguments.stream()
+                        .map(t -> t.prepare(keyspaceName, types))
+                        .map(CQL3Type::getType)
+                        .collect(toList());
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final FunctionName name;
+        private final List<CQL3Type.Raw> arguments;
+        private final boolean argumentsSpecified;
+        private final boolean ifExists;
+
+        public Raw(FunctionName name,
+                   List<CQL3Type.Raw> arguments,
+                   boolean argumentsSpecified,
+                   boolean ifExists)
+        {
+            this.name = name;
+            this.arguments = arguments;
+            this.argumentsSpecified = argumentsSpecified;
+            this.ifExists = ifExists;
+        }
+
+        public DropAggregateStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.keyspace : 
state.getKeyspace();
+            return new DropAggregateStatement(keyspaceName, name.name, 
arguments, argumentsSpecified, ifExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
new file mode 100644
index 0000000..9433833
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.FunctionResource;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+
+import static java.lang.String.format;
+import static java.lang.String.join;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+
+import static com.google.common.collect.Iterables.transform;
+
+public final class DropFunctionStatement extends AlterSchemaStatement
+{
+    private final String functionName;
+    private final Collection<CQL3Type.Raw> arguments;
+    private final boolean argumentsSpeficied;
+    private final boolean ifExists;
+
+    public DropFunctionStatement(String keyspaceName,
+                                 String functionName,
+                                 Collection<CQL3Type.Raw> arguments,
+                                 boolean argumentsSpeficied,
+                                 boolean ifExists)
+    {
+        super(keyspaceName);
+        this.functionName = functionName;
+        this.arguments = arguments;
+        this.argumentsSpeficied = argumentsSpeficied;
+        this.ifExists = ifExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        String name =
+            argumentsSpeficied
+          ? format("%s.%s(%s)", keyspaceName, functionName, join(", ", 
transform(arguments, CQL3Type.Raw::toString)))
+          : format("%s.%s", keyspaceName, functionName);
+
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+        if (null == keyspace)
+        {
+            if (ifExists)
+                return schema;
+
+            throw ire("Function '%s' doesn't exist", name);
+        }
+
+        Collection<Function> functions = keyspace.functions.get(new 
FunctionName(keyspaceName, functionName));
+        if (functions.size() > 1 && !argumentsSpeficied)
+        {
+            throw ire("'DROP FUNCTION %s' matches multiple function 
definitions; " +
+                      "specify the argument types by issuing a statement like 
" +
+                      "'DROP FUNCTION %s (type, type, ...)'. You can use cqlsh 
" +
+                      "'DESCRIBE FUNCTION %s' command to find all overloads",
+                      functionName, functionName, functionName);
+        }
+
+        arguments.stream()
+                 .filter(CQL3Type.Raw::isFrozen)
+                 .findFirst()
+                 .ifPresent(t -> { throw ire("Argument '%s' cannot be frozen; 
remove frozen<> modifier from '%s'", t, t); });
+
+        List<AbstractType<?>> argumentTypes = 
prepareArgumentTypes(keyspace.types);
+
+        Predicate<Function> filter = Functions.Filter.UDF;
+        if (argumentsSpeficied)
+            filter = filter.and(f -> Functions.typesMatch(f.argTypes(), 
argumentTypes));
+
+        Function function = 
functions.stream().filter(filter).findAny().orElse(null);
+        if (null == function)
+        {
+            if (ifExists)
+                return schema;
+
+            throw ire("Function '%s' doesn't exist", name);
+        }
+
+        String dependentAggregates =
+            keyspace.functions
+                    .aggregatesUsingFunction(function)
+                    .map(a -> a.name().toString())
+                    .collect(joining(", "));
+
+        if (!dependentAggregates.isEmpty())
+            throw ire("Function '%s' is still referenced by aggregates %s", 
name, dependentAggregates);
+
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.without(function)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        Functions dropped = diff.altered.get(0).udfs.dropped;
+        assert dropped.size() == 1;
+        return SchemaChange.forFunction(Change.DROPPED, (UDFunction) 
dropped.iterator().next());
+    }
+
+    public void authorize(ClientState client)
+    {
+        KeyspaceMetadata keyspace = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
+        if (null == keyspace)
+            return;
+
+        Stream<Function> functions = keyspace.functions.get(new 
FunctionName(keyspaceName, functionName)).stream();
+        if (argumentsSpeficied)
+            functions = functions.filter(f -> 
Functions.typesMatch(f.argTypes(), prepareArgumentTypes(keyspace.types)));
+
+        functions.forEach(f -> client.ensurePermission(Permission.DROP, 
FunctionResource.function(f)));
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_FUNCTION, 
keyspaceName, functionName);
+    }
+
+    private List<AbstractType<?>> prepareArgumentTypes(Types types)
+    {
+        return arguments.stream()
+                        .map(t -> t.prepare(keyspaceName, types))
+                        .map(CQL3Type::getType)
+                        .collect(toList());
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final FunctionName name;
+        private final List<CQL3Type.Raw> arguments;
+        private final boolean argumentsSpecified;
+        private final boolean ifExists;
+
+        public Raw(FunctionName name,
+                   List<CQL3Type.Raw> arguments,
+                   boolean argumentsSpecified,
+                   boolean ifExists)
+        {
+            this.name = name;
+            this.arguments = arguments;
+            this.argumentsSpecified = argumentsSpecified;
+            this.ifExists = ifExists;
+        }
+
+        public DropFunctionStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.keyspace : 
state.getKeyspace();
+            return new DropFunctionStatement(keyspaceName, name.name, 
arguments, argumentsSpecified, ifExists);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to