http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java
new file mode 100644
index 0000000..6a6f8d9
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QualifiedName;
+import org.apache.cassandra.schema.Diff;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+public final class DropIndexStatement extends AlterSchemaStatement
+{
+    private final String indexName;
+    private final boolean ifExists;
+
+    public DropIndexStatement(String keyspaceName, String indexName, boolean 
ifExists)
+    {
+        super(keyspaceName);
+        this.indexName = indexName;
+        this.ifExists = ifExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+
+        TableMetadata table = null == keyspace
+                            ? null
+                            : 
keyspace.findIndexedTable(indexName).orElse(null);
+
+        if (null == table)
+        {
+            if (ifExists)
+                return schema;
+
+            throw ire("Index '%s.%s' doesn't exist'", keyspaceName, indexName);
+        }
+
+        TableMetadata newTable = 
table.withSwapped(table.indexes.without(indexName));
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.withSwapped(newTable)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        assert diff.altered.size() == 1;
+        KeyspaceDiff ksDiff = diff.altered.get(0);
+
+        assert ksDiff.tables.altered.size() == 1;
+        Diff.Altered<TableMetadata> tableDiff = 
ksDiff.tables.altered.iterator().next();
+
+        return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, 
tableDiff.after.name);
+    }
+
+    public void authorize(ClientState client)
+    {
+        KeyspaceMetadata keyspace = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
+        if (null == keyspace)
+            return;
+
+        keyspace.findIndexedTable(indexName)
+                .ifPresent(t -> client.ensureTablePermission(keyspaceName, 
t.name, Permission.ALTER));
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_INDEX, keyspaceName, 
indexName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final QualifiedName name;
+        private final boolean ifExists;
+
+        public Raw(QualifiedName name, boolean ifExists)
+        {
+            this.name = name;
+            this.ifExists = ifExists;
+        }
+
+        public DropIndexStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : 
state.getKeyspace();
+            return new DropIndexStatement(keyspaceName, name.getName(), 
ifExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java
new file mode 100644
index 0000000..ae5cf06
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+
+public final class DropKeyspaceStatement extends AlterSchemaStatement
+{
+    private final boolean ifExists;
+
+    public DropKeyspaceStatement(String keyspaceName, boolean ifExists)
+    {
+        super(keyspaceName);
+        this.ifExists = ifExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        if (schema.containsKeyspace(keyspaceName))
+            return schema.without(keyspaceName);
+
+        if (ifExists)
+            return schema;
+
+        throw ire("Keyspace '%s' doesn't exist", keyspaceName);
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.DROPPED, keyspaceName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureKeyspacePermission(keyspaceName, Permission.DROP);
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_KEYSPACE, 
keyspaceName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final String keyspaceName;
+        private final boolean ifExists;
+
+        public Raw(String keyspaceName, boolean ifExists)
+        {
+            this.keyspaceName = keyspaceName;
+            this.ifExists = ifExists;
+        }
+
+        public DropKeyspaceStatement prepare(ClientState state)
+        {
+            return new DropKeyspaceStatement(keyspaceName, ifExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java
new file mode 100644
index 0000000..9be59af
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QualifiedName;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+import static java.lang.String.join;
+
+import static com.google.common.collect.Iterables.isEmpty;
+import static com.google.common.collect.Iterables.transform;
+
+public final class DropTableStatement extends AlterSchemaStatement
+{
+    private final String tableName;
+    private final boolean ifExists;
+
+    public DropTableStatement(String keyspaceName, String tableName, boolean 
ifExists)
+    {
+        super(keyspaceName);
+        this.tableName = tableName;
+        this.ifExists = ifExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+
+        TableMetadata table = null == keyspace
+                            ? null
+                            : keyspace.getTableOrViewNullable(tableName);
+
+        if (null == table)
+        {
+            if (ifExists)
+                return schema;
+
+            throw ire("Table '%s.%s' doesn't exist", keyspaceName, tableName);
+        }
+
+        if (table.isView())
+            throw ire("Cannot use DROP TABLE on a materialized view. Please 
use DROP MATERIALIZED VIEW instead.");
+
+        Iterable<ViewMetadata> views = keyspace.views.forTable(table.id);
+        if (!isEmpty(views))
+        {
+            throw ire("Cannot drop a table when materialized views still 
depend on it (%s)",
+                      keyspaceName,
+                      join(", ", transform(views, ViewMetadata::name)));
+        }
+
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.without(table)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.DROPPED, Target.TABLE, keyspaceName, 
tableName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureTablePermission(keyspaceName, tableName, Permission.DROP);
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_TABLE, keyspaceName, 
tableName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final QualifiedName name;
+        private final boolean ifExists;
+
+        public Raw(QualifiedName name, boolean ifExists)
+        {
+            this.name = name;
+            this.ifExists = ifExists;
+        }
+
+        public DropTableStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : 
state.getKeyspace();
+            return new DropTableStatement(keyspaceName, name.getName(), 
ifExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java
new file mode 100644
index 0000000..8de47c2
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QualifiedName;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+public final class DropTriggerStatement extends AlterSchemaStatement
+{
+    private final String tableName;
+    private final String triggerName;
+    private final boolean ifExists;
+
+    public DropTriggerStatement(String keyspaceName, String tableName, String 
triggerName, boolean ifExists)
+    {
+        super(keyspaceName);
+        this.tableName = tableName;
+        this.triggerName = triggerName;
+        this.ifExists = ifExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+
+        TableMetadata table = null == keyspace
+                            ? null
+                            : keyspace.tables.getNullable(tableName);
+
+        TriggerMetadata trigger = null == table
+                                ? null
+                                : table.triggers.get(triggerName).orElse(null);
+
+        if (null == trigger)
+        {
+            if (ifExists)
+                return schema;
+
+            throw ire("Trigger '%s' on '%s.%s' doesn't exist", triggerName, 
keyspaceName, tableName);
+        }
+
+        TableMetadata newTable = 
table.withSwapped(table.triggers.without(triggerName));
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.withSwapped(newTable)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, 
tableName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureIsSuperuser("Only superusers are allowed to perfrom DROP 
TRIGGER queries");
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_TRIGGER, 
keyspaceName, triggerName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final QualifiedName tableName;
+        private final String triggerName;
+        private final boolean ifExists;
+
+        public Raw(QualifiedName tableName, String triggerName, boolean 
ifExists)
+        {
+            this.tableName = tableName;
+            this.triggerName = triggerName;
+            this.ifExists = ifExists;
+        }
+
+        public DropTriggerStatement prepare(ClientState state)
+        {
+            String keyspaceName = tableName.hasKeyspace() ? 
tableName.getKeyspace() : state.getKeyspace();
+            return new DropTriggerStatement(keyspaceName, tableName.getName(), 
triggerName, ifExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java
new file mode 100644
index 0000000..d51954c
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.UTName;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+import org.apache.cassandra.transport.Event.SchemaChange;
+
+import static java.lang.String.join;
+
+import static com.google.common.collect.Iterables.isEmpty;
+import static com.google.common.collect.Iterables.transform;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public final class DropTypeStatement extends AlterSchemaStatement
+{
+    private final String typeName;
+    private final boolean ifExists;
+
+    public DropTypeStatement(String keyspaceName, String typeName, boolean 
ifExists)
+    {
+        super(keyspaceName);
+        this.typeName = typeName;
+        this.ifExists = ifExists;
+    }
+
+    // TODO: expand types into tuples in all dropped columns of all tables
+    public Keyspaces apply(Keyspaces schema)
+    {
+        ByteBuffer name = bytes(typeName);
+
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+
+        UserType type = null == keyspace
+                      ? null
+                      : keyspace.types.getNullable(name);
+
+        if (null == type)
+        {
+            if (ifExists)
+                return schema;
+
+            throw ire("Type '%s.%s' doesn't exist", keyspaceName, typeName);
+        }
+
+        /*
+         * We don't want to drop a type unless it's not used anymore (mainly 
because
+         * if someone drops a type and recreates one with the same name but 
different
+         * definition with the previous name still in use, things can get 
messy).
+         * We have three places to check:
+         * 1) UDFs and UDAs using the type
+         * 2) other user type that can nest the one we drop and
+         * 3) existing tables referencing the type (maybe in a nested way).
+         */
+        Iterable<Function> functions = 
keyspace.functions.referencingUserType(name);
+        if (!isEmpty(functions))
+        {
+            throw ire("Cannot drop user type '%s.%s' as it is still used by 
functions %s",
+                      keyspaceName,
+                      typeName,
+                      join(", ", transform(functions, f -> 
f.name().toString())));
+        }
+
+        Iterable<UserType> types = keyspace.types.referencingUserType(name);
+        if (!isEmpty(types))
+        {
+            throw ire("Cannot drop user type '%s.%s' as it is still used by 
user types %s",
+                      keyspaceName,
+                      typeName,
+                      join(", ", transform(types, UserType::getNameAsString)));
+
+        }
+
+        Iterable<TableMetadata> tables = 
keyspace.tables.referencingUserType(name);
+        if (!isEmpty(tables))
+        {
+            throw ire("Cannot drop user type '%s.%s' as it is still used by 
tables %s",
+                      keyspaceName,
+                      typeName,
+                      join(", ", transform(tables, t -> t.name)));
+        }
+
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.types.without(type)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.DROPPED, Target.TYPE, keyspaceName, 
typeName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        client.ensureKeyspacePermission(keyspaceName, Permission.DROP);
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_TYPE, keyspaceName, 
typeName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final UTName name;
+        private final boolean ifExists;
+
+        public Raw(UTName name, boolean ifExists)
+        {
+            this.name = name;
+            this.ifExists = ifExists;
+        }
+
+        public DropTypeStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : 
state.getKeyspace();
+            return new DropTypeStatement(keyspaceName, 
name.getStringTypeName(), ifExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java
new file mode 100644
index 0000000..807d03d
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QualifiedName;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
+
+public final class DropViewStatement extends AlterSchemaStatement
+{
+    private final String viewName;
+    private final boolean ifExists;
+
+    public DropViewStatement(String keyspaceName, String viewName, boolean 
ifExists)
+    {
+        super(keyspaceName);
+        this.viewName = viewName;
+        this.ifExists = ifExists;
+    }
+
+    public Keyspaces apply(Keyspaces schema)
+    {
+        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+
+        ViewMetadata view = null == keyspace
+                          ? null
+                          : keyspace.views.getNullable(viewName);
+
+        if (null == view)
+        {
+            if (ifExists)
+                return schema;
+
+            throw ire("Materialized view '%s.%s' doesn't exist", keyspaceName, 
viewName);
+        }
+
+        return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.views.without(viewName)));
+    }
+
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        return new SchemaChange(Change.DROPPED, Target.TABLE, keyspaceName, 
viewName);
+    }
+
+    public void authorize(ClientState client)
+    {
+        ViewMetadata view = Schema.instance.getView(keyspaceName, viewName);
+        if (null != view)
+            client.ensureTablePermission(keyspaceName, view.baseTableName, 
Permission.ALTER);
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_VIEW, keyspaceName, 
viewName);
+    }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final QualifiedName name;
+        private final boolean ifExists;
+
+        public Raw(QualifiedName name, boolean ifExists)
+        {
+            this.name = name;
+            this.ifExists = ifExists;
+        }
+
+        public DropViewStatement prepare(ClientState state)
+        {
+            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : 
state.getKeyspace();
+            return new DropViewStatement(keyspaceName, name.getName(), 
ifExists);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/IndexAttributes.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/IndexAttributes.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/IndexAttributes.java
new file mode 100644
index 0000000..f30c502
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/IndexAttributes.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.*;
+
+import org.apache.cassandra.cql3.statements.PropertyDefinitions;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+
+public class IndexAttributes extends PropertyDefinitions
+{
+    private static final String KW_OPTIONS = "options";
+
+    private static final Set<String> keywords = new HashSet<>();
+    private static final Set<String> obsoleteKeywords = new HashSet<>();
+
+    public boolean isCustom;
+    public String customClass;
+
+    static
+    {
+        keywords.add(KW_OPTIONS);
+    }
+
+    public void validate() throws RequestValidationException
+    {
+        validate(keywords, obsoleteKeywords);
+
+        if (isCustom && customClass == null)
+            throw new InvalidRequestException("CUSTOM index requires 
specifiying the index class");
+
+        if (!isCustom && customClass != null)
+            throw new InvalidRequestException("Cannot specify index class for 
a non-CUSTOM index");
+
+        if (!isCustom && !properties.isEmpty())
+            throw new InvalidRequestException("Cannot specify options for a 
non-CUSTOM index");
+
+        if (getRawOptions().containsKey(IndexTarget.CUSTOM_INDEX_OPTION_NAME))
+            throw new InvalidRequestException(String.format("Cannot specify %s 
as a CUSTOM option",
+                                                            
IndexTarget.CUSTOM_INDEX_OPTION_NAME));
+
+        if (getRawOptions().containsKey(IndexTarget.TARGET_OPTION_NAME))
+            throw new InvalidRequestException(String.format("Cannot specify %s 
as a CUSTOM option",
+                                                            
IndexTarget.TARGET_OPTION_NAME));
+
+    }
+
+    private Map<String, String> getRawOptions() throws SyntaxException
+    {
+        Map<String, String> options = getMap(KW_OPTIONS);
+        return options == null ? Collections.emptyMap() : options;
+    }
+
+    public Map<String, String> getOptions() throws SyntaxException
+    {
+        Map<String, String> options = new HashMap<>(getRawOptions());
+        options.put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, customClass);
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/IndexTarget.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/IndexTarget.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/IndexTarget.java
new file mode 100644
index 0000000..dff933d
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/IndexTarget.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+
+public class IndexTarget
+{
+    public static final String TARGET_OPTION_NAME = "target";
+    public static final String CUSTOM_INDEX_OPTION_NAME = "class_name";
+
+    public final ColumnIdentifier column;
+    public final Type type;
+
+    public IndexTarget(ColumnIdentifier column, Type type)
+    {
+        this.column = column;
+        this.type = type;
+    }
+
+    public String asCqlString()
+    {
+        return type == Type.SIMPLE
+             ? column.toCQLString()
+             : String.format("%s(%s)", type.toString(), column.toCQLString());
+    }
+
+    public static class Raw
+    {
+        private final ColumnMetadata.Raw column;
+        private final Type type;
+
+        private Raw(ColumnMetadata.Raw column, Type type)
+        {
+            this.column = column;
+            this.type = type;
+        }
+
+        public static Raw simpleIndexOn(ColumnMetadata.Raw c)
+        {
+            return new Raw(c, Type.SIMPLE);
+        }
+
+        public static Raw valuesOf(ColumnMetadata.Raw c)
+        {
+            return new Raw(c, Type.VALUES);
+        }
+
+        public static Raw keysOf(ColumnMetadata.Raw c)
+        {
+            return new Raw(c, Type.KEYS);
+        }
+
+        public static Raw keysAndValuesOf(ColumnMetadata.Raw c)
+        {
+            return new Raw(c, Type.KEYS_AND_VALUES);
+        }
+
+        public static Raw fullCollection(ColumnMetadata.Raw c)
+        {
+            return new Raw(c, Type.FULL);
+        }
+
+        public IndexTarget prepare(TableMetadata table)
+        {
+            // Until we've prepared the target column, we can't be certain 
about the target type
+            // because (for backwards compatibility) an index on a 
collection's values uses the
+            // same syntax as an index on a regular column (i.e. the 'values' 
in
+            // 'CREATE INDEX on table(values(collection));' is optional). So 
we correct the target type
+            // when the target column is a collection & the target type is 
SIMPLE.
+            ColumnMetadata columnDef = column.prepare(table);
+            Type actualType = (type == Type.SIMPLE && 
columnDef.type.isCollection()) ? Type.VALUES : type;
+            return new IndexTarget(columnDef.name, actualType);
+        }
+    }
+
+    public enum Type
+    {
+        VALUES, KEYS, KEYS_AND_VALUES, FULL, SIMPLE;
+
+        public String toString()
+        {
+            switch (this)
+            {
+                case KEYS: return "keys";
+                case KEYS_AND_VALUES: return "entries";
+                case FULL: return "full";
+                case VALUES: return "values";
+                case SIMPLE: return "";
+                default: return "";
+            }
+        }
+
+        public static Type fromString(String s)
+        {
+            if ("".equals(s))
+                return SIMPLE;
+            else if ("values".equals(s))
+                return VALUES;
+            else if ("keys".equals(s))
+                return KEYS;
+            else if ("entries".equals(s))
+                return KEYS_AND_VALUES;
+            else if ("full".equals(s))
+                return FULL;
+
+            throw new AssertionError("Unrecognized index target type " + s);
+        }
+    }
+    
+    @Override
+    public String toString()
+    {
+        return asCqlString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/KeyspaceAttributes.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/KeyspaceAttributes.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/KeyspaceAttributes.java
new file mode 100644
index 0000000..06c16a9
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/KeyspaceAttributes.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.*;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.cql3.statements.PropertyDefinitions;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.KeyspaceParams.Option;
+import org.apache.cassandra.schema.ReplicationParams;
+
+public final class KeyspaceAttributes extends PropertyDefinitions
+{
+    private static final Set<String> validKeywords;
+    private static final Set<String> obsoleteKeywords;
+
+    static
+    {
+        ImmutableSet.Builder<String> validBuilder = ImmutableSet.builder();
+        for (Option option : Option.values())
+            validBuilder.add(option.toString());
+        validKeywords = validBuilder.build();
+        obsoleteKeywords = ImmutableSet.of();
+    }
+
+    public void validate()
+    {
+        validate(validKeywords, obsoleteKeywords);
+
+        Map<String, String> replicationOptions = getAllReplicationOptions();
+        if (!replicationOptions.isEmpty() && 
!replicationOptions.containsKey(ReplicationParams.CLASS))
+            throw new ConfigurationException("Missing replication strategy 
class");
+    }
+
+    private String getReplicationStrategyClass()
+    {
+        return getAllReplicationOptions().get(ReplicationParams.CLASS);
+    }
+
+    private Map<String, String> getAllReplicationOptions()
+    {
+        Map<String, String> replication = 
getMap(Option.REPLICATION.toString());
+        return replication == null
+             ? Collections.emptyMap()
+             : replication;
+    }
+
+    KeyspaceParams asNewKeyspaceParams()
+    {
+        boolean durableWrites = getBoolean(Option.DURABLE_WRITES.toString(), 
KeyspaceParams.DEFAULT_DURABLE_WRITES);
+        return KeyspaceParams.create(durableWrites, 
getAllReplicationOptions());
+    }
+
+    KeyspaceParams asAlteredKeyspaceParams(KeyspaceParams previous)
+    {
+        boolean durableWrites = getBoolean(Option.DURABLE_WRITES.toString(), 
previous.durableWrites);
+        ReplicationParams replication = getReplicationStrategyClass() == null
+                                      ? previous.replication
+                                      : 
ReplicationParams.fromMap(getAllReplicationOptions());
+        return new KeyspaceParams(durableWrites, replication);
+    }
+
+    public boolean hasOption(Option option)
+    {
+        return hasProperty(option.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
new file mode 100644
index 0000000..8cc2685
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.cql3.statements.PropertyDefinitions;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.TableParams.Option;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+
+import static java.lang.String.format;
+
+public final class TableAttributes extends PropertyDefinitions
+{
+    public static final String ID = "id";
+    private static final Set<String> validKeywords;
+    private static final Set<String> obsoleteKeywords;
+
+    static
+    {
+        ImmutableSet.Builder<String> validBuilder = ImmutableSet.builder();
+        for (Option option : Option.values())
+            validBuilder.add(option.toString());
+        validBuilder.add(ID);
+        validKeywords = validBuilder.build();
+        obsoleteKeywords = ImmutableSet.of();
+    }
+
+    public void validate()
+    {
+        validate(validKeywords, obsoleteKeywords);
+        build(TableParams.builder()).validate();
+    }
+
+    TableParams asNewTableParams()
+    {
+        return build(TableParams.builder());
+    }
+
+    TableParams asAlteredTableParams(TableParams previous)
+    {
+        if (getId() != null)
+            throw new ConfigurationException("Cannot alter table id.");
+        return build(previous.unbuild());
+    }
+
+    public TableId getId() throws ConfigurationException
+    {
+        String id = getSimple(ID);
+        try
+        {
+            return id != null ? TableId.fromString(id) : null;
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ConfigurationException("Invalid table id", e);
+        }
+    }
+
+    private TableParams build(TableParams.Builder builder)
+    {
+        if (hasOption(Option.BLOOM_FILTER_FP_CHANCE))
+            
builder.bloomFilterFpChance(getDouble(Option.BLOOM_FILTER_FP_CHANCE));
+
+        if (hasOption(Option.CACHING))
+            builder.caching(CachingParams.fromMap(getMap(Option.CACHING)));
+
+        if (hasOption(Option.COMMENT))
+            builder.comment(getString(Option.COMMENT));
+
+        if (hasOption(Option.COMPACTION))
+            
builder.compaction(CompactionParams.fromMap(getMap(Option.COMPACTION)));
+
+        if (hasOption(Option.COMPRESSION))
+        {
+            //crc_check_chance was "promoted" from a compression property to a 
top-level-property after #9839
+            //so we temporarily accept it to be defined as a compression 
option, to maintain backwards compatibility
+            Map<String, String> compressionOpts = getMap(Option.COMPRESSION);
+            if 
(compressionOpts.containsKey(Option.CRC_CHECK_CHANCE.toString().toLowerCase()))
+            {
+                Double crcCheckChance = 
getDeprecatedCrcCheckChance(compressionOpts);
+                builder.crcCheckChance(crcCheckChance);
+            }
+            
builder.compression(CompressionParams.fromMap(getMap(Option.COMPRESSION)));
+        }
+
+        if (hasOption(Option.DEFAULT_TIME_TO_LIVE))
+            builder.defaultTimeToLive(getInt(Option.DEFAULT_TIME_TO_LIVE));
+
+        if (hasOption(Option.GC_GRACE_SECONDS))
+            builder.gcGraceSeconds(getInt(Option.GC_GRACE_SECONDS));
+
+        if (hasOption(Option.MAX_INDEX_INTERVAL))
+            builder.maxIndexInterval(getInt(Option.MAX_INDEX_INTERVAL));
+
+        if (hasOption(Option.MEMTABLE_FLUSH_PERIOD_IN_MS))
+            
builder.memtableFlushPeriodInMs(getInt(Option.MEMTABLE_FLUSH_PERIOD_IN_MS));
+
+        if (hasOption(Option.MIN_INDEX_INTERVAL))
+            builder.minIndexInterval(getInt(Option.MIN_INDEX_INTERVAL));
+
+        if (hasOption(Option.SPECULATIVE_RETRY))
+            
builder.speculativeRetry(SpeculativeRetryPolicy.fromString(getString(Option.SPECULATIVE_RETRY)));
+
+        if (hasOption(Option.CRC_CHECK_CHANCE))
+            builder.crcCheckChance(getDouble(Option.CRC_CHECK_CHANCE));
+
+        if (hasOption(Option.CDC))
+            builder.cdc(getBoolean(Option.CDC.toString(), false));
+
+        return builder.build();
+    }
+
+    private Double getDeprecatedCrcCheckChance(Map<String, String> 
compressionOpts)
+    {
+        String value = 
compressionOpts.get(Option.CRC_CHECK_CHANCE.toString().toLowerCase());
+        try
+        {
+            return Double.valueOf(value);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new SyntaxException(String.format("Invalid double value %s 
for crc_check_chance.'", value));
+        }
+    }
+
+    private double getDouble(Option option)
+    {
+        String value = getString(option);
+
+        try
+        {
+            return Double.parseDouble(value);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new SyntaxException(format("Invalid double value %s for 
'%s'", value, option));
+        }
+    }
+
+    private int getInt(Option option)
+    {
+        String value = getString(option);
+
+        try
+        {
+            return Integer.parseInt(value);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new SyntaxException(String.format("Invalid integer value %s 
for '%s'", value, option));
+        }
+    }
+
+    private String getString(Option option)
+    {
+        String value = getSimple(option.toString());
+        if (value == null)
+            throw new IllegalStateException(format("Option '%s' is absent", 
option));
+        return value;
+    }
+
+    private Map<String, String> getMap(Option option)
+    {
+        Map<String, String> value = getMap(option.toString());
+        if (value == null)
+            throw new IllegalStateException(format("Option '%s' is absent", 
option));
+        return value;
+    }
+
+    public boolean hasOption(Option option)
+    {
+        return hasProperty(option.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java 
b/src/java/org/apache/cassandra/db/Keyspace.java
index cb62c14..33c0f32 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -52,7 +52,6 @@ import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -366,12 +365,8 @@ public class Keyspace
 
     private void createReplicationStrategy(KeyspaceMetadata ksm)
     {
-        logger.info("Creating replication strategy " + ksm .name + " params " 
+ ksm.params);
-        replicationStrategy = 
AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
-                                                                               
     ksm.params.replication.klass,
-                                                                               
     StorageService.instance.getTokenMetadata(),
-                                                                               
     DatabaseDescriptor.getEndpointSnitch(),
-                                                                               
     ksm.params.replication.options);
+        logger.info("Creating replication strategy " + ksm.name + " params " + 
ksm.params);
+        replicationStrategy = ksm.createReplicationStrategy();
         if (!ksm.params.replication.equals(replicationParams))
         {
             logger.debug("New replication settings for keyspace {} - 
invalidating disk boundary caches", ksm.name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index b4de801..fb9e889 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.marshal.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/TableCQLHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TableCQLHelper.java 
b/src/java/org/apache/cassandra/db/TableCQLHelper.java
index 0e9977d..550a6d6 100644
--- a/src/java/org/apache/cassandra/db/TableCQLHelper.java
+++ b/src/java/org/apache/cassandra/db/TableCQLHelper.java
@@ -26,7 +26,7 @@ import java.util.function.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.cql3.statements.*;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.utils.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java 
b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index 9eb5d82..0248629 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -297,12 +297,6 @@ public abstract class AbstractCompositeType extends 
AbstractType<ByteBuffer>
         return BytesSerializer.instance;
     }
 
-    @Override
-    public boolean referencesUserType(String name)
-    {
-        return getComponents().stream().anyMatch(f -> 
f.referencesUserType(name));
-    }
-
     /**
      * @return the comparator for the given component. static CompositeType 
will consult
      * @param i DynamicCompositeType will read the type information from 
@param bb

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java 
b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 7a5cc13..f305313 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -456,11 +456,31 @@ public abstract class AbstractType<T> implements 
Comparator<ByteBuffer>, Assignm
             ByteBufferUtil.skipWithVIntLength(in);
     }
 
-    public boolean referencesUserType(String userTypeName)
+    public boolean referencesUserType(ByteBuffer name)
     {
         return false;
     }
 
+    /**
+     * Returns an instance of this type with all references to the provided 
user type recursively replaced with its new
+     * definition.
+     */
+    public AbstractType<?> withUpdatedUserType(UserType udt)
+    {
+        return this;
+    }
+
+    /**
+     * Replace any instances of UserType with equivalent TupleType-s.
+     *
+     * We need it for dropped_columns, to allow safely dropping unused user 
types later without retaining any references
+     * to them in system_schema.dropped_columns.
+     */
+    public AbstractType<?> expandUserTypes()
+    {
+        return this;
+    }
+
     public boolean referencesDuration()
     {
         return false;
@@ -468,9 +488,6 @@ public abstract class AbstractType<T> implements 
Comparator<ByteBuffer>, Assignm
 
     /**
      * Tests whether a CQL value having this type can be assigned to the 
provided receiver.
-     *
-     * @param keyspace the keyspace from which the receiver is.
-     * @param receiver the receiver for which we want to test type 
compatibility with.
      */
     public AssignmentTestable.TestResult testAssignment(AbstractType<?> 
receiverType)
     {
@@ -505,17 +522,6 @@ public abstract class AbstractType<T> implements 
Comparator<ByteBuffer>, Assignm
         return getClass().getName();
     }
 
-    /**
-     * Checks to see if two types are equal when ignoring or not ignoring 
differences in being frozen, depending on
-     * the value of the ignoreFreezing parameter.
-     * @param other type to compare
-     * @param ignoreFreezing if true, differences in the types being frozen 
will be ignored
-     */
-    public boolean equals(Object other, boolean ignoreFreezing)
-    {
-        return this.equals(other);
-    }
-
     public void checkComparable()
     {
         switch (comparisonType)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java 
b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index e83a041..b198e0c 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -211,7 +211,7 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
     }
 
     @Override
-    public boolean equals(Object o, boolean ignoreFreezing)
+    public boolean equals(Object o)
     {
         if (this == o)
             return true;
@@ -224,11 +224,10 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
         if (kind != other.kind)
             return false;
 
-        if (!ignoreFreezing && isMultiCell() != other.isMultiCell())
+        if (isMultiCell() != other.isMultiCell())
             return false;
 
-        return nameComparator().equals(other.nameComparator(), ignoreFreezing) 
&&
-               valueComparator().equals(other.valueComparator(), 
ignoreFreezing);
+        return nameComparator().equals(other.nameComparator()) && 
valueComparator().equals(other.valueComparator());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java 
b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 12e7fc3..ac4c69f 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -32,6 +32,9 @@ import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.transform;
+
 /*
  * The encoding of a CompositeType column name should be:
  *   <component><component><component> ...
@@ -102,18 +105,10 @@ public class CompositeType extends AbstractCompositeType
     public static CompositeType getInstance(List<AbstractType<?>> types)
     {
         assert types != null && !types.isEmpty();
-
-        CompositeType ct = instances.get(types);
-        if (ct == null)
-        {
-            ct = new CompositeType(types);
-            CompositeType previous = instances.putIfAbsent(types, ct);
-            if (previous != null)
-            {
-                ct = previous;
-            }
-        }
-        return ct;
+        CompositeType t = instances.get(types);
+        return null == t
+             ? instances.computeIfAbsent(types, CompositeType::new)
+             : t;
     }
 
     protected CompositeType(List<AbstractType<?>> types)
@@ -287,6 +282,29 @@ public class CompositeType extends AbstractCompositeType
         return true;
     }
 
+    @Override
+    public boolean referencesUserType(ByteBuffer name)
+    {
+        return any(types, t -> t.referencesUserType(name));
+    }
+
+    @Override
+    public CompositeType withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        instances.remove(types);
+
+        return getInstance(transform(types, t -> t.withUpdatedUserType(udt)));
+    }
+
+    @Override
+    public AbstractType<?> expandUserTypes()
+    {
+        return getInstance(transform(types, AbstractType::expandUserTypes));
+    }
+
     private static class StaticParsedComparator implements ParsedComparator
     {
         final AbstractType<?> type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java 
b/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
index 6fa7e87..0458dc8 100644
--- a/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
@@ -17,24 +17,25 @@
  */
 package org.apache.cassandra.db.marshal;
 
-import java.nio.charset.CharacterCodingException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.nio.charset.CharacterCodingException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
-import org.apache.cassandra.cql3.Term;
+import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.collect.Iterables.any;
+
 /*
  * The encoding of a DynamicCompositeType column name should be:
  *   <component><component><component> ...
@@ -61,9 +62,9 @@ public class DynamicCompositeType extends 
AbstractCompositeType
     private final Map<Byte, AbstractType<?>> aliases;
 
     // interning instances
-    private static final ConcurrentMap<Map<Byte, AbstractType<?>>, 
DynamicCompositeType> instances = new ConcurrentHashMap<Map<Byte, 
AbstractType<?>>, DynamicCompositeType>();
+    private static final ConcurrentHashMap<Map<Byte, AbstractType<?>>, 
DynamicCompositeType> instances = new ConcurrentHashMap<>();
 
-    public static synchronized DynamicCompositeType getInstance(TypeParser 
parser) throws ConfigurationException, SyntaxException
+    public static DynamicCompositeType getInstance(TypeParser parser)
     {
         return getInstance(parser.getAliasParameters());
     }
@@ -71,9 +72,9 @@ public class DynamicCompositeType extends 
AbstractCompositeType
     public static DynamicCompositeType getInstance(Map<Byte, AbstractType<?>> 
aliases)
     {
         DynamicCompositeType dct = instances.get(aliases);
-        if (dct == null)
-            dct = instances.computeIfAbsent(aliases, k ->  new 
DynamicCompositeType(k));
-        return dct;
+        return null == dct
+             ? instances.computeIfAbsent(aliases, DynamicCompositeType::new)
+             : dct;
     }
 
     private DynamicCompositeType(Map<Byte, AbstractType<?>> aliases)
@@ -255,6 +256,29 @@ public class DynamicCompositeType extends 
AbstractCompositeType
         return true;
     }
 
+    @Override
+    public boolean referencesUserType(ByteBuffer name)
+    {
+        return any(aliases.values(), t -> t.referencesUserType(name));
+    }
+
+    @Override
+    public DynamicCompositeType withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        instances.remove(aliases);
+
+        return getInstance(Maps.transformValues(aliases, v -> 
v.withUpdatedUserType(udt)));
+    }
+
+    @Override
+    public AbstractType<?> expandUserTypes()
+    {
+        return getInstance(Maps.transformValues(aliases, v -> 
v.expandUserTypes()));
+    }
+
     private class DynamicParsedComparator implements ParsedComparator
     {
         final AbstractType<?> type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java 
b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 31b5a11..3dbf4a3 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Lists;
@@ -29,20 +28,15 @@ import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.CollectionSerializer;
-import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.serializers.ListSerializer;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class ListType<T> extends CollectionType<List<T>>
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(ListType.class);
-
     // interning instances
-    private static final ConcurrentMap<AbstractType<?>, ListType> instances = 
new ConcurrentHashMap<>();
-    private static final ConcurrentMap<AbstractType<?>, ListType> 
frozenInstances = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<AbstractType<?>, ListType> 
instances = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<AbstractType<?>, ListType> 
frozenInstances = new ConcurrentHashMap<>();
 
     private final AbstractType<T> elements;
     public final ListSerializer<T> serializer;
@@ -57,13 +51,13 @@ public class ListType<T> extends CollectionType<List<T>>
         return getInstance(l.get(0), true);
     }
 
-    public static <T> ListType<T> getInstance(AbstractType<T> elements, final 
boolean isMultiCell)
+    public static <T> ListType<T> getInstance(AbstractType<T> elements, 
boolean isMultiCell)
     {
-        ConcurrentMap<AbstractType<?>, ListType> internMap = isMultiCell ? 
instances : frozenInstances;
+        ConcurrentHashMap<AbstractType<?>, ListType> internMap = isMultiCell ? 
instances : frozenInstances;
         ListType<T> t = internMap.get(elements);
-        if (t == null)
-            t = internMap.computeIfAbsent(elements, k -> new ListType<>(k, 
isMultiCell) );
-        return t;
+        return null == t
+             ? internMap.computeIfAbsent(elements, k -> new ListType<>(k, 
isMultiCell))
+             : t;
     }
 
     private ListType(AbstractType<T> elements, boolean isMultiCell)
@@ -75,9 +69,26 @@ public class ListType<T> extends CollectionType<List<T>>
     }
 
     @Override
-    public boolean referencesUserType(String userTypeName)
+    public boolean referencesUserType(ByteBuffer name)
+    {
+        return elements.referencesUserType(name);
+    }
+
+    @Override
+    public ListType<?> withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        (isMultiCell ? instances : frozenInstances).remove(elements);
+
+        return getInstance(elements.withUpdatedUserType(udt), isMultiCell);
+    }
+
+    @Override
+    public AbstractType<?> expandUserTypes()
     {
-        return getElementsType().referencesUserType(userTypeName);
+        return getInstance(elements.expandUserTypes(), isMultiCell);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java 
b/src/java/org/apache/cassandra/db/marshal/MapType.java
index e333493..bab25d3 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Maps;
@@ -37,8 +36,8 @@ import org.apache.cassandra.utils.Pair;
 public class MapType<K, V> extends CollectionType<Map<K, V>>
 {
     // interning instances
-    private static final ConcurrentMap<Pair<AbstractType<?>, AbstractType<?>>, 
MapType> instances = new ConcurrentHashMap<>();
-    private static final ConcurrentMap<Pair<AbstractType<?>, AbstractType<?>>, 
MapType> frozenInstances = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<Pair<AbstractType<?>, 
AbstractType<?>>, MapType> instances = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<Pair<AbstractType<?>, 
AbstractType<?>>, MapType> frozenInstances = new ConcurrentHashMap<>();
 
     private final AbstractType<K> keys;
     private final AbstractType<V> values;
@@ -56,12 +55,12 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
 
     public static <K, V> MapType<K, V> getInstance(AbstractType<K> keys, 
AbstractType<V> values, boolean isMultiCell)
     {
-        ConcurrentMap<Pair<AbstractType<?>, AbstractType<?>>, MapType> 
internMap = isMultiCell ? instances : frozenInstances;
-        Pair<AbstractType<?>, AbstractType<?>> p = Pair.<AbstractType<?>, 
AbstractType<?>>create(keys, values);
+        ConcurrentHashMap<Pair<AbstractType<?>, AbstractType<?>>, MapType> 
internMap = isMultiCell ? instances : frozenInstances;
+        Pair<AbstractType<?>, AbstractType<?>> p = Pair.create(keys, values);
         MapType<K, V> t = internMap.get(p);
-        if (t == null)
-            t = internMap.computeIfAbsent(p, k -> new MapType<>(k.left, 
k.right, isMultiCell) );
-        return t;
+        return null == t
+             ? internMap.computeIfAbsent(p, k -> new MapType<>(k.left, 
k.right, isMultiCell))
+             : t;
     }
 
     private MapType(AbstractType<K> keys, AbstractType<V> values, boolean 
isMultiCell)
@@ -74,10 +73,26 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     }
 
     @Override
-    public boolean referencesUserType(String userTypeName)
+    public boolean referencesUserType(ByteBuffer name)
     {
-        return getKeysType().referencesUserType(userTypeName) ||
-               getValuesType().referencesUserType(userTypeName);
+        return keys.referencesUserType(name) || 
values.referencesUserType(name);
+    }
+
+    @Override
+    public MapType<?,?> withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        (isMultiCell ? instances : frozenInstances).remove(Pair.create(keys, 
values));
+
+        return getInstance(keys.withUpdatedUserType(udt), 
values.withUpdatedUserType(udt), isMultiCell);
+    }
+
+    @Override
+    public AbstractType<?> expandUserTypes()
+    {
+        return getInstance(keys.expandUserTypes(), values.expandUserTypes(), 
isMultiCell);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/ReversedType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ReversedType.java 
b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
index 250dfdc..63a900a 100644
--- a/src/java/org/apache/cassandra/db/marshal/ReversedType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
@@ -18,14 +18,13 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.transport.ProtocolVersion;
@@ -33,11 +32,11 @@ import org.apache.cassandra.transport.ProtocolVersion;
 public class ReversedType<T> extends AbstractType<T>
 {
     // interning instances
-    private static final Map<AbstractType<?>, ReversedType> instances = new 
HashMap<AbstractType<?>, ReversedType>();
+    private static final Map<AbstractType<?>, ReversedType> instances = new 
ConcurrentHashMap<>();
 
     public final AbstractType<T> baseType;
 
-    public static <T> ReversedType<T> getInstance(TypeParser parser) throws 
ConfigurationException, SyntaxException
+    public static <T> ReversedType<T> getInstance(TypeParser parser)
     {
         List<AbstractType<?>> types = parser.getTypeParameters();
         if (types.size() != 1)
@@ -45,15 +44,12 @@ public class ReversedType<T> extends AbstractType<T>
         return getInstance((AbstractType<T>) types.get(0));
     }
 
-    public static synchronized <T> ReversedType<T> getInstance(AbstractType<T> 
baseType)
+    public static <T> ReversedType<T> getInstance(AbstractType<T> baseType)
     {
-        ReversedType<T> type = instances.get(baseType);
-        if (type == null)
-        {
-            type = new ReversedType<T>(baseType);
-            instances.put(baseType, type);
-        }
-        return type;
+        ReversedType<T> t = instances.get(baseType);
+        return null == t
+             ? instances.computeIfAbsent(baseType, ReversedType::new)
+             : t;
     }
 
     private ReversedType(AbstractType<T> baseType)
@@ -126,9 +122,27 @@ public class ReversedType<T> extends AbstractType<T>
         return baseType.getSerializer();
     }
 
-    public boolean referencesUserType(String userTypeName)
+    @Override
+    public boolean referencesUserType(ByteBuffer name)
+    {
+        return baseType.referencesUserType(name);
+    }
+
+    @Override
+    public AbstractType<?> expandUserTypes()
     {
-        return baseType.referencesUserType(userTypeName);
+        return getInstance(baseType.expandUserTypes());
+    }
+
+    @Override
+    public ReversedType<?> withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        instances.remove(baseType);
+
+        return getInstance(baseType.withUpdatedUserType(udt));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java 
b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 4374612..ae9e0c0 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Sets;
@@ -35,8 +34,8 @@ import org.apache.cassandra.transport.ProtocolVersion;
 public class SetType<T> extends CollectionType<Set<T>>
 {
     // interning instances
-    private static final ConcurrentMap<AbstractType<?>, SetType> instances = 
new ConcurrentHashMap<>();
-    private static final ConcurrentMap<AbstractType<?>, SetType> 
frozenInstances = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<AbstractType<?>, SetType> instances 
= new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<AbstractType<?>, SetType> 
frozenInstances = new ConcurrentHashMap<>();
 
     private final AbstractType<T> elements;
     private final SetSerializer<T> serializer;
@@ -53,11 +52,11 @@ public class SetType<T> extends CollectionType<Set<T>>
 
     public static <T> SetType<T> getInstance(AbstractType<T> elements, boolean 
isMultiCell)
     {
-        ConcurrentMap<AbstractType<?>, SetType> internMap = isMultiCell ? 
instances : frozenInstances;
+        ConcurrentHashMap<AbstractType<?>, SetType> internMap = isMultiCell ? 
instances : frozenInstances;
         SetType<T> t = internMap.get(elements);
-        if (t == null)
-            t = internMap.computeIfAbsent(elements, k -> new SetType<>(k, 
isMultiCell) );
-        return t;
+        return null == t
+             ? internMap.computeIfAbsent(elements, k -> new SetType<>(k, 
isMultiCell))
+             : t;
     }
 
     public SetType(AbstractType<T> elements, boolean isMultiCell)
@@ -69,9 +68,26 @@ public class SetType<T> extends CollectionType<Set<T>>
     }
 
     @Override
-    public boolean referencesUserType(String userTypeName)
+    public boolean referencesUserType(ByteBuffer name)
     {
-        return getElementsType().referencesUserType(userTypeName);
+        return elements.referencesUserType(name);
+    }
+
+    @Override
+    public SetType<?> withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        (isMultiCell ? instances : frozenInstances).remove(elements);
+
+        return getInstance(elements.withUpdatedUserType(udt), isMultiCell);
+    }
+
+    @Override
+    public AbstractType<?> expandUserTypes()
+    {
+        return getInstance(elements.expandUserTypes(), isMultiCell);
     }
 
     public AbstractType<T> getElementsType()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/TupleType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TupleType.java 
b/src/java/org/apache/cassandra/db/marshal/TupleType.java
index be9cc93..00f4d24 100644
--- a/src/java/org/apache/cassandra/db/marshal/TupleType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TupleType.java
@@ -23,9 +23,9 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -35,6 +35,9 @@ import org.apache.cassandra.serializers.*;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.transform;
+
 /**
  * This is essentially like a CompositeType, but it's not primarily meant for 
comparison, just
  * to pack multiple values together so has a more friendly encoding.
@@ -62,8 +65,9 @@ public class TupleType extends AbstractType<ByteBuffer>
     protected TupleType(List<AbstractType<?>> types, boolean freezeInner)
     {
         super(ComparisonType.CUSTOM);
+
         if (freezeInner)
-            this.types = 
types.stream().map(AbstractType::freeze).collect(Collectors.toList());
+            this.types = Lists.newArrayList(transform(types, 
AbstractType::freeze));
         else
             this.types = types;
         this.serializer = new TupleSerializer(fieldSerializers(types));
@@ -87,9 +91,23 @@ public class TupleType extends AbstractType<ByteBuffer>
     }
 
     @Override
-    public boolean referencesUserType(String name)
+    public boolean referencesUserType(ByteBuffer name)
+    {
+        return any(types, t -> t.referencesUserType(name));
+    }
+
+    @Override
+    public TupleType withUpdatedUserType(UserType udt)
+    {
+        return referencesUserType(udt.name)
+             ? new TupleType(Lists.newArrayList(transform(types, t -> 
t.withUpdatedUserType(udt))))
+             : this;
+    }
+
+    @Override
+    public AbstractType<?> expandUserTypes()
     {
-        return allTypes().stream().anyMatch(f -> f.referencesUserType(name));
+        return new TupleType(Lists.newArrayList(transform(types, 
AbstractType::expandUserTypes)));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/marshal/UserType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java 
b/src/java/org/apache/cassandra/db/marshal/UserType.java
index 6149e94..01e6a3f 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -22,20 +22,21 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.schema.Difference;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.UserTypeSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.transform;
 
 /**
  * A user defined type.
@@ -44,8 +45,6 @@ import org.slf4j.LoggerFactory;
  */
 public class UserType extends TupleType
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(UserType.class);
-
     public final String keyspace;
     public final ByteBuffer name;
     private final List<FieldIdentifier> fieldNames;
@@ -73,7 +72,7 @@ public class UserType extends TupleType
         this.serializer = new UserTypeSerializer(fieldSerializers);
     }
 
-    public static UserType getInstance(TypeParser parser) throws 
ConfigurationException, SyntaxException
+    public static UserType getInstance(TypeParser parser)
     {
         Pair<Pair<String, ByteBuffer>, List<Pair<ByteBuffer, AbstractType>>> 
params = parser.getUserTypeParameters();
         String keyspace = params.left.left;
@@ -333,34 +332,44 @@ public class UserType extends TupleType
     @Override
     public boolean equals(Object o)
     {
-        return o instanceof UserType && equals(o, false);
-    }
-
-    @Override
-    public boolean equals(Object o, boolean ignoreFreezing)
-    {
         if(!(o instanceof UserType))
             return false;
 
         UserType that = (UserType)o;
 
-        if (!keyspace.equals(that.keyspace) || !name.equals(that.name) || 
!fieldNames.equals(that.fieldNames))
-            return false;
+        return equalsWithoutTypes(that) && types.equals(that.types);
+    }
 
-        if (!ignoreFreezing && isMultiCell != that.isMultiCell)
-            return false;
+    private boolean equalsWithoutTypes(UserType other)
+    {
+        return name.equals(other.name)
+            && fieldNames.equals(other.fieldNames)
+            && keyspace.equals(other.keyspace)
+            && isMultiCell == other.isMultiCell;
+    }
 
-        if (this.types.size() != that.types.size())
-            return false;
+    public Optional<Difference> compare(UserType other)
+    {
+        if (!equalsWithoutTypes(other))
+            return Optional.of(Difference.SHALLOW);
+
+        boolean differsDeeply = false;
 
-        Iterator<AbstractType<?>> otherTypeIter = that.types.iterator();
-        for (AbstractType<?> type : types)
+        for (int i = 0; i < fieldTypes().size(); i++)
         {
-            if (!type.equals(otherTypeIter.next(), ignoreFreezing))
-                return false;
+            AbstractType<?> thisType = fieldType(i);
+            AbstractType<?> thatType = other.fieldType(i);
+
+            if (!thisType.equals(thatType))
+            {
+                if 
(thisType.asCQL3Type().toString().equals(thatType.asCQL3Type().toString()))
+                    differsDeeply = true;
+                else
+                    return Optional.of(Difference.SHALLOW);
+            }
         }
 
-        return true;
+        return differsDeeply ? Optional.of(Difference.DEEP) : Optional.empty();
     }
 
     @Override
@@ -370,10 +379,30 @@ public class UserType extends TupleType
     }
 
     @Override
-    public boolean referencesUserType(String userTypeName)
+    public boolean referencesUserType(ByteBuffer name)
+    {
+        return this.name.equals(name) || any(fieldTypes(), t -> 
t.referencesUserType(name));
+    }
+
+    @Override
+    public UserType withUpdatedUserType(UserType udt)
     {
-        return getNameAsString().equals(userTypeName) ||
-               fieldTypes().stream().anyMatch(f -> 
f.referencesUserType(userTypeName));
+        if (!referencesUserType(udt.name))
+            return this;
+
+        // preserve frozen/non-frozen status of the updated UDT
+        if (name.equals(udt.name))
+        {
+            return isMultiCell == udt.isMultiCell
+                 ? udt
+                 : new UserType(keyspace, name, udt.fieldNames(), 
udt.fieldTypes(), isMultiCell);
+        }
+
+        return new UserType(keyspace,
+                            name,
+                            fieldNames,
+                            Lists.newArrayList(transform(fieldTypes(), t -> 
t.withUpdatedUserType(udt))),
+                            isMultiCell());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java 
b/src/java/org/apache/cassandra/db/view/TableViews.java
index 298fcfd..eedcfbd 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -85,7 +85,7 @@ public class TableViews extends AbstractCollection<View>
     public Iterable<ColumnFamilyStore> allViewsCfs()
     {
         Keyspace keyspace = Keyspace.open(baseTableMetadata.keyspace);
-        return Iterables.transform(views, view -> 
keyspace.getColumnFamilyStore(view.getDefinition().name));
+        return Iterables.transform(views, view -> 
keyspace.getColumnFamilyStore(view.getDefinition().name()));
     }
 
     public void forceBlockingFlush()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to