http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java 
b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
deleted file mode 100644
index 572362b..0000000
--- a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3.statements;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.schema.*;
-import org.apache.cassandra.schema.TableParams.Option;
-import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
-
-import static java.lang.String.format;
-
-public final class TableAttributes extends PropertyDefinitions
-{
-    private static final String KW_ID = "id";
-    private static final Set<String> validKeywords;
-    private static final Set<String> obsoleteKeywords;
-
-    static
-    {
-        ImmutableSet.Builder<String> validBuilder = ImmutableSet.builder();
-        for (Option option : Option.values())
-            validBuilder.add(option.toString());
-        validBuilder.add(KW_ID);
-        validKeywords = validBuilder.build();
-        obsoleteKeywords = ImmutableSet.of();
-    }
-
-    public void validate()
-    {
-        validate(validKeywords, obsoleteKeywords);
-        build(TableParams.builder()).validate();
-    }
-
-    public TableParams asNewTableParams()
-    {
-        return build(TableParams.builder());
-    }
-
-    public TableParams asAlteredTableParams(TableParams previous)
-    {
-        if (getId() != null)
-            throw new ConfigurationException("Cannot alter table id.");
-        return build(previous.unbuild());
-    }
-
-    public TableId getId() throws ConfigurationException
-    {
-        String id = getSimple(KW_ID);
-        try
-        {
-            return id != null ? TableId.fromString(id) : null;
-        }
-        catch (IllegalArgumentException e)
-        {
-            throw new ConfigurationException("Invalid table id", e);
-        }
-    }
-
-    private TableParams build(TableParams.Builder builder)
-    {
-        if (hasOption(Option.BLOOM_FILTER_FP_CHANCE))
-            
builder.bloomFilterFpChance(getDouble(Option.BLOOM_FILTER_FP_CHANCE));
-
-        if (hasOption(Option.CACHING))
-            builder.caching(CachingParams.fromMap(getMap(Option.CACHING)));
-
-        if (hasOption(Option.COMMENT))
-            builder.comment(getString(Option.COMMENT));
-
-        if (hasOption(Option.COMPACTION))
-            
builder.compaction(CompactionParams.fromMap(getMap(Option.COMPACTION)));
-
-        if (hasOption(Option.COMPRESSION))
-        {
-            //crc_check_chance was "promoted" from a compression property to a 
top-level-property after #9839
-            //so we temporarily accept it to be defined as a compression 
option, to maintain backwards compatibility
-            Map<String, String> compressionOpts = getMap(Option.COMPRESSION);
-            if 
(compressionOpts.containsKey(Option.CRC_CHECK_CHANCE.toString().toLowerCase()))
-            {
-                Double crcCheckChance = 
getDeprecatedCrcCheckChance(compressionOpts);
-                builder.crcCheckChance(crcCheckChance);
-            }
-            
builder.compression(CompressionParams.fromMap(getMap(Option.COMPRESSION)));
-        }
-
-        if (hasOption(Option.DEFAULT_TIME_TO_LIVE))
-            builder.defaultTimeToLive(getInt(Option.DEFAULT_TIME_TO_LIVE));
-
-        if (hasOption(Option.GC_GRACE_SECONDS))
-            builder.gcGraceSeconds(getInt(Option.GC_GRACE_SECONDS));
-
-        if (hasOption(Option.MAX_INDEX_INTERVAL))
-            builder.maxIndexInterval(getInt(Option.MAX_INDEX_INTERVAL));
-
-        if (hasOption(Option.MEMTABLE_FLUSH_PERIOD_IN_MS))
-            
builder.memtableFlushPeriodInMs(getInt(Option.MEMTABLE_FLUSH_PERIOD_IN_MS));
-
-        if (hasOption(Option.MIN_INDEX_INTERVAL))
-            builder.minIndexInterval(getInt(Option.MIN_INDEX_INTERVAL));
-
-        if (hasOption(Option.SPECULATIVE_RETRY))
-            
builder.speculativeRetry(SpeculativeRetryPolicy.fromString(getString(Option.SPECULATIVE_RETRY)));
-
-        if (hasOption(Option.CRC_CHECK_CHANCE))
-            builder.crcCheckChance(getDouble(Option.CRC_CHECK_CHANCE));
-
-        if (hasOption(Option.CDC))
-            builder.cdc(getBoolean(Option.CDC.toString(), false));
-
-        return builder.build();
-    }
-
-    private Double getDeprecatedCrcCheckChance(Map<String, String> 
compressionOpts)
-    {
-        String value = 
compressionOpts.get(Option.CRC_CHECK_CHANCE.toString().toLowerCase());
-        try
-        {
-            return Double.valueOf(value);
-        }
-        catch (NumberFormatException e)
-        {
-            throw new SyntaxException(String.format("Invalid double value %s 
for crc_check_chance.'", value));
-        }
-    }
-
-    private double getDouble(Option option)
-    {
-        String value = getString(option);
-
-        try
-        {
-            return Double.parseDouble(value);
-        }
-        catch (NumberFormatException e)
-        {
-            throw new SyntaxException(format("Invalid double value %s for 
'%s'", value, option));
-        }
-    }
-
-    private int getInt(Option option)
-    {
-        String value = getString(option);
-
-        try
-        {
-            return Integer.parseInt(value);
-        }
-        catch (NumberFormatException e)
-        {
-            throw new SyntaxException(String.format("Invalid integer value %s 
for '%s'", value, option));
-        }
-    }
-
-    private String getString(Option option)
-    {
-        String value = getSimple(option.toString());
-        if (value == null)
-            throw new IllegalStateException(format("Option '%s' is absent", 
option));
-        return value;
-    }
-
-    private Map<String, String> getMap(Option option)
-    {
-        Map<String, String> value = getMap(option.toString());
-        if (value == null)
-            throw new IllegalStateException(format("Option '%s' is absent", 
option));
-        return value;
-    }
-
-    private boolean hasOption(Option option)
-    {
-        return hasProperty(option.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index d41a814..206d116 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.audit.AuditLogContext;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
@@ -34,45 +35,40 @@ import 
org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
-public class TruncateStatement extends CFStatement implements CQLStatement
+public class TruncateStatement extends QualifiedStatement implements 
CQLStatement
 {
-    public TruncateStatement(CFName name)
+    public TruncateStatement(QualifiedName name)
     {
         super(name);
     }
 
-    public int getBoundTerms()
+    public TruncateStatement prepare(ClientState state)
     {
-        return 0;
+        return this;
     }
 
-    public Prepared prepare() throws InvalidRequestException
+    public void authorize(ClientState state) throws InvalidRequestException, 
UnauthorizedException
     {
-        return new Prepared(this);
-    }
-
-    public void checkAccess(ClientState state) throws InvalidRequestException, 
UnauthorizedException
-    {
-        state.hasColumnFamilyAccess(keyspace(), columnFamily(), 
Permission.MODIFY);
+        state.ensureTablePermission(keyspace(), name(), Permission.MODIFY);
     }
 
     public void validate(ClientState state) throws InvalidRequestException
     {
-        Schema.instance.validateTable(keyspace(), columnFamily());
+        Schema.instance.validateTable(keyspace(), name());
     }
 
     public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime) throws InvalidRequestException, TruncateException
     {
         try
         {
-            TableMetadata metaData = 
Schema.instance.getTableMetadata(keyspace(), columnFamily());
+            TableMetadata metaData = 
Schema.instance.getTableMetadata(keyspace(), name());
             if (metaData.isView())
                 throw new InvalidRequestException("Cannot TRUNCATE 
materialized view directly; must truncate base table instead");
 
             if (metaData.isVirtual())
                 throw new InvalidRequestException("Cannot truncate virtual 
tables");
 
-            StorageProxy.truncateBlocking(keyspace(), columnFamily());
+            StorageProxy.truncateBlocking(keyspace(), name());
         }
         catch (UnavailableException | TimeoutException e)
         {
@@ -81,18 +77,18 @@ public class TruncateStatement extends CFStatement 
implements CQLStatement
         return null;
     }
 
-    public ResultMessage executeInternal(QueryState state, QueryOptions 
options)
+    public ResultMessage executeLocally(QueryState state, QueryOptions options)
     {
         try
         {
-            TableMetadata metaData = 
Schema.instance.getTableMetadata(keyspace(), columnFamily());
+            TableMetadata metaData = 
Schema.instance.getTableMetadata(keyspace(), name());
             if (metaData.isView())
                 throw new InvalidRequestException("Cannot TRUNCATE 
materialized view directly; must truncate base table instead");
 
             if (metaData.isVirtual())
                 throw new InvalidRequestException("Cannot truncate virtual 
tables");
 
-            ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
+            ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(name());
             cfs.truncateBlocking();
         }
         catch (Exception e)
@@ -111,6 +107,6 @@ public class TruncateStatement extends CFStatement 
implements CQLStatement
     @Override
     public AuditLogContext getAuditLogContext()
     {
-        return new AuditLogContext(AuditLogEntryType.TRUNCATE, keyspace(), 
cfName.getColumnFamily());
+        return new AuditLogContext(AuditLogEntryType.TRUNCATE, keyspace(), 
name());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 66addab..21323d2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.cassandra.audit.AuditLogContext;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
@@ -50,19 +51,14 @@ public class UpdateStatement extends ModificationStatement
     private static final Constants.Value EMPTY = new 
Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER);
 
     private UpdateStatement(StatementType type,
-                            int boundTerms,
+                            VariableSpecifications bindVariables,
                             TableMetadata metadata,
                             Operations operations,
                             StatementRestrictions restrictions,
                             Conditions conditions,
                             Attributes attrs)
     {
-        super(type, boundTerms, metadata, operations, restrictions, 
conditions, attrs);
-    }
-
-    public boolean requireFullClusteringKey()
-    {
-        return true;
+        super(type, bindVariables, metadata, operations, restrictions, 
conditions, attrs);
     }
 
     @Override
@@ -127,7 +123,7 @@ public class UpdateStatement extends ModificationStatement
          * @param columnValues list of column values (corresponds to names)
          * @param ifNotExists true if an IF NOT EXISTS condition was 
specified, false otherwise
          */
-        public ParsedInsert(CFName name,
+        public ParsedInsert(QualifiedName name,
                             Attributes.Raw attrs,
                             List<ColumnMetadata.Raw> columnNames,
                             List<Term.Raw> columnValues,
@@ -140,7 +136,7 @@ public class UpdateStatement extends ModificationStatement
 
         @Override
         protected ModificationStatement prepareInternal(TableMetadata metadata,
-                                                        VariableSpecifications 
boundNames,
+                                                        VariableSpecifications 
bindVariables,
                                                         Conditions conditions,
                                                         Attributes attrs)
         {
@@ -173,7 +169,7 @@ public class UpdateStatement extends ModificationStatement
                 else
                 {
                     Operation operation = new 
Operation.SetValue(value).prepare(metadata, def);
-                    operation.collectMarkerSpecification(boundNames);
+                    operation.collectMarkerSpecification(bindVariables);
                     operations.add(operation);
                 }
             }
@@ -183,13 +179,13 @@ public class UpdateStatement extends ModificationStatement
             StatementRestrictions restrictions = new 
StatementRestrictions(type,
                                                                            
metadata,
                                                                            
whereClause.build(),
-                                                                           
boundNames,
+                                                                           
bindVariables,
                                                                            
applyOnlyToStaticColumns,
                                                                            
false,
                                                                            
false);
 
             return new UpdateStatement(type,
-                                       boundNames.size(),
+                                       bindVariables,
                                        metadata,
                                        operations,
                                        restrictions,
@@ -206,7 +202,7 @@ public class UpdateStatement extends ModificationStatement
         private final Json.Raw jsonValue;
         private final boolean defaultUnset;
 
-        public ParsedInsertJson(CFName name, Attributes.Raw attrs, Json.Raw 
jsonValue, boolean defaultUnset, boolean ifNotExists)
+        public ParsedInsertJson(QualifiedName name, Attributes.Raw attrs, 
Json.Raw jsonValue, boolean defaultUnset, boolean ifNotExists)
         {
             super(name, StatementType.INSERT, attrs, null, ifNotExists, false);
             this.jsonValue = jsonValue;
@@ -215,14 +211,14 @@ public class UpdateStatement extends ModificationStatement
 
         @Override
         protected ModificationStatement prepareInternal(TableMetadata metadata,
-                                                        VariableSpecifications 
boundNames,
+                                                        VariableSpecifications 
bindVariables,
                                                         Conditions conditions,
                                                         Attributes attrs)
         {
             checkFalse(metadata.isCounter(), "INSERT statements are not 
allowed on counter tables, use UPDATE instead");
 
             Collection<ColumnMetadata> defs = metadata.columns();
-            Json.Prepared prepared = 
jsonValue.prepareAndCollectMarkers(metadata, defs, boundNames);
+            Json.Prepared prepared = 
jsonValue.prepareAndCollectMarkers(metadata, defs, bindVariables);
 
             WhereClause.Builder whereClause = new WhereClause.Builder();
             Operations operations = new Operations(type);
@@ -241,7 +237,7 @@ public class UpdateStatement extends ModificationStatement
                 else
                 {
                     Operation operation = new 
Operation.SetValue(raw).prepare(metadata, def);
-                    operation.collectMarkerSpecification(boundNames);
+                    operation.collectMarkerSpecification(bindVariables);
                     operations.add(operation);
                 }
             }
@@ -251,13 +247,13 @@ public class UpdateStatement extends ModificationStatement
             StatementRestrictions restrictions = new 
StatementRestrictions(type,
                                                                            
metadata,
                                                                            
whereClause.build(),
-                                                                           
boundNames,
+                                                                           
bindVariables,
                                                                            
applyOnlyToStaticColumns,
                                                                            
false,
                                                                            
false);
 
             return new UpdateStatement(type,
-                                       boundNames.size(),
+                                       bindVariables,
                                        metadata,
                                        operations,
                                        restrictions,
@@ -282,7 +278,7 @@ public class UpdateStatement extends ModificationStatement
          * @param whereClause the where clause
          * @param ifExists flag to check if row exists
          * */
-        public ParsedUpdate(CFName name,
+        public ParsedUpdate(QualifiedName name,
                             Attributes.Raw attrs,
                             List<Pair<ColumnMetadata.Raw, 
Operation.RawUpdate>> updates,
                             WhereClause whereClause,
@@ -296,7 +292,7 @@ public class UpdateStatement extends ModificationStatement
 
         @Override
         protected ModificationStatement prepareInternal(TableMetadata metadata,
-                                                        VariableSpecifications 
boundNames,
+                                                        VariableSpecifications 
bindVariables,
                                                         Conditions conditions,
                                                         Attributes attrs)
         {
@@ -309,18 +305,18 @@ public class UpdateStatement extends ModificationStatement
                 checkFalse(def.isPrimaryKeyColumn(), "PRIMARY KEY part %s 
found in SET part", def.name);
 
                 Operation operation = entry.right.prepare(metadata, def);
-                operation.collectMarkerSpecification(boundNames);
+                operation.collectMarkerSpecification(bindVariables);
                 operations.add(operation);
             }
 
             StatementRestrictions restrictions = newRestrictions(metadata,
-                                                                 boundNames,
+                                                                 bindVariables,
                                                                  operations,
                                                                  whereClause,
                                                                  conditions);
 
             return new UpdateStatement(type,
-                                       boundNames.size(),
+                                       bindVariables,
                                        metadata,
                                        operations,
                                        restrictions,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index 381ed3a..3013d9f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogContext;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -28,7 +29,7 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
-public class UseStatement extends ParsedStatement implements CQLStatement
+public class UseStatement extends CQLStatement.Raw implements CQLStatement
 {
     private final String keyspace;
 
@@ -37,17 +38,12 @@ public class UseStatement extends ParsedStatement 
implements CQLStatement
         this.keyspace = keyspace;
     }
 
-    public int getBoundTerms()
+    public UseStatement prepare(ClientState state)
     {
-        return 0;
+        return this;
     }
 
-    public Prepared prepare() throws InvalidRequestException
-    {
-        return new Prepared(this);
-    }
-
-    public void checkAccess(ClientState state) throws UnauthorizedException
+    public void authorize(ClientState state) throws UnauthorizedException
     {
         state.validateLogin();
     }
@@ -62,7 +58,7 @@ public class UseStatement extends ParsedStatement implements 
CQLStatement
         return new ResultMessage.SetKeyspace(keyspace);
     }
 
-    public ResultMessage executeInternal(QueryState state, QueryOptions 
options) throws InvalidRequestException
+    public ResultMessage executeLocally(QueryState state, QueryOptions 
options) throws InvalidRequestException
     {
         // In production, internal queries are exclusively on the system 
keyspace and 'use' is thus useless
         // but for some unit tests we need to set the keyspace (e.g. for tests 
with DROP INDEX)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
new file mode 100644
index 0000000..c2d0e4c
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+
+public final class AlterKeyspaceStatement extends AlterSchemaStatement
+{
+    private final KeyspaceAttributes attrs;
+
+    public AlterKeyspaceStatement(String keyspaceName, KeyspaceAttributes 
attrs)
+    {
+        super(keyspaceName);
+        this.attrs = attrs;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        attrs.validate();
+
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+        if (null == keyspace)
+            throw ire("Keyspace '%s' doesn't exist", keyspaceName);
+
+        KeyspaceMetadata newKeyspace = 
keyspace.withSwapped(attrs.asAlteredKeyspaceParams(keyspace.params));
+
+        if (newKeyspace.params.replication.klass.equals(LocalStrategy.class))
+            throw ire("Unable to use given strategy class: LocalStrategy is 
reserved for internal use.");
+
+        newKeyspace.params.validate(keyspaceName);
+
+        return schema.withAddedOrUpdated(newKeyspace);
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.UPDATED, keyspaceName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureKeyspacePermission(keyspaceName, Permission.ALTER);
+    }
+
+    @Override
+    Set<String> clientWarnings(KeyspacesDiff diff)
+    {
+        if (diff.isEmpty())
+            return ImmutableSet.of();
+
+        KeyspaceDiff keyspaceDiff = diff.altered.get(0);
+
+        AbstractReplicationStrategy before = 
keyspaceDiff.before.createReplicationStrategy();
+        AbstractReplicationStrategy after = 
keyspaceDiff.after.createReplicationStrategy();
+
+        return before.getReplicationFactor() < after.getReplicationFactor()
+             ? ImmutableSet.of("When increasing replication factor you need to 
run a full (-full) repair to distribute the data.")
+             : ImmutableSet.of();
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.ALTER_KEYSPACE, 
keyspaceName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final String keyspaceName;
+        private final KeyspaceAttributes attrs;
+
+        public Raw(String keyspaceName, KeyspaceAttributes attrs)
+        {
+            this.keyspaceName = keyspaceName;
+            this.attrs = attrs;
+        }
+
+        public AlterKeyspaceStatement prepare(ClientState state)
+        {
+            return new AlterKeyspaceStatement(keyspaceName, attrs);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
new file mode 100644
index 0000000..161c9c4
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+abstract class AlterSchemaStatement implements CQLStatement, 
SchemaTransformation
+{
+    protected final String keyspaceName; // name of the keyspace affected by 
the statement
+
+    protected AlterSchemaStatement(String keyspaceName)
+    {
+        this.keyspaceName = keyspaceName;
+    }
+
+    public final void validate(ClientState state)
+    {
+        // no-op; validation is performed while executing the statement, in 
apply()
+    }
+
+    public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime)
+    {
+        return execute(state, false);
+    }
+
+    public ResultMessage executeLocally(QueryState state, QueryOptions options)
+    {
+        return execute(state, true);
+    }
+
+    /**
+     * TODO: document
+     */
+    abstract SchemaChange schemaChangeEvent(KeyspacesDiff diff);
+
+    /**
+     * Schema alteration may result in a new database object (keyspace, table, 
role, function) being created capable of
+     * having permissions GRANTed on it. The creator of the object (the 
primary role assigned to the AuthenticatedUser
+     * performing the operation) is automatically granted ALL applicable 
permissions on the object. This is a hook for
+     * subclasses to override in order indicate which resources to to perform 
that grant on when the statement is executed.
+     *
+     * Only called if the transformation resulted in a non-empty diff.
+     */
+    Set<IResource> createdResources(KeyspacesDiff diff)
+    {
+        return ImmutableSet.of();
+    }
+
+    /**
+     * Schema alteration might produce a client warning (e.g. a warning to run 
full repair when increading RF of a keyspace).
+     * This method should be used to generate them instead of calling warn() 
in transformation code.
+     *
+     * Only called if the transformation resulted in a non-empty diff.
+     */
+    Set<String> clientWarnings(KeyspacesDiff diff)
+    {
+        return ImmutableSet.of();
+    }
+
+    public ResultMessage execute(QueryState state, boolean locally)
+    {
+        if (SchemaConstants.isLocalSystemKeyspace(keyspaceName))
+            throw ire("System keyspace '%s' is not user-modifiable", 
keyspaceName);
+
+        KeyspaceMetadata keyspace = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
+        if (null != keyspace && keyspace.isVirtual())
+            throw ire("Virtual keyspace '%s' is not user-modifiable", 
keyspaceName);
+
+        validateKeyspaceName();
+
+        KeyspacesDiff diff = MigrationManager.announce(this, locally);
+
+        clientWarnings(diff).forEach(ClientWarn.instance::warn);
+
+        if (diff.isEmpty())
+            return new ResultMessage.Void();
+
+        /*
+         * When a schema alteration results in a new db object being created, 
we grant permissions on the new
+         * object to the user performing the request if:
+         * - the user is not anonymous
+         * - the configured IAuthorizer supports granting of permissions (not 
all do, AllowAllAuthorizer doesn't and
+         *   custom external implementations may not)
+         */
+        AuthenticatedUser user = state.getClientState().getUser();
+        if (null != user && !user.isAnonymous())
+            createdResources(diff).forEach(r -> grantPermissionsOnResource(r, 
user));
+
+        return new ResultMessage.SchemaChange(schemaChangeEvent(diff));
+    }
+
+    private void validateKeyspaceName()
+    {
+        if (!SchemaConstants.isValidName(keyspaceName))
+        {
+            throw ire("Keyspace name must not be empty, more than %d 
characters long, " +
+                      "or contain non-alphanumeric-underscore characters (got 
'%s')",
+                      SchemaConstants.NAME_LENGTH, keyspaceName);
+        }
+    }
+
+    private void grantPermissionsOnResource(IResource resource, 
AuthenticatedUser user)
+    {
+        try
+        {
+            DatabaseDescriptor.getAuthorizer()
+                              .grant(AuthenticatedUser.SYSTEM_USER,
+                                     resource.applicablePermissions(),
+                                     resource,
+                                     user.getPrimaryRole());
+        }
+        catch (UnsupportedOperationException e)
+        {
+            // not a problem - grant is an optional method on IAuthorizer
+        }
+    }
+
+    static InvalidRequestException ire(String format, Object... args)
+    {
+        return new InvalidRequestException(String.format(format, args));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
new file mode 100644
index 0000000..a081a2c
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.*;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static java.lang.String.join;
+
+import static com.google.common.collect.Iterables.isEmpty;
+import static com.google.common.collect.Iterables.transform;
+
+public abstract class AlterTableStatement extends AlterSchemaStatement
+{
+    protected final String tableName;
+
+    public AlterTableStatement(String keyspaceName, String tableName)
+    {
+        super(keyspaceName);
+        this.tableName = tableName;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+
+        TableMetadata table = null == keyspace
+                            ? null
+                            : keyspace.getTableOrViewNullable(tableName);
+
+        if (null == table)
+            throw ire("Table '%s.%s' doesn't exist", keyspaceName, tableName);
+
+        if (table.isView())
+            throw ire("Cannot use ALTER TABLE on a materialized view; use 
ALTER MATERIALIZED VIEW instead");
+
+        return schema.withAddedOrUpdated(apply(keyspace, table));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, 
tableName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureTablePermission(keyspaceName, tableName, 
Permission.ALTER);
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.ALTER_TABLE, 
keyspaceName, tableName);
+    }
+
+    abstract KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata 
table);
+
+    /**
+     * ALTER TABLE <table> ALTER <column> TYPE <newtype>;
+     *
+     * No longer supported.
+     */
+    public static class AlterColumn extends AlterTableStatement
+    {
+        AlterColumn(String keyspaceName, String tableName)
+        {
+            super(keyspaceName, tableName);
+        }
+
+        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata 
table)
+        {
+            throw ire("Altering column types is no longer supported");
+        }
+    }
+
+    /**
+     * ALTER TABLE <table> ADD <column> <newtype>
+     * ALTER TABLE <table> ADD (<column> <newtype>, <column1> <newtype1>, ... 
<columnn> <newtypen>)
+     */
+    private static class AddColumns extends AlterTableStatement
+    {
+        private static class Column
+        {
+            private final ColumnMetadata.Raw name;
+            private final CQL3Type.Raw type;
+            private final boolean isStatic;
+
+            Column(ColumnMetadata.Raw name, CQL3Type.Raw type, boolean 
isStatic)
+            {
+                this.name = name;
+                this.type = type;
+                this.isStatic = isStatic;
+            }
+        }
+
+        private final Collection<Column> newColumns;
+
+        private AddColumns(String keyspaceName, String tableName, 
Collection<Column> newColumns)
+        {
+            super(keyspaceName, tableName);
+            this.newColumns = newColumns;
+        }
+
+        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata 
table)
+        {
+            TableMetadata.Builder tableBuilder = table.unbuild();
+            Views.Builder viewsBuilder = keyspace.views.unbuild();
+            newColumns.forEach(c -> addColumn(keyspace, table, c, 
tableBuilder, viewsBuilder));
+
+            return 
keyspace.withSwapped(keyspace.tables.withSwapped(tableBuilder.build()))
+                           .withSwapped(viewsBuilder.build());
+        }
+
+        private void addColumn(KeyspaceMetadata keyspace,
+                               TableMetadata table,
+                               Column column,
+                               TableMetadata.Builder tableBuilder,
+                               Views.Builder viewsBuilder)
+        {
+            ColumnIdentifier name = column.name.getIdentifier(table);
+            AbstractType<?> type = column.type.prepare(keyspaceName, 
keyspace.types).getType();
+            boolean isStatic = column.isStatic;
+
+            if (null != tableBuilder.getColumn(name))
+                throw ire("Column with name '%s' already exists", name);
+
+            if (isStatic && table.clusteringColumns().isEmpty())
+                throw ire("Static columns are only useful (and thus allowed) 
if the table has at least one clustering column");
+
+            ColumnMetadata droppedColumn = table.getDroppedColumn(name.bytes);
+            if (null != droppedColumn)
+            {
+                // After #8099, not safe to re-add columns of incompatible 
types - until *maybe* deser logic with dropped
+                // columns is pushed deeper down the line. The latter would 
still be problematic in cases of schema races.
+                if (!droppedColumn.type.isValueCompatibleWith(type))
+                {
+                    throw ire("Cannot re-add a previously dropped column '%s' 
of type %s, incompatible with previous type %s",
+                              name,
+                              type.asCQL3Type(),
+                              droppedColumn.type.asCQL3Type());
+                }
+
+                // Cannot re-add a dropped counter column. See #7831.
+                if (table.isCounter())
+                    throw ire("Cannot re-add previously dropped counter column 
%s", name);
+            }
+
+            if (isStatic)
+                tableBuilder.addStaticColumn(name, type);
+            else
+                tableBuilder.addRegularColumn(name, type);
+
+            if (!isStatic)
+            {
+                for (ViewMetadata view : keyspace.views.forTable(table.id))
+                {
+                    if (view.includeAllColumns)
+                    {
+                        ColumnMetadata viewColumn = 
ColumnMetadata.regularColumn(view.metadata, name.bytes, type);
+                        
viewsBuilder.put(viewsBuilder.get(view.name()).withAddedRegularColumn(viewColumn));
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * ALTER TABLE <table> DROP <column>
+     * ALTER TABLE <table> DROP ( <column>, <column1>, ... <columnn>)
+     */
+    // TODO: swap UDT refs with expanded tuples on drop
+    private static class DropColumns extends AlterTableStatement
+    {
+        private final Collection<ColumnMetadata.Raw> removedColumns;
+        private final long timestamp;
+
+        private DropColumns(String keyspaceName, String tableName, 
Collection<ColumnMetadata.Raw> removedColumns, long timestamp)
+        {
+            super(keyspaceName, tableName);
+            this.removedColumns = removedColumns;
+            this.timestamp = timestamp;
+        }
+
+        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata 
table)
+        {
+            TableMetadata.Builder builder = table.unbuild();
+            removedColumns.forEach(c -> dropColumn(keyspace, table, c, 
builder));
+            return 
keyspace.withSwapped(keyspace.tables.withSwapped(builder.build()));
+        }
+
+        private void dropColumn(KeyspaceMetadata keyspace, TableMetadata 
table, ColumnMetadata.Raw column, TableMetadata.Builder builder)
+        {
+            ColumnIdentifier name = column.getIdentifier(table);
+
+            ColumnMetadata currentColumn = table.getColumn(name);
+            if (null == currentColumn)
+                throw ire("Column %s was not found in table '%s'", name, 
table);
+
+            if (currentColumn.isPrimaryKeyColumn())
+                throw ire("Cannot drop PRIMARY KEY column %s", name);
+
+            /*
+             * Cannot allow dropping top-level columns of user defined types 
that aren't frozen because we cannot convert
+             * the type into an equivalent tuple: we only support frozen 
tuples currently. And as such we cannot persist
+             * the correct type in system_schema.dropped_columns.
+             */
+            if (currentColumn.type.isUDT() && currentColumn.type.isMultiCell())
+                throw ire("Cannot drop non-frozen column %s of user type %s", 
name, currentColumn.type.asCQL3Type());
+
+            // TODO: some day try and find a way to not rely on 
Keyspace/IndexManager/Index to find dependent indexes
+            Set<IndexMetadata> dependentIndexes = 
Keyspace.openAndGetStore(table).indexManager.getDependentIndexes(currentColumn);
+            if (!dependentIndexes.isEmpty())
+            {
+                throw ire("Cannot drop column %s because it has dependent 
secondary indexes (%s)",
+                          currentColumn,
+                          join(", ", transform(dependentIndexes, i -> 
i.name)));
+            }
+
+            if (!isEmpty(keyspace.views.forTable(table.id)))
+                throw ire("Cannot drop column %s on base table %s with 
materialized views", currentColumn, table.name);
+
+            builder.removeRegularOrStaticColumn(name);
+            builder.recordColumnDrop(currentColumn, timestamp);
+        }
+    }
+
+    /**
+     * ALTER TABLE <table> RENAME <column> TO <column>;
+     */
+    private static class RenameColumns extends AlterTableStatement
+    {
+        private final Map<ColumnMetadata.Raw, ColumnMetadata.Raw> 
renamedColumns;
+
+        private RenameColumns(String keyspaceName, String tableName, 
Map<ColumnMetadata.Raw, ColumnMetadata.Raw> renamedColumns)
+        {
+            super(keyspaceName, tableName);
+            this.renamedColumns = renamedColumns;
+        }
+
+        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata 
table)
+        {
+            TableMetadata.Builder tableBuilder = table.unbuild();
+            Views.Builder viewsBuilder = keyspace.views.unbuild();
+            renamedColumns.forEach((o, n) -> renameColumn(keyspace, table, o, 
n, tableBuilder, viewsBuilder));
+
+            return 
keyspace.withSwapped(keyspace.tables.withSwapped(tableBuilder.build()))
+                           .withSwapped(viewsBuilder.build());
+        }
+
+        private void renameColumn(KeyspaceMetadata keyspace,
+                                  TableMetadata table,
+                                  ColumnMetadata.Raw oldName,
+                                  ColumnMetadata.Raw newName,
+                                  TableMetadata.Builder tableBuilder,
+                                  Views.Builder viewsBuilder)
+        {
+            ColumnIdentifier oldColumnName = oldName.getIdentifier(table);
+            ColumnIdentifier newColumnName = newName.getIdentifier(table);
+
+            ColumnMetadata column = table.getColumn(oldColumnName);
+            if (null == column)
+                throw ire("Column %s was not found in table %s", 
oldColumnName, table);
+
+            if (!column.isPrimaryKeyColumn())
+                throw ire("Cannot rename non PRIMARY KEY column %s", 
oldColumnName);
+
+            if (null != table.getColumn(newColumnName))
+            {
+                throw ire("Cannot rename column %s to %s in table '%s'; 
another column with that name already exists",
+                          oldColumnName,
+                          newColumnName,
+                          table);
+            }
+
+            // TODO: some day try and find a way to not rely on 
Keyspace/IndexManager/Index to find dependent indexes
+            Set<IndexMetadata> dependentIndexes = 
Keyspace.openAndGetStore(table).indexManager.getDependentIndexes(column);
+            if (!dependentIndexes.isEmpty())
+            {
+                throw ire("Can't rename column %s because it has dependent 
secondary indexes (%s)",
+                          oldColumnName,
+                          join(", ", transform(dependentIndexes, i -> 
i.name)));
+            }
+
+            for (ViewMetadata view : keyspace.views.forTable(table.id))
+            {
+                if (view.includes(oldColumnName))
+                {
+                    ColumnIdentifier oldViewColumn = 
oldName.getIdentifier(view.metadata);
+                    ColumnIdentifier newViewColumn = 
newName.getIdentifier(view.metadata);
+
+                    
viewsBuilder.put(viewsBuilder.get(view.name()).withRenamedPrimaryKeyColumn(oldViewColumn,
 newViewColumn));
+                }
+            }
+
+            tableBuilder.renamePrimaryKeyColumn(oldColumnName, newColumnName);
+        }
+    }
+
+    /**
+     * ALTER TABLE <table> WITH <property> = <value>
+     */
+    private static class AlterOptions extends AlterTableStatement
+    {
+        private final TableAttributes attrs;
+
+        private AlterOptions(String keyspaceName, String tableName, 
TableAttributes attrs)
+        {
+            super(keyspaceName, tableName);
+            this.attrs = attrs;
+        }
+
+        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata 
table)
+        {
+            attrs.validate();
+
+            TableParams params = attrs.asAlteredTableParams(table.params);
+
+            if (table.isCounter() && params.defaultTimeToLive > 0)
+                throw ire("Cannot set default_time_to_live on a table with 
counters");
+
+            if (!isEmpty(keyspace.views.forTable(table.id)) && 
params.gcGraceSeconds == 0)
+            {
+                throw ire("Cannot alter gc_grace_seconds of the base table of 
a " +
+                          "materialized view to 0, since this value is used to 
TTL " +
+                          "undelivered updates. Setting gc_grace_seconds too 
low might " +
+                          "cause undelivered updates to expire " +
+                          "before being replayed.");
+            }
+
+            return 
keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(params)));
+        }
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private enum Kind
+        {
+            ALTER_COLUMN, ADD_COLUMNS, DROP_COLUMNS, RENAME_COLUMNS, 
ALTER_OPTIONS
+        }
+
+        private final QualifiedName name;
+
+        private Kind kind;
+
+        // ADD
+        private final List<AddColumns.Column> addedColumns = new ArrayList<>();
+
+        // DROP
+        private final List<ColumnMetadata.Raw> droppedColumns = new 
ArrayList<>();
+        private long timestamp = FBUtilities.timestampMicros();
+
+        // RENAME
+        private final Map<ColumnMetadata.Raw, ColumnMetadata.Raw> 
renamedColumns = new HashMap<>();
+
+        // OPTIONS
+        public final TableAttributes attrs = new TableAttributes();
+
+        public Raw(QualifiedName name)
+        {
+            this.name = name;
+        }
+
+        public AlterTableStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : 
state.getKeyspace();
+            String tableName = name.getName();
+
+            switch (kind)
+            {
+                case   ALTER_COLUMN: return new AlterColumn(keyspaceName, 
tableName);
+                case    ADD_COLUMNS: return new AddColumns(keyspaceName, 
tableName, addedColumns);
+                case   DROP_COLUMNS: return new DropColumns(keyspaceName, 
tableName, droppedColumns, timestamp);
+                case RENAME_COLUMNS: return new RenameColumns(keyspaceName, 
tableName, renamedColumns);
+                case  ALTER_OPTIONS: return new AlterOptions(keyspaceName, 
tableName, attrs);
+            }
+
+            throw new AssertionError();
+        }
+
+        public void alter(ColumnMetadata.Raw name, CQL3Type.Raw type)
+        {
+            kind = Kind.ALTER_COLUMN;
+        }
+
+        public void add(ColumnMetadata.Raw name, CQL3Type.Raw type, boolean 
isStatic)
+        {
+            kind = Kind.ADD_COLUMNS;
+            addedColumns.add(new AddColumns.Column(name, type, isStatic));
+        }
+
+        public void drop(ColumnMetadata.Raw name)
+        {
+            kind = Kind.DROP_COLUMNS;
+            droppedColumns.add(name);
+        }
+
+        public void timestamp(long timestamp)
+        {
+            this.timestamp = timestamp;
+        }
+
+        public void rename(ColumnMetadata.Raw from, ColumnMetadata.Raw to)
+        {
+            kind = Kind.RENAME_COLUMNS;
+            renamedColumns.put(from, to);
+        }
+
+        public void attrs()
+        {
+            this.kind = Kind.ALTER_OPTIONS;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
new file mode 100644
index 0000000..50f09a0
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+import static java.lang.String.join;
+import static java.util.function.Predicate.isEqual;
+import static java.util.stream.Collectors.toList;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public abstract class AlterTypeStatement extends AlterSchemaStatement
+{
+    protected final String typeName;
+
+    public AlterTypeStatement(String keyspaceName, String typeName)
+    {
+        super(keyspaceName);
+        this.typeName = typeName;
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureKeyspacePermission(keyspaceName, Permission.ALTER);
+    }
+
+    SchemaChange schemaChangeEvent(Keyspaces.KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.UPDATED, Target.TYPE, keyspaceName, 
typeName);
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+
+        UserType type = null == keyspace
+                      ? null
+                      : keyspace.types.getNullable(bytes(typeName));
+
+        if (null == type)
+            throw ire("Type %s.%s doesn't exist", keyspaceName, typeName);
+
+        return 
schema.withAddedOrUpdated(keyspace.withUpdatedUserType(apply(keyspace, type)));
+    }
+
+    abstract UserType apply(KeyspaceMetadata keyspace, UserType type);
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.ALTER_TYPE, keyspaceName, 
typeName);
+    }
+
+    private static final class AddField extends AlterTypeStatement
+    {
+        private final FieldIdentifier fieldName;
+        private final CQL3Type.Raw type;
+
+        private AddField(String keyspaceName, String typeName, FieldIdentifier 
fieldName, CQL3Type.Raw type)
+        {
+            super(keyspaceName, typeName);
+            this.fieldName = fieldName;
+            this.type = type;
+        }
+
+        UserType apply(KeyspaceMetadata keyspace, UserType userType)
+        {
+            if (userType.fieldPosition(fieldName) >= 0)
+                throw ire("Cannot add field %s to type %s: a field with name 
%s already exists", fieldName, userType.toCQLString(), fieldName);
+
+            AbstractType<?> fieldType = type.prepare(keyspaceName, 
keyspace.types).getType();
+            if (fieldType.referencesUserType(userType.name))
+                throw ire("Cannot add new field %s of type %s to user type %s 
as it would create a circular reference", fieldName, type, 
userType.toCQLString());
+
+            List<FieldIdentifier> fieldNames = new 
ArrayList<>(userType.fieldNames()); fieldNames.add(fieldName);
+            List<AbstractType<?>> fieldTypes = new 
ArrayList<>(userType.fieldTypes()); fieldTypes.add(fieldType);
+
+            return new UserType(keyspaceName, userType.name, fieldNames, 
fieldTypes, true);
+        }
+    }
+
+    private static final class RenameFields extends AlterTypeStatement
+    {
+        private final Map<FieldIdentifier, FieldIdentifier> renamedFields;
+
+        private RenameFields(String keyspaceName, String typeName, 
Map<FieldIdentifier, FieldIdentifier> renamedFields)
+        {
+            super(keyspaceName, typeName);
+            this.renamedFields = renamedFields;
+        }
+
+        UserType apply(KeyspaceMetadata keyspace, UserType userType)
+        {
+            List<String> dependentAggregates =
+                keyspace.functions
+                        .udas()
+                        .filter(uda -> null != uda.initialCondition() && 
uda.stateType().referencesUserType(userType.name))
+                        .map(uda -> uda.name().toString())
+                        .collect(toList());
+
+            if (!dependentAggregates.isEmpty())
+            {
+                throw ire("Cannot alter user type %s as it is still used in 
INITCOND by aggregates %s",
+                          userType.toCQLString(),
+                          join(", ", dependentAggregates));
+            }
+
+            List<FieldIdentifier> fieldNames = new 
ArrayList<>(userType.fieldNames());
+
+            renamedFields.forEach((oldName, newName) ->
+            {
+                int idx = userType.fieldPosition(oldName);
+                if (idx < 0)
+                    throw ire("Unkown field %s in user type %s", oldName, 
keyspaceName, userType.toCQLString());
+                fieldNames.set(idx, newName);
+            });
+
+            fieldNames.forEach(name ->
+            {
+                if (fieldNames.stream().filter(isEqual(name)).count() > 1)
+                    throw ire("Duplicate field name %s in type %s", name, 
keyspaceName, userType.toCQLString());
+            });
+
+            return new UserType(keyspaceName, userType.name, fieldNames, 
userType.fieldTypes(), true);
+        }
+    }
+
+    private static final class AlterField extends AlterTypeStatement
+    {
+        private AlterField(String keyspaceName, String typeName)
+        {
+            super(keyspaceName, typeName);
+        }
+
+        UserType apply(KeyspaceMetadata keyspace, UserType userType)
+        {
+            throw ire("Alterting field types is no longer supported");
+        }
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private enum Kind
+        {
+            ADD_FIELD, RENAME_FIELDS, ALTER_FIELD
+        }
+
+        private final UTName name;
+
+        private Kind kind;
+
+        // ADD
+        private FieldIdentifier newFieldName;
+        private CQL3Type.Raw newFieldType;
+
+        // RENAME
+        private final Map<FieldIdentifier, FieldIdentifier> renamedFields = 
new HashMap<>();
+
+        public Raw(UTName name)
+        {
+            this.name = name;
+        }
+
+        public AlterTypeStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : 
state.getKeyspace();
+            String typeName = name.getStringTypeName();
+
+            switch (kind)
+            {
+                case     ADD_FIELD: return new AddField(keyspaceName, 
typeName, newFieldName, newFieldType);
+                case RENAME_FIELDS: return new RenameFields(keyspaceName, 
typeName, renamedFields);
+                case   ALTER_FIELD: return new AlterField(keyspaceName, 
typeName);
+            }
+
+            throw new AssertionError();
+        }
+
+        public void add(FieldIdentifier name, CQL3Type.Raw type)
+        {
+            kind = Kind.ADD_FIELD;
+            newFieldName = name;
+            newFieldType = type;
+        }
+
+        public void rename(FieldIdentifier from, FieldIdentifier to)
+        {
+            kind = Kind.RENAME_FIELDS;
+            renamedFields.put(from, to);
+        }
+
+        public void alter(FieldIdentifier name, CQL3Type.Raw type)
+        {
+            kind = Kind.ALTER_FIELD;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java
new file mode 100644
index 0000000..2ecc095
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QualifiedName;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+public final class AlterViewStatement extends AlterSchemaStatement
+{
+    private final String viewName;
+    private final TableAttributes attrs;
+
+    public AlterViewStatement(String keyspaceName, String viewName, 
TableAttributes attrs)
+    {
+        super(keyspaceName);
+        this.viewName = viewName;
+        this.attrs = attrs;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+
+        ViewMetadata view = null == keyspace
+                          ? null
+                          : keyspace.views.getNullable(viewName);
+
+        if (null == view)
+            throw ire("Materialized view '%s.%s' doesn't exist", keyspaceName, 
viewName);
+
+        attrs.validate();
+
+        TableParams params = attrs.asAlteredTableParams(view.metadata.params);
+
+        if (params.gcGraceSeconds == 0)
+        {
+            throw ire("Cannot alter gc_grace_seconds of a materialized view to 
0, since this " +
+                      "value is used to TTL undelivered updates. Setting 
gc_grace_seconds too " +
+                      "low might cause undelivered updates to expire before 
being replayed.");
+        }
+
+        if (params.defaultTimeToLive > 0)
+        {
+            throw ire("Cannot set or alter default_time_to_live for a 
materialized view. " +
+                      "Data in a materialized view always expire at the same 
time than " +
+                      "the corresponding data in the parent table.");
+        }
+
+        ViewMetadata newView = view.copy(view.metadata.withSwapped(params));
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.views.withSwapped(newView)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, 
viewName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        ViewMetadata view = Schema.instance.getView(keyspaceName, viewName);
+        if (null != view)
+            client.ensureTablePermission(keyspaceName, view.baseTableName, 
Permission.ALTER);
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.ALTER_VIEW, keyspaceName, 
viewName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final QualifiedName name;
+        private final TableAttributes attrs;
+
+        public Raw(QualifiedName name, TableAttributes attrs)
+        {
+            this.name = name;
+            this.attrs = attrs;
+        }
+
+        public AlterViewStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : 
state.getKeyspace();
+            return new AlterViewStatement(keyspaceName, name.getName(), attrs);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
new file mode 100644
index 0000000..cd9808a
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.FunctionResource;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.Functions.FunctionsDiff;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+import static java.lang.String.format;
+import static java.lang.String.join;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
+
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.transform;
+
+public final class CreateAggregateStatement extends AlterSchemaStatement
+{
+    private final String aggregateName;
+    private final List<CQL3Type.Raw> rawArgumentTypes;
+    private final CQL3Type.Raw rawStateType;
+    private final FunctionName stateFunctionName;
+    private final FunctionName finalFunctionName;
+    private final Term.Raw rawInitialValue;
+    private final boolean orReplace;
+    private final boolean ifNotExists;
+
+    public CreateAggregateStatement(String keyspaceName,
+                                    String aggregateName,
+                                    List<CQL3Type.Raw> rawArgumentTypes,
+                                    CQL3Type.Raw rawStateType,
+                                    FunctionName stateFunctionName,
+                                    FunctionName finalFunctionName,
+                                    Term.Raw rawInitialValue,
+                                    boolean orReplace,
+                                    boolean ifNotExists)
+    {
+        super(keyspaceName);
+        this.aggregateName = aggregateName;
+        this.rawArgumentTypes = rawArgumentTypes;
+        this.rawStateType = rawStateType;
+        this.stateFunctionName = stateFunctionName;
+        this.finalFunctionName = finalFunctionName;
+        this.rawInitialValue = rawInitialValue;
+        this.orReplace = orReplace;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        if (ifNotExists && orReplace)
+            throw ire("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' 
directives");
+
+        rawArgumentTypes.stream()
+                        .filter(CQL3Type.Raw::isFrozen)
+                        .findFirst()
+                        .ifPresent(t -> { throw ire("Argument '%s' cannot be 
frozen; remove frozen<> modifier from '%s'", t, t); });
+
+        if (rawStateType.isFrozen())
+            throw ire("State type '%s' cannot be frozen; remove frozen<> 
modifier from '%s'", rawStateType, rawStateType);
+
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+        if (null == keyspace)
+            throw ire("Keyspace '%s' doesn't exist", keyspaceName);
+
+        /*
+         * Resolve the state function
+         */
+
+        List<AbstractType<?>> argumentTypes =
+            rawArgumentTypes.stream()
+                            .map(t -> t.prepare(keyspaceName, 
keyspace.types).getType())
+                            .collect(toList());
+        AbstractType<?> stateType = rawStateType.prepare(keyspaceName, 
keyspace.types).getType();
+        List<AbstractType<?>> stateFunctionArguments = 
Lists.newArrayList(concat(singleton(stateType), argumentTypes));
+
+        Function stateFunction =
+            keyspace.functions
+                    .find(stateFunctionName, stateFunctionArguments)
+                    .orElseThrow(() -> ire("State function %s doesn't exist", 
stateFunctionString()));
+
+        if (stateFunction.isAggregate())
+            throw ire("State function %s isn't a scalar function", 
stateFunctionString());
+
+        if (!stateFunction.returnType().equals(stateType))
+        {
+            throw ire("State function %s return type must be the same as the 
first argument type - check STYPE, argument and return types",
+                      stateFunctionString());
+        }
+
+        /*
+         * Resolve the final function and return type
+         */
+
+        Function finalFunction = null;
+        AbstractType<?> returnType = stateFunction.returnType();
+
+        if (null != finalFunctionName)
+        {
+            finalFunction = keyspace.functions.find(finalFunctionName, 
singletonList(stateType)).orElse(null);
+            if (null == finalFunction)
+                throw ire("Final function %s doesn't exist", 
finalFunctionString());
+
+            if (finalFunction.isAggregate())
+                throw ire("Final function %s isn't a scalar function", 
finalFunctionString());
+
+            // override return type with that of the final function
+            returnType = finalFunction.returnType();
+        }
+
+        /*
+         * Validate initial condition
+         */
+
+        ByteBuffer initialValue = null;
+        if (null != rawInitialValue)
+        {
+            initialValue = Terms.asBytes(keyspaceName, 
rawInitialValue.toString(), stateType);
+
+            if (null != initialValue)
+            {
+                try
+                {
+                    stateType.validate(initialValue);
+                }
+                catch (MarshalException e)
+                {
+                    throw ire("Invalid value for INITCOND of type %s", 
stateType.asCQL3Type());
+                }
+            }
+
+            // Converts initcond to a CQL literal and parse it back to avoid 
another CASSANDRA-11064
+            String initialValueString = 
stateType.asCQL3Type().toCQLLiteral(initialValue, ProtocolVersion.CURRENT);
+            assert Objects.equal(initialValue, Terms.asBytes(keyspaceName, 
initialValueString, stateType));
+
+            if (Constants.NULL_LITERAL != rawInitialValue && 
UDHelper.isNullOrEmpty(stateType, initialValue))
+                throw ire("INITCOND must not be empty for all types except 
TEXT, ASCII, BLOB");
+        }
+
+        if (!((UDFunction) stateFunction).isCalledOnNullInput() && null == 
initialValue)
+        {
+            throw ire("Cannot create aggregate '%s' without INITCOND because 
state function %s does not accept 'null' arguments",
+                      aggregateName,
+                      stateFunctionName);
+        }
+
+        /*
+         * Create or replace
+         */
+
+        UDAggregate aggregate =
+            new UDAggregate(new FunctionName(keyspaceName, aggregateName),
+                            argumentTypes,
+                            returnType,
+                            (ScalarFunction) stateFunction,
+                            (ScalarFunction) finalFunction,
+                            initialValue);
+
+        Function existingAggregate = keyspace.functions.find(aggregate.name(), 
argumentTypes).orElse(null);
+        if (null != existingAggregate)
+        {
+            if (!existingAggregate.isAggregate())
+                throw ire("Aggregate '%s' cannot replace a function", 
aggregateName);
+
+            if (ifNotExists)
+                return schema;
+
+            if (!orReplace)
+                throw ire("Aggregate '%s' already exists", aggregateName);
+
+            if (!returnType.isCompatibleWith(existingAggregate.returnType()))
+            {
+                throw ire("Cannot replace aggregate '%s', the new return type 
%s isn't compatible with the return type %s of existing function",
+                          aggregateName,
+                          returnType.asCQL3Type(),
+                          existingAggregate.returnType().asCQL3Type());
+            }
+        }
+
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.withAddedOrUpdated(aggregate)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        assert diff.altered.size() == 1;
+        FunctionsDiff<UDAggregate> udasDiff = diff.altered.get(0).udas;
+
+        assert udasDiff.created.size() + udasDiff.altered.size() == 1;
+        boolean created = !udasDiff.created.isEmpty();
+
+        return new SchemaChange(created ? Change.CREATED : Change.UPDATED,
+                                Target.AGGREGATE,
+                                keyspaceName,
+                                aggregateName,
+                                
rawArgumentTypes.stream().map(CQL3Type.Raw::toString).collect(toList()));
+    }
+
+    public void authorize(ClientState client)
+    {
+        FunctionName name = new FunctionName(keyspaceName, aggregateName);
+
+        if (Schema.instance.findFunction(name, 
Lists.transform(rawArgumentTypes, t -> 
t.prepare(keyspaceName).getType())).isPresent() && orReplace)
+            client.ensurePermission(Permission.ALTER, 
FunctionResource.functionFromCql(keyspaceName, aggregateName, 
rawArgumentTypes));
+        else
+            client.ensurePermission(Permission.CREATE, 
FunctionResource.keyspace(keyspaceName));
+
+        FunctionResource stateFunction =
+            FunctionResource.functionFromCql(stateFunctionName, 
Lists.newArrayList(concat(singleton(rawStateType), rawArgumentTypes)));
+        client.ensurePermission(Permission.EXECUTE, stateFunction);
+
+        if (null != finalFunctionName)
+            client.ensurePermission(Permission.EXECUTE, 
FunctionResource.functionFromCql(finalFunctionName, 
singletonList(rawStateType)));
+    }
+
+    @Override
+    Set<IResource> createdResources(KeyspacesDiff diff)
+    {
+        assert diff.altered.size() == 1;
+        FunctionsDiff<UDAggregate> udasDiff = diff.altered.get(0).udas;
+
+        assert udasDiff.created.size() + udasDiff.altered.size() == 1;
+
+        return udasDiff.created.isEmpty()
+             ? ImmutableSet.of()
+             : ImmutableSet.of(FunctionResource.functionFromCql(keyspaceName, 
aggregateName, rawArgumentTypes));
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_AGGREGATE, 
keyspaceName, aggregateName);
+    }
+
+    private String stateFunctionString()
+    {
+        return format("%s(%s)", stateFunctionName, join(", ", 
transform(concat(singleton(rawStateType), rawArgumentTypes), 
Object::toString)));
+    }
+
+    private String finalFunctionString()
+    {
+        return format("%s(%s)", finalFunctionName, rawStateType);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final FunctionName aggregateName;
+        private final List<CQL3Type.Raw> rawArgumentTypes;
+        private final CQL3Type.Raw rawStateType;
+        private final String stateFunctionName;
+        private final String finalFunctionName;
+        private final Term.Raw rawInitialValue;
+        private final boolean orReplace;
+        private final boolean ifNotExists;
+
+        public Raw(FunctionName aggregateName,
+                   List<CQL3Type.Raw> rawArgumentTypes,
+                   CQL3Type.Raw rawStateType,
+                   String stateFunctionName,
+                   String finalFunctionName,
+                   Term.Raw rawInitialValue,
+                   boolean orReplace,
+                   boolean ifNotExists)
+        {
+            this.aggregateName = aggregateName;
+            this.rawArgumentTypes = rawArgumentTypes;
+            this.rawStateType = rawStateType;
+            this.stateFunctionName = stateFunctionName;
+            this.finalFunctionName = finalFunctionName;
+            this.rawInitialValue = rawInitialValue;
+            this.orReplace = orReplace;
+            this.ifNotExists = ifNotExists;
+        }
+
+        public CreateAggregateStatement prepare(ClientState state)
+        {
+            String keyspaceName = aggregateName.hasKeyspace() ? 
aggregateName.keyspace : state.getKeyspace();
+
+            return new CreateAggregateStatement(keyspaceName,
+                                                aggregateName.name,
+                                                rawArgumentTypes,
+                                                rawStateType,
+                                                new FunctionName(keyspaceName, 
stateFunctionName),
+                                                null != finalFunctionName ? 
new FunctionName(keyspaceName, finalFunctionName) : null,
+                                                rawInitialValue,
+                                                orReplace,
+                                                ifNotExists);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to