http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java 
b/src/java/org/apache/cassandra/db/view/View.java
index f6545b0..40f1b84 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -25,7 +25,7 @@ import javax.annotation.Nullable;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.cql3.selection.RawSelector;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -34,7 +34,6 @@ import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.ViewMetadata;
-import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.FBUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,19 +55,13 @@ public class View
     public volatile List<ColumnMetadata> baseNonPKColumnsInViewPK;
     private ViewBuilder builder;
 
-    // Only the raw statement can be final, because the statement cannot 
always be prepared when the MV is initialized.
-    // For example, during startup, this view will be initialized as part of 
the Keyspace.open() work; preparing a statement
-    // also requires the keyspace to be open, so this results in 
double-initialization problems.
-    private final SelectStatement.RawStatement rawSelect;
     private SelectStatement select;
     private ReadQuery query;
 
-    public View(ViewMetadata definition,
-                ColumnFamilyStore baseCfs)
+    public View(ViewMetadata definition, ColumnFamilyStore baseCfs)
     {
         this.baseCfs = baseCfs;
-        this.name = definition.name;
-        this.rawSelect = definition.select;
+        this.name = definition.name();
 
         updateDefinition(definition);
     }
@@ -160,31 +153,52 @@ public class View
      * Returns the SelectStatement used to populate and filter this view.  
Internal users should access the select
      * statement this way to ensure it has been prepared.
      */
-    public SelectStatement getSelectStatement()
+    SelectStatement getSelectStatement()
     {
-        if (select == null)
+        if (null == select)
         {
-            ClientState state = ClientState.forInternalCalls();
-            state.setKeyspace(baseCfs.keyspace.getName());
-            rawSelect.prepareKeyspace(state);
-            ParsedStatement.Prepared prepared = rawSelect.prepare(true);
-            select = (SelectStatement) prepared.statement;
+            SelectStatement.Parameters parameters =
+                new SelectStatement.Parameters(Collections.emptyMap(),
+                                               Collections.emptyList(),
+                                               false,
+                                               true,
+                                               false);
+
+            SelectStatement.RawStatement rawSelect =
+                new SelectStatement.RawStatement(new 
QualifiedName(baseCfs.keyspace.getName(), baseCfs.name),
+                                                 parameters,
+                                                 selectClause(),
+                                                 definition.whereClause,
+                                                 null,
+                                                 null);
+
+            rawSelect.setBindVariables(Collections.emptyList());
+
+            select = rawSelect.prepare(true);
         }
 
         return select;
     }
 
+    private List<RawSelector> selectClause()
+    {
+        return definition.metadata
+                         .columns()
+                         .stream()
+                         .map(c -> c.name.toString())
+                         .map(ColumnMetadata.Raw::forQuoted)
+                         .map(c -> new RawSelector(c, null))
+                         .collect(Collectors.toList());
+    }
+
     /**
      * Returns the ReadQuery used to filter this view.  Internal users should 
access the query this way to ensure it
      * has been prepared.
      */
-    public ReadQuery getReadQuery()
+    ReadQuery getReadQuery()
     {
         if (query == null)
-        {
             query = 
getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()),
 FBUtilities.nowInSeconds());
-            logger.trace("View query: {}", rawSelect);
-        }
 
         return query;
     }
@@ -216,63 +230,13 @@ public class View
         return (view == null) ? null : 
Schema.instance.getTableMetadataRef(view.baseTableId);
     }
 
+    // TODO: REMOVE
     public static Iterable<ViewMetadata> findAll(String keyspace, String 
baseTable)
     {
         KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace);
         return Iterables.filter(ksm.views, view -> 
view.baseTableName.equals(baseTable));
     }
 
-    /**
-     * Builds the string text for a materialized view's SELECT statement.
-     */
-    public static String buildSelectStatement(String cfName, 
Collection<ColumnMetadata> includedColumns, String whereClause)
-    {
-         StringBuilder rawSelect = new StringBuilder("SELECT ");
-        if (includedColumns == null || includedColumns.isEmpty())
-            rawSelect.append("*");
-        else
-            rawSelect.append(includedColumns.stream().map(id -> 
id.name.toCQLString()).collect(Collectors.joining(", ")));
-        rawSelect.append(" FROM \"").append(cfName).append("\" WHERE ") 
.append(whereClause).append(" ALLOW FILTERING");
-        return rawSelect.toString();
-    }
-
-    public static String relationsToWhereClause(List<Relation> whereClause)
-    {
-        List<String> expressions = new ArrayList<>(whereClause.size());
-        for (Relation rel : whereClause)
-        {
-            StringBuilder sb = new StringBuilder();
-
-            if (rel.isMultiColumn())
-            {
-                sb.append(((MultiColumnRelation) rel).getEntities().stream()
-                        .map(ColumnMetadata.Raw::toString)
-                        .collect(Collectors.joining(", ", "(", ")")));
-            }
-            else
-            {
-                sb.append(((SingleColumnRelation) rel).getEntity());
-            }
-
-            sb.append(" ").append(rel.operator()).append(" ");
-
-            if (rel.isIN())
-            {
-                sb.append(rel.getInValues().stream()
-                        .map(Term.Raw::getText)
-                        .collect(Collectors.joining(", ", "(", ")")));
-            }
-            else
-            {
-                sb.append(rel.getValue().getText());
-            }
-
-            expressions.add(sb.toString());
-        }
-
-        return expressions.stream().collect(Collectors.joining(" AND "));
-    }
-
     public boolean hasSamePrimaryKeyColumnsAsBaseTable()
     {
         return baseNonPKColumnsInViewPK.isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java 
b/src/java/org/apache/cassandra/db/view/ViewManager.java
index 8d12349..0d565ae 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -101,7 +101,7 @@ public class ViewManager
         Map<String, ViewMetadata> newViewsByName = 
Maps.newHashMapWithExpectedSize(views.size());
         for (ViewMetadata definition : views)
         {
-            newViewsByName.put(definition.name, definition);
+            newViewsByName.put(definition.name(), definition);
         }
 
         for (String viewName : viewsByName.keySet())
@@ -147,14 +147,14 @@ public class ViewManager
         if (!keyspace.hasColumnFamilyStore(definition.baseTableId))
         {
             logger.warn("Not adding view {} because the base table {} is 
unknown",
-                        definition.name,
+                        definition.name(),
                         definition.baseTableId);
             return;
         }
 
         View view = new View(definition, 
keyspace.getColumnFamilyStore(definition.baseTableId));
         forTable(view.getDefinition().baseTableId).add(view);
-        viewsByName.put(definition.name, view);
+        viewsByName.put(definition.name(), view);
     }
 
     public void removeView(String name)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 204d9ee..4bbb861 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -31,8 +31,8 @@ import com.google.common.net.HostAndPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 70aebbd..d765fac 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -50,7 +50,7 @@ import 
org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.RowFilter;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/index/TargetParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/TargetParser.java 
b/src/java/org/apache/cassandra/index/TargetParser.java
index bc679f2..9ada4c6 100644
--- a/src/java/org/apache/cassandra/index/TargetParser.java
+++ b/src/java/org/apache/cassandra/index/TargetParser.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.Pair;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java 
b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index fb0d629..76b7543 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.RowFilter;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java 
b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 76e5801..3ffcb6e 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -25,7 +25,7 @@ import com.googlecode.concurrenttrees.common.Iterables;
 
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 22296e8..0c2cf28 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -31,16 +31,16 @@ import java.util.stream.Collectors;
 
 import com.datastax.driver.core.ProtocolVersion;
 import com.datastax.driver.core.TypeCodec;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UpdateParameters;
 import org.apache.cassandra.cql3.functions.UDHelper;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.cql3.statements.CreateTypeStatement;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
-import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.DecoratedKey;
@@ -58,14 +58,12 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.schema.Views;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * Utility to write SSTables.
@@ -348,8 +346,8 @@ public class CQLSSTableWriter implements Closeable
 
         protected SSTableFormat.Type formatType = null;
 
-        private CreateTableStatement.RawStatement schemaStatement;
-        private final List<CreateTypeStatement> typeStatements;
+        private CreateTableStatement.Raw schemaStatement;
+        private final List<CreateTypeStatement.Raw> typeStatements;
         private ModificationStatement.Parsed insertStatement;
         private IPartitioner partitioner;
 
@@ -398,7 +396,7 @@ public class CQLSSTableWriter implements Closeable
 
         public Builder withType(String typeDefinition) throws SyntaxException
         {
-            typeStatements.add(QueryProcessor.parseStatement(typeDefinition, 
CreateTypeStatement.class, "CREATE TYPE"));
+            typeStatements.add(QueryProcessor.parseStatement(typeDefinition, 
CreateTypeStatement.Raw.class, "CREATE TYPE"));
             return this;
         }
 
@@ -418,7 +416,7 @@ public class CQLSSTableWriter implements Closeable
          */
         public Builder forTable(String schema)
         {
-            this.schemaStatement = QueryProcessor.parseStatement(schema, 
CreateTableStatement.RawStatement.class, "CREATE TABLE");
+            this.schemaStatement = QueryProcessor.parseStatement(schema, 
CreateTableStatement.Raw.class, "CREATE TABLE");
             return this;
         }
 
@@ -531,10 +529,9 @@ public class CQLSSTableWriter implements Closeable
                                                                  
Functions.none()));
                 }
 
-
                 KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
 
-                TableMetadata tableMetadata = 
ksm.tables.getNullable(schemaStatement.columnFamily());
+                TableMetadata tableMetadata = 
ksm.tables.getNullable(schemaStatement.table());
                 if (tableMetadata == null)
                 {
                     Types types = createTypes(keyspaceName);
@@ -542,24 +539,24 @@ public class CQLSSTableWriter implements Closeable
                     
Schema.instance.load(ksm.withSwapped(ksm.tables.with(tableMetadata)).withSwapped(types));
                 }
 
-                Pair<UpdateStatement, List<ColumnSpecification>> 
preparedInsert = prepareInsert();
+                UpdateStatement preparedInsert = prepareInsert();
 
                 TableMetadataRef ref = 
TableMetadataRef.forOfflineTools(tableMetadata);
                 AbstractSSTableSimpleWriter writer = sorted
-                                                     ? new 
SSTableSimpleWriter(directory, ref, preparedInsert.left.updatedColumns())
-                                                     : new 
SSTableSimpleUnsortedWriter(directory, ref, 
preparedInsert.left.updatedColumns(), bufferSizeInMB);
+                                                   ? new 
SSTableSimpleWriter(directory, ref, preparedInsert.updatedColumns())
+                                                   : new 
SSTableSimpleUnsortedWriter(directory, ref, preparedInsert.updatedColumns(), 
bufferSizeInMB);
 
                 if (formatType != null)
                     writer.setSSTableFormatType(formatType);
 
-                return new CQLSSTableWriter(writer, preparedInsert.left, 
preparedInsert.right);
+                return new CQLSSTableWriter(writer, preparedInsert, 
preparedInsert.getBindVariables());
             }
         }
 
         private Types createTypes(String keyspace)
         {
             Types.RawBuilder builder = Types.rawBuilder(keyspace);
-            for (CreateTypeStatement st : typeStatements)
+            for (CreateTypeStatement.Raw st : typeStatements)
                 st.addToRawBuilder(builder);
             return builder.build();
         }
@@ -571,10 +568,11 @@ public class CQLSSTableWriter implements Closeable
          */
         private TableMetadata createTable(Types types)
         {
-            CreateTableStatement statement = (CreateTableStatement) 
schemaStatement.prepare(types).statement;
+            ClientState state = ClientState.forInternalCalls();
+            CreateTableStatement statement = schemaStatement.prepare(state);
             statement.validate(ClientState.forInternalCalls());
 
-            TableMetadata.Builder builder = statement.builder();
+            TableMetadata.Builder builder = statement.builder(types);
             if (partitioner != null)
                 builder.partitioner(partitioner);
 
@@ -586,20 +584,20 @@ public class CQLSSTableWriter implements Closeable
          *
          * @return prepared Insert statement and it's bound names
          */
-        private Pair<UpdateStatement, List<ColumnSpecification>> 
prepareInsert()
+        private UpdateStatement prepareInsert()
         {
-            ParsedStatement.Prepared cqlStatement = insertStatement.prepare();
-            UpdateStatement insert = (UpdateStatement) cqlStatement.statement;
-            insert.validate(ClientState.forInternalCalls());
+            ClientState state = ClientState.forInternalCalls();
+            UpdateStatement insert = (UpdateStatement) 
insertStatement.prepare(state);
+            insert.validate(state);
 
             if (insert.hasConditions())
                 throw new IllegalArgumentException("Conditional statements are 
not supported");
             if (insert.isCounter())
                 throw new IllegalArgumentException("Counter update statements 
are not supported");
-            if (cqlStatement.boundNames.isEmpty())
+            if (insert.getBindVariables().isEmpty())
                 throw new IllegalArgumentException("Provided insert statement 
has no bind variables");
 
-            return Pair.create(insert, cqlStatement.boundNames);
+            return insert;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 4097715..90c0146 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -715,7 +716,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
 
                 String format = "select event_id, source, source_port, 
activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
                 String query = String.format(format, 
SchemaConstants.TRACE_KEYSPACE_NAME, TraceKeyspace.EVENTS);
-                SelectStatement statement = (SelectStatement) 
QueryProcessor.parseStatement(query).prepare().statement;
+                SelectStatement statement = (SelectStatement) 
QueryProcessor.parseStatement(query).prepare(ClientState.forInternalCalls());
 
                 ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
                 InetAddressAndPort source = 
FBUtilities.getBroadcastAddressAndPort();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java 
b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 19d83db..a85a1e5 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/ColumnMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/ColumnMetadata.java 
b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
index 0380b35..b6e743b 100644
--- a/src/java/org/apache/cassandra/schema/ColumnMetadata.java
+++ b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
@@ -23,7 +23,6 @@ import java.util.function.Predicate;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
 import com.google.common.collect.Collections2;
 
 import org.apache.cassandra.cql3.*;
@@ -263,12 +262,29 @@ public final class ColumnMetadata extends 
ColumnSpecification implements Selecta
 
         ColumnMetadata cd = (ColumnMetadata) o;
 
-        return Objects.equal(ksName, cd.ksName)
-            && Objects.equal(cfName, cd.cfName)
-            && Objects.equal(name, cd.name)
-            && Objects.equal(type, cd.type)
-            && Objects.equal(kind, cd.kind)
-            && Objects.equal(position, cd.position);
+        return equalsWithoutType(cd) && type.equals(cd.type);
+    }
+
+    private boolean equalsWithoutType(ColumnMetadata other)
+    {
+        return name.equals(other.name)
+            && kind == other.kind
+            && position == other.position
+            && ksName.equals(other.ksName)
+            && cfName.equals(other.cfName);
+    }
+
+    Optional<Difference> compare(ColumnMetadata other)
+    {
+        if (!equalsWithoutType(other))
+            return Optional.of(Difference.SHALLOW);
+
+        if (type.equals(other.type))
+            return Optional.empty();
+
+        return 
type.asCQL3Type().toString().equals(other.type.asCQL3Type().toString())
+             ? Optional.of(Difference.DEEP)
+             : Optional.of(Difference.SHALLOW);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/CompressionParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java 
b/src/java/org/apache/cassandra/schema/CompressionParams.java
index b96334b..d644c56 100644
--- a/src/java/org/apache/cassandra/schema/CompressionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -84,6 +85,7 @@ public final class CompressionParams
     private final double minCompressRatio;  // In configuration we store min 
ratio, the input parameter.
     private final ImmutableMap<String, String> otherOptions; // Unrecognized 
options, can be used by the compressor
 
+    // TODO: deprecated, should now be carefully removed. Doesn't affect 
schema code as it isn't included in equals() and hashCode()
     private volatile double crcCheckChance = 1.0;
 
     public static CompressionParams fromMap(Map<String, String> opts)
@@ -548,20 +550,17 @@ public final class CompressionParams
     public boolean equals(Object obj)
     {
         if (obj == this)
-        {
             return true;
-        }
-        else if (obj == null || obj.getClass() != getClass())
-        {
+
+        if (!(obj instanceof CompressionParams))
             return false;
-        }
 
         CompressionParams cp = (CompressionParams) obj;
-        return new EqualsBuilder()
-            .append(sstableCompressor, cp.sstableCompressor)
-            .append(chunkLength(), cp.chunkLength())
-            .append(otherOptions, cp.otherOptions)
-            .isEquals();
+
+        return Objects.equal(sstableCompressor, cp.sstableCompressor)
+            && chunkLength == cp.chunkLength
+            && otherOptions.equals(cp.otherOptions)
+            && minCompressRatio == cp.minCompressRatio;
     }
 
     @Override
@@ -569,8 +568,9 @@ public final class CompressionParams
     {
         return new HashCodeBuilder(29, 1597)
             .append(sstableCompressor)
-            .append(chunkLength())
+            .append(chunkLength)
             .append(otherOptions)
+            .append(minCompressRatio)
             .toHashCode();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Diff.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Diff.java 
b/src/java/org/apache/cassandra/schema/Diff.java
new file mode 100644
index 0000000..36c0687
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/Diff.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.Iterables;
+
+public class Diff<T extends Iterable, S>
+{
+    public final T created;
+    public final T dropped;
+    public final ImmutableCollection<Altered<S>> altered;
+
+    Diff(T created, T dropped, ImmutableCollection<Altered<S>> altered)
+    {
+        this.created = created;
+        this.dropped = dropped;
+        this.altered = altered;
+    }
+
+    boolean isEmpty()
+    {
+        return Iterables.isEmpty(created) && Iterables.isEmpty(dropped) && 
Iterables.isEmpty(altered);
+    }
+
+    Iterable<Altered<S>> altered(Difference kind)
+    {
+        return Iterables.filter(altered, a -> a.kind == kind);
+    }
+
+    public static final class Altered<T>
+    {
+        public final T before;
+        public final T after;
+        public final Difference kind;
+
+        Altered(T before, T after, Difference kind)
+        {
+            this.before = before;
+            this.after = after;
+            this.kind = kind;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Difference.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Difference.java 
b/src/java/org/apache/cassandra/schema/Difference.java
new file mode 100644
index 0000000..4f1aea9
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/Difference.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+public enum Difference
+{
+    /**
+     * Two schema objects are considered to differ DEEP-ly if one or more of 
their nested schema objects differ.
+     *
+     * For example, if a table T has a column c of type U, where U is a user 
defined type, then upon altering U table
+     * T0 (before alter) will differ DEEP-ly from table T1 (after alter).
+     */
+    DEEP,
+
+    /**
+     *
+     * Two schema objects are considered to differ DEEP-ly if their direct 
structure is altered.
+     *
+     * For example, if a table T is altered to add a new column, a different 
compaction strategy, or a new description,
+     * then it will differ SHALLOW-ly from the original.
+     */
+    SHALLOW
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Functions.java 
b/src/java/org/apache/cassandra/schema/Functions.java
index 8e3a3f1..2a0111c 100644
--- a/src/java/org/apache/cassandra/schema/Functions.java
+++ b/src/java/org/apache/cassandra/schema/Functions.java
@@ -17,19 +17,20 @@
  */
 package org.apache.cassandra.schema;
 
+import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.stream.Collectors;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
+import com.google.common.collect.*;
 
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.db.marshal.UserType;
 
-import static com.google.common.collect.Iterables.filter;
+import static java.util.stream.Collectors.toList;
+
+import static com.google.common.collect.Iterables.any;
 
 /**
  * An immutable container for a keyspace's UDAs and UDFs (and, in case of 
{@link org.apache.cassandra.db.SystemKeyspace},
@@ -37,6 +38,21 @@ import static com.google.common.collect.Iterables.filter;
  */
 public final class Functions implements Iterable<Function>
 {
+    public enum Filter implements Predicate<Function>
+    {
+        ALL, UDF, UDA;
+
+        public boolean test(Function function)
+        {
+            switch (this)
+            {
+                case UDF: return function instanceof UDFunction;
+                case UDA: return function instanceof UDAggregate;
+                default:  return true;
+            }
+        }
+    }
+
     private final ImmutableMultimap<FunctionName, Function> functions;
 
     private Functions(Builder builder)
@@ -69,12 +85,17 @@ public final class Functions implements Iterable<Function>
         return functions.values().stream();
     }
 
+    public int size()
+    {
+        return functions.size();
+    }
+
     /**
      * @return a stream of keyspace's UDFs
      */
     public Stream<UDFunction> udfs()
     {
-        return stream().filter(f -> f instanceof UDFunction).map(f -> 
(UDFunction) f);
+        return stream().filter(Filter.UDF).map(f -> (UDFunction) f);
     }
 
     /**
@@ -82,38 +103,32 @@ public final class Functions implements Iterable<Function>
      */
     public Stream<UDAggregate> udas()
     {
-        return stream().filter(f -> f instanceof UDAggregate).map(f -> 
(UDAggregate) f);
+        return stream().filter(Filter.UDA).map(f -> (UDAggregate) f);
     }
 
-    MapDifference<Pair<FunctionName, List<String>>, UDFunction> 
udfsDiff(Functions other)
+    public Iterable<Function> referencingUserType(ByteBuffer name)
     {
-        Map<Pair<FunctionName, List<String>>, UDFunction> before = new 
HashMap<>();
-        udfs().forEach(f -> before.put(Pair.create(f.name(), 
f.argumentsList()), f));
-
-        Map<Pair<FunctionName, List<String>>, UDFunction> after = new 
HashMap<>();
-        other.udfs().forEach(f -> after.put(Pair.create(f.name(), 
f.argumentsList()), f));
-
-        return Maps.difference(before, after);
+        return Iterables.filter(this, f -> f.referencesUserType(name));
     }
 
-    MapDifference<Pair<FunctionName, List<String>>, UDAggregate> 
udasDiff(Functions other)
+    public Functions withUpdatedUserType(UserType udt)
     {
-        Map<Pair<FunctionName, List<String>>, UDAggregate> before = new 
HashMap<>();
-        udas().forEach(f -> before.put(Pair.create(f.name(), 
f.argumentsList()), f));
+        if (!any(this, f -> f.referencesUserType(udt.name)))
+            return this;
 
-        Map<Pair<FunctionName, List<String>>, UDAggregate> after = new 
HashMap<>();
-        other.udas().forEach(f -> after.put(Pair.create(f.name(), 
f.argumentsList()), f));
+        Collection<UDFunction>  udfs = udfs().map(f -> 
f.withUpdatedUserType(udt)).collect(toList());
+        Collection<UDAggregate> udas = udas().map(f -> 
f.withUpdatedUserType(udfs, udt)).collect(toList());
 
-        return Maps.difference(before, after);
+        return builder().add(udfs).add(udas).build();
     }
 
     /**
-     * @return a collection of aggregates that use the provided function as 
either a state or a final function
+     * @return a stream of aggregates that use the provided function as either 
a state or a final function
      * @param function the referree function
      */
-    public Collection<UDAggregate> aggregatesUsingFunction(Function function)
+    public Stream<UDAggregate> aggregatesUsingFunction(Function function)
     {
-        return udas().filter(uda -> 
uda.hasReferenceTo(function)).collect(Collectors.toList());
+        return udas().filter(uda -> uda.hasReferenceTo(function));
     }
 
     /**
@@ -127,6 +142,11 @@ public final class Functions implements Iterable<Function>
         return functions.get(name);
     }
 
+    public Optional<Function> find(FunctionName name, List<AbstractType<?>> 
argTypes)
+    {
+        return find(name, argTypes, Filter.ALL);
+    }
+
     /**
      * Find the function with the specified name
      *
@@ -134,13 +154,18 @@ public final class Functions implements Iterable<Function>
      * @param argTypes function argument types
      * @return an empty {@link Optional} if the function name is not found; a 
non-empty optional of {@link Function} otherwise
      */
-    public Optional<Function> find(FunctionName name, List<AbstractType<?>> 
argTypes)
+    public Optional<Function> find(FunctionName name, List<AbstractType<?>> 
argTypes, Filter filter)
     {
         return get(name).stream()
-                        .filter(fun -> typesMatch(fun.argTypes(), argTypes))
+                        .filter(filter.and(fun -> typesMatch(fun.argTypes(), 
argTypes)))
                         .findAny();
     }
 
+    public boolean isEmpty()
+    {
+        return functions.isEmpty();
+    }
+
     /*
      * We need to compare the CQL3 representation of the type because comparing
      * the AbstractType will fail for example if a UDT has been changed.
@@ -154,7 +179,7 @@ public final class Functions implements Iterable<Function>
      * or
      *    ALTER TYPE foo RENAME ...
      */
-    public static boolean typesMatch(AbstractType<?> t1, AbstractType<?> t2)
+    private static boolean typesMatch(AbstractType<?> t1, AbstractType<?> t2)
     {
         return 
t1.freeze().asCQL3Type().toString().equals(t2.freeze().asCQL3Type().toString());
     }
@@ -184,6 +209,13 @@ public final class Functions implements Iterable<Function>
         return h;
     }
 
+    public Functions filter(Predicate<Function> predicate)
+    {
+        Builder builder = builder();
+        stream().filter(predicate).forEach(builder::add);
+        return builder.build();
+    }
+
     /**
      * Create a Functions instance with the provided function added
      */
@@ -203,7 +235,19 @@ public final class Functions implements Iterable<Function>
         Function fun =
             find(name, argTypes).orElseThrow(() -> new 
IllegalStateException(String.format("Function %s doesn't exists", name)));
 
-        return builder().add(filter(this, f -> f != fun)).build();
+        return without(fun);
+    }
+
+    public Functions without(Function function)
+    {
+        return builder().add(Iterables.filter(this, f -> f != 
function)).build();
+    }
+
+    public Functions withAddedOrUpdated(Function function)
+    {
+        return builder().add(Iterables.filter(this, f -> 
!(f.name().equals(function.name()) && Functions.typesMatch(f.argTypes(), 
function.argTypes()))))
+                        .add(function)
+                        .build();
     }
 
     @Override
@@ -252,10 +296,52 @@ public final class Functions implements Iterable<Function>
             return this;
         }
 
-        public  Builder add(Iterable<? extends Function> funs)
+        public Builder add(Iterable<? extends Function> funs)
         {
             funs.forEach(this::add);
             return this;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    static FunctionsDiff<UDFunction> udfsDiff(Functions before, Functions 
after)
+    {
+        return (FunctionsDiff<UDFunction>) FunctionsDiff.diff(before, after, 
Filter.UDF);
+    }
+
+    @SuppressWarnings("unchecked")
+    static FunctionsDiff<UDAggregate> udasDiff(Functions before, Functions 
after)
+    {
+        return (FunctionsDiff<UDAggregate>) FunctionsDiff.diff(before, after, 
Filter.UDA);
+    }
+
+    public static final class FunctionsDiff<T extends Function> extends 
Diff<Functions, T>
+    {
+        static final FunctionsDiff NONE = new 
FunctionsDiff<>(Functions.none(), Functions.none(), ImmutableList.of());
+
+        private FunctionsDiff(Functions created, Functions dropped, 
ImmutableCollection<Altered<T>> altered)
+        {
+            super(created, dropped, altered);
+        }
+
+        private static FunctionsDiff diff(Functions before, Functions after, 
Filter filter)
+        {
+            if (before == after)
+                return NONE;
+
+            Functions created = after.filter(filter.and(k -> 
!before.find(k.name(), k.argTypes(), filter).isPresent()));
+            Functions dropped = before.filter(filter.and(k -> 
!after.find(k.name(), k.argTypes(), filter).isPresent()));
+
+            ImmutableList.Builder<Altered<Function>> altered = 
ImmutableList.builder();
+            before.stream().filter(filter).forEach(functionBefore ->
+            {
+                after.find(functionBefore.name(), functionBefore.argTypes(), 
filter).ifPresent(functionAfter ->
+                {
+                    functionBefore.compare(functionAfter).ifPresent(kind -> 
altered.add(new Altered<>(functionBefore, functionAfter, kind)));
+                });
+            });
+
+            return new FunctionsDiff<>(created, dropped, altered.build());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java 
b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 190871a..3020793 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.UnknownIndexException;
 import org.apache.cassandra.index.Index;
@@ -98,12 +98,14 @@ public final class IndexMetadata
         return name != null && !name.isEmpty() && 
PATTERN_WORD_CHARS.matcher(name).matches();
     }
 
-    public static String getDefaultIndexName(String cfName, String root)
+    public static String generateDefaultIndexName(String table, 
ColumnIdentifier column)
     {
-        if (root == null)
-            return PATTERN_NON_WORD_CHAR.matcher(cfName + "_" + 
"idx").replaceAll("");
-        else
-            return PATTERN_NON_WORD_CHAR.matcher(cfName + "_" + root + 
"_idx").replaceAll("");
+        return PATTERN_NON_WORD_CHAR.matcher(table + "_" + column.toString() + 
"_idx").replaceAll("");
+    }
+
+    public static String generateDefaultIndexName(String table)
+    {
+        return PATTERN_NON_WORD_CHAR.matcher(table + "_" + 
"idx").replaceAll("");
     }
 
     public void validate(TableMetadata table)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java 
b/src/java/org/apache/cassandra/schema/Indexes.java
index 6122197..a83be4b 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -23,8 +23,6 @@ import java.util.stream.Stream;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 
-import org.apache.cassandra.exceptions.ConfigurationException;
-
 import static java.lang.String.format;
 
 import static com.google.common.collect.Iterables.filter;
@@ -169,20 +167,6 @@ public final class Indexes implements 
Iterable<IndexMetadata>
 
     public void validate(TableMetadata table)
     {
-        /*
-         * Index name check is duplicated in Keyspaces, for the time being.
-         * The reason for this is that schema altering statements are not 
calling
-         * Keyspaces.validate() as of yet. TODO: remove this once they do (on 
CASSANDRA-9425 completion)
-         */
-        Set<String> indexNames = 
Sets.newHashSetWithExpectedSize(indexesByName.size());
-        for (IndexMetadata index : indexesByName.values())
-        {
-            if (indexNames.contains(index.name))
-                throw new ConfigurationException(format("Duplicate index name 
%s for table %s", index.name, table));
-
-            indexNames.add(index.name);
-        }
-
         indexesByName.values().forEach(i -> i.validate(table));
     }
 
@@ -198,20 +182,6 @@ public final class Indexes implements 
Iterable<IndexMetadata>
         return indexesByName.values().toString();
     }
 
-    public static String getAvailableIndexName(String ksName, String cfName, 
String indexNameRoot)
-    {
-
-        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
-        Set<String> existingNames = ksm == null ? new HashSet<>() : 
ksm.existingIndexNames(null);
-        String baseName = IndexMetadata.getDefaultIndexName(cfName, 
indexNameRoot);
-        String acceptedName = baseName;
-        int i = 0;
-        while (existingNames.contains(acceptedName))
-            acceptedName = baseName + '_' + (++i);
-
-        return acceptedName;
-    }
-
     public static final class Builder
     {
         final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new 
ImmutableMap.Builder<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java 
b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 5a72d2c..aacd962 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -27,10 +27,22 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.schema.Functions.FunctionsDiff;
+import org.apache.cassandra.schema.Tables.TablesDiff;
+import org.apache.cassandra.schema.Types.TypesDiff;
+import org.apache.cassandra.schema.Views.ViewsDiff;
+import org.apache.cassandra.service.StorageService;
 
 import static java.lang.String.format;
 
+import static com.google.common.collect.Iterables.any;
+
 /**
  * An immutable representation of keyspace metadata (name, params, tables, 
types, and functions).
  */
@@ -110,9 +122,24 @@ public final class KeyspaceMetadata
         return kind == Kind.VIRTUAL;
     }
 
+    /**
+     * Returns a new KeyspaceMetadata with all instances of old UDT replaced 
with the updated version.
+     * Replaces all instances in tables, views, types, and functions.
+     */
+    public KeyspaceMetadata withUpdatedUserType(UserType udt)
+    {
+        return new KeyspaceMetadata(name,
+                                    kind,
+                                    params,
+                                    tables.withUpdatedUserType(udt),
+                                    views.withUpdatedUserTypes(udt),
+                                    types.withUpdatedUserType(udt),
+                                    functions.withUpdatedUserType(udt));
+    }
+
     public Iterable<TableMetadata> tablesAndViews()
     {
-        return Iterables.concat(tables, views.metadatas());
+        return Iterables.concat(tables, views.allTableMetadata());
     }
 
     @Nullable
@@ -124,14 +151,34 @@ public final class KeyspaceMetadata
              : view.metadata;
     }
 
-    public Set<String> existingIndexNames(String cfToExclude)
+    public boolean hasTable(String tableName)
     {
-        Set<String> indexNames = new HashSet<>();
-        for (TableMetadata table : tables)
-            if (cfToExclude == null || !table.name.equals(cfToExclude))
-                for (IndexMetadata index : table.indexes)
-                    indexNames.add(index.name);
-        return indexNames;
+        return tables.get(tableName).isPresent();
+    }
+
+    public boolean hasView(String viewName)
+    {
+        return views.get(viewName).isPresent();
+    }
+
+    public boolean hasIndex(String indexName)
+    {
+        return any(tables, t -> t.indexes.has(indexName));
+    }
+
+    public String findAvailableIndexName(String baseName)
+    {
+        if (!hasIndex(baseName))
+            return baseName;
+
+        int i = 1;
+        do
+        {
+            String name = baseName + '_' + i++;
+            if (!hasIndex(name))
+                return name;
+        }
+        while (true);
     }
 
     public Optional<TableMetadata> findIndexedTable(String indexName)
@@ -209,4 +256,77 @@ public final class KeyspaceMetadata
             }
         }
     }
+
+    public AbstractReplicationStrategy createReplicationStrategy()
+    {
+        return AbstractReplicationStrategy.createReplicationStrategy(name,
+                                                                     
params.replication.klass,
+                                                                     
StorageService.instance.getTokenMetadata(),
+                                                                     
DatabaseDescriptor.getEndpointSnitch(),
+                                                                     
params.replication.options);
+    }
+
+    static Optional<KeyspaceDiff> diff(KeyspaceMetadata before, 
KeyspaceMetadata after)
+    {
+        return KeyspaceDiff.diff(before, after);
+    }
+
+    public static final class KeyspaceDiff
+    {
+        public final KeyspaceMetadata before;
+        public final KeyspaceMetadata after;
+
+        public final TablesDiff tables;
+        public final ViewsDiff views;
+        public final TypesDiff types;
+
+        public final FunctionsDiff<UDFunction> udfs;
+        public final FunctionsDiff<UDAggregate> udas;
+
+        private KeyspaceDiff(KeyspaceMetadata before,
+                             KeyspaceMetadata after,
+                             TablesDiff tables,
+                             ViewsDiff views,
+                             TypesDiff types,
+                             FunctionsDiff<UDFunction> udfs,
+                             FunctionsDiff<UDAggregate> udas)
+        {
+            this.before = before;
+            this.after = after;
+            this.tables = tables;
+            this.views = views;
+            this.types = types;
+            this.udfs = udfs;
+            this.udas = udas;
+        }
+
+        private static Optional<KeyspaceDiff> diff(KeyspaceMetadata before, 
KeyspaceMetadata after)
+        {
+            if (before == after)
+                return Optional.empty();
+
+            if (!before.name.equals(after.name))
+            {
+                String msg = String.format("Attempting to diff two keyspaces 
with different names ('%s' and '%s')", before.name, after.name);
+                throw new IllegalArgumentException(msg);
+            }
+
+            TablesDiff tables = Tables.diff(before.tables, after.tables);
+            ViewsDiff views = Views.diff(before.views, after.views);
+            TypesDiff types = Types.diff(before.types, after.types);
+
+            @SuppressWarnings("unchecked") FunctionsDiff<UDFunction>  udfs = 
FunctionsDiff.NONE;
+            @SuppressWarnings("unchecked") FunctionsDiff<UDAggregate> udas = 
FunctionsDiff.NONE;
+            if (before.functions != after.functions)
+            {
+                udfs = Functions.udfsDiff(before.functions, after.functions);
+                udas = Functions.udasDiff(before.functions, after.functions);
+            }
+
+            if (before.params.equals(after.params) && tables.isEmpty() && 
views.isEmpty() && types.isEmpty() && udfs.isEmpty() && udas.isEmpty())
+                return Optional.empty();
+
+            return Optional.of(new KeyspaceDiff(before, after, tables, views, 
types, udfs, udas));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/KeyspaceParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceParams.java 
b/src/java/org/apache/cassandra/schema/KeyspaceParams.java
index 1deaa29..68ac5e4 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceParams.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceParams.java
@@ -31,8 +31,8 @@ public final class KeyspaceParams
     public static final boolean DEFAULT_DURABLE_WRITES = true;
 
     /**
-     * This determines durable writes for the {@link 
org.apache.cassandra.config.SchemaConstants#SCHEMA_KEYSPACE_NAME}
-     * and {@link 
org.apache.cassandra.config.SchemaConstants#SYSTEM_KEYSPACE_NAME} keyspaces,
+     * This determines durable writes for the {@link 
org.apache.cassandra.schema.SchemaConstants#SCHEMA_KEYSPACE_NAME}
+     * and {@link 
org.apache.cassandra.schema.SchemaConstants#SYSTEM_KEYSPACE_NAME} keyspaces,
      * the only reason it is not final is for commitlog unit tests. It should 
only be changed for testing purposes.
      */
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Keyspaces.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Keyspaces.java 
b/src/java/org/apache/cassandra/schema/Keyspaces.java
index 1692f88..1938d93 100644
--- a/src/java/org/apache/cassandra/schema/Keyspaces.java
+++ b/src/java/org/apache/cassandra/schema/Keyspaces.java
@@ -18,18 +18,21 @@
 package org.apache.cassandra.schema;
 
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 import javax.annotation.Nullable;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
 
 public final class Keyspaces implements Iterable<KeyspaceMetadata>
 {
+    private static final Keyspaces NONE = builder().build();
+
     private final ImmutableMap<String, KeyspaceMetadata> keyspaces;
     private final ImmutableMap<TableId, TableMetadata> tables;
 
@@ -46,7 +49,7 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
 
     public static Keyspaces none()
     {
-        return builder().build();
+        return NONE;
     }
 
     public static Keyspaces of(KeyspaceMetadata... keyspaces)
@@ -69,18 +72,39 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
         return keyspaces.keySet();
     }
 
+    /**
+     * Get the keyspace with the specified name
+     *
+     * @param name a non-qualified keyspace name
+     * @return an empty {@link Optional} if the table name is not found; a 
non-empty optional of {@link KeyspaceMetadata} otherwise
+     */
+    public Optional<KeyspaceMetadata> get(String name)
+    {
+        return Optional.ofNullable(keyspaces.get(name));
+    }
+
     @Nullable
     public KeyspaceMetadata getNullable(String name)
     {
         return keyspaces.get(name);
     }
 
+    public boolean containsKeyspace(String name)
+    {
+        return keyspaces.containsKey(name);
+    }
+
     @Nullable
     public TableMetadata getTableOrViewNullable(TableId id)
     {
         return tables.get(id);
     }
 
+    public boolean isEmpty()
+    {
+        return keyspaces.isEmpty();
+    }
+
     public Keyspaces filter(Predicate<KeyspaceMetadata> predicate)
     {
         Builder builder = builder();
@@ -97,19 +121,19 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
         if (keyspace == null)
             throw new IllegalStateException(String.format("Keyspace %s doesn't 
exists", name));
 
-        return builder().add(filter(k -> k != keyspace)).build();
+        return filter(k -> k != keyspace);
     }
 
     public Keyspaces withAddedOrUpdated(KeyspaceMetadata keyspace)
     {
-        return builder().add(filter(k -> !k.name.equals(keyspace.name)))
+        return builder().add(Iterables.filter(this, k -> 
!k.name.equals(keyspace.name)))
                         .add(keyspace)
                         .build();
     }
 
-    MapDifference<String, KeyspaceMetadata> diff(Keyspaces other)
+    public void validate()
     {
-        return Maps.difference(keyspaces, other.keyspaces);
+        keyspaces.values().forEach(KeyspaceMetadata::validate);
     }
 
     @Override
@@ -167,4 +191,49 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
             return this;
         }
     }
+
+    static KeyspacesDiff diff(Keyspaces before, Keyspaces after)
+    {
+        return KeyspacesDiff.diff(before, after);
+    }
+
+    public static final class KeyspacesDiff
+    {
+        static final KeyspacesDiff NONE = new KeyspacesDiff(Keyspaces.none(), 
Keyspaces.none(), ImmutableList.of());
+
+        public final Keyspaces created;
+        public final Keyspaces dropped;
+        public final ImmutableList<KeyspaceDiff> altered;
+
+        private KeyspacesDiff(Keyspaces created, Keyspaces dropped, 
ImmutableList<KeyspaceDiff> altered)
+        {
+            this.created = created;
+            this.dropped = dropped;
+            this.altered = altered;
+        }
+
+        private static KeyspacesDiff diff(Keyspaces before, Keyspaces after)
+        {
+            if (before == after)
+                return NONE;
+
+            Keyspaces created = after.filter(k -> 
!before.containsKeyspace(k.name));
+            Keyspaces dropped = before.filter(k -> 
!after.containsKeyspace(k.name));
+
+            ImmutableList.Builder<KeyspaceDiff> altered = 
ImmutableList.builder();
+            before.forEach(keyspaceBefore ->
+            {
+                KeyspaceMetadata keyspaceAfter = 
after.getNullable(keyspaceBefore.name);
+                if (null != keyspaceAfter)
+                    KeyspaceMetadata.diff(keyspaceBefore, 
keyspaceAfter).ifPresent(altered::add);
+            });
+
+            return new KeyspacesDiff(created, dropped, altered.build());
+        }
+
+        public boolean isEmpty()
+        {
+            return created.isEmpty() && dropped.isEmpty() && altered.isEmpty();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java 
b/src/java/org/apache/cassandra/schema/MigrationManager.java
index c8881e5..ac95054 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -23,15 +23,14 @@ import java.util.concurrent.*;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 
+import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.cql3.functions.UDAggregate;
-import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.*;
@@ -41,9 +40,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
 
 public class MigrationManager
 {
@@ -150,6 +148,14 @@ public class MigrationManager
                 && !Gossiper.instance.isGossipOnlyMember(endpoint);
     }
 
+    private static boolean shouldPushSchemaTo(InetAddressAndPort endpoint)
+    {
+        // only push schema to nodes with known and equal versions
+        return !endpoint.equals(FBUtilities.getBroadcastAddressAndPort())
+               && MessagingService.instance().knowsVersion(endpoint)
+               && MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version;
+    }
+
     public static boolean isReadyForBootstrap()
     {
         return MigrationTask.getInflightTasks().isEmpty();
@@ -194,21 +200,16 @@ public class MigrationManager
         announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), 
announceLocally);
     }
 
-    public static void announceNewTable(TableMetadata cfm) throws 
ConfigurationException
-    {
-        announceNewTable(cfm, false);
-    }
-
-    public static void announceNewTable(TableMetadata cfm, boolean 
announceLocally)
+    public static void announceNewTable(TableMetadata cfm)
     {
-        announceNewTable(cfm, announceLocally, true);
+        announceNewTable(cfm, true, FBUtilities.timestampMicros());
     }
 
     /**
      * Announces the table even if the definition is already know locally.
      * This should generally be avoided but is used internally when we want to 
force the most up to date version of
      * a system table schema (Note that we don't know if the schema we force 
_is_ the most recent version or not, we
-     * just rely on idempotency to basically ignore that announce if it's not. 
That's why we can't use announceUpdateColumnFamily,
+     * just rely on idempotency to basically ignore that announce if it's not. 
That's why we can't use announceTableUpdate
      * it would for instance delete new columns if this is not called with the 
most up-to-date version)
      *
      * Note that this is only safe for system tables where we know the id is 
fixed and will be the same whatever version
@@ -216,15 +217,10 @@ public class MigrationManager
      */
     public static void forceAnnounceNewTable(TableMetadata cfm)
     {
-        announceNewTable(cfm, false, false, 0);
-    }
-
-    private static void announceNewTable(TableMetadata cfm, boolean 
announceLocally, boolean throwOnDuplicate)
-    {
-        announceNewTable(cfm, announceLocally, throwOnDuplicate, 
FBUtilities.timestampMicros());
+        announceNewTable(cfm, false, 0);
     }
 
-    private static void announceNewTable(TableMetadata cfm, boolean 
announceLocally, boolean throwOnDuplicate, long timestamp)
+    private static void announceNewTable(TableMetadata cfm, boolean 
throwOnDuplicate, long timestamp)
     {
         cfm.validate();
 
@@ -236,49 +232,10 @@ public class MigrationManager
             throw new AlreadyExistsException(cfm.keyspace, cfm.name);
 
         logger.info("Create new table: {}", cfm);
-        announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, timestamp), 
announceLocally);
-    }
-
-    public static void announceNewView(ViewMetadata view, boolean 
announceLocally) throws ConfigurationException
-    {
-        view.metadata.validate();
-
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(view.keyspace);
-        if (ksm == null)
-            throw new ConfigurationException(String.format("Cannot add table 
'%s' to non existing keyspace '%s'.", view.name, view.keyspace));
-        else if (ksm.getTableOrViewNullable(view.name) != null)
-            throw new AlreadyExistsException(view.keyspace, view.name);
-
-        logger.info("Create new view: {}", view);
-        announce(SchemaKeyspace.makeCreateViewMutation(ksm, view, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceNewType(UserType newType, boolean 
announceLocally)
-    {
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(newType.keyspace);
-        announce(SchemaKeyspace.makeCreateTypeMutation(ksm, newType, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceNewFunction(UDFunction udf, boolean 
announceLocally)
-    {
-        logger.info("Create scalar function '{}'", udf.name());
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
-        announce(SchemaKeyspace.makeCreateFunctionMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceNewAggregate(UDAggregate udf, boolean 
announceLocally)
-    {
-        logger.info("Create aggregate function '{}'", udf.name());
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
-        announce(SchemaKeyspace.makeCreateAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
+        announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, timestamp), 
false);
     }
 
-    public static void announceKeyspaceUpdate(KeyspaceMetadata ksm) throws 
ConfigurationException
-    {
-        announceKeyspaceUpdate(ksm, false);
-    }
-
-    public static void announceKeyspaceUpdate(KeyspaceMetadata ksm, boolean 
announceLocally) throws ConfigurationException
+    static void announceKeyspaceUpdate(KeyspaceMetadata ksm)
     {
         ksm.validate();
 
@@ -287,20 +244,15 @@ public class MigrationManager
             throw new ConfigurationException(String.format("Cannot update non 
existing keyspace '%s'.", ksm.name));
 
         logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, 
ksm);
-        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, 
ksm.params, FBUtilities.timestampMicros()), announceLocally);
+        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, 
ksm.params, FBUtilities.timestampMicros()), false);
     }
 
-    public static void announceTableUpdate(TableMetadata tm) throws 
ConfigurationException
+    public static void announceTableUpdate(TableMetadata tm)
     {
         announceTableUpdate(tm, false);
     }
 
-    public static void announceTableUpdate(TableMetadata updated, boolean 
announceLocally) throws ConfigurationException
-    {
-        announceTableUpdate(updated, null, announceLocally);
-    }
-
-    public static void announceTableUpdate(TableMetadata updated, 
Collection<ViewMetadata> views, boolean announceLocally) throws 
ConfigurationException
+    public static void announceTableUpdate(TableMetadata updated, boolean 
announceLocally)
     {
         updated.validate();
 
@@ -309,69 +261,27 @@ public class MigrationManager
             throw new ConfigurationException(String.format("Cannot update non 
existing table '%s' in keyspace '%s'.", updated.name, updated.keyspace));
         KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(current.keyspace);
 
-        current.validateCompatibility(updated);
+        updated.validateCompatibility(current);
 
         long timestamp = FBUtilities.timestampMicros();
 
         logger.info("Update table '{}/{}' From {} To {}", current.keyspace, 
current.name, current, updated);
         Mutation.SimpleBuilder builder = 
SchemaKeyspace.makeUpdateTableMutation(ksm, current, updated, timestamp);
 
-        if (views != null)
-            views.forEach(view -> addViewUpdateToMutationBuilder(view, 
builder));
-
-        announce(builder, announceLocally);
-    }
-
-    public static void announceViewUpdate(ViewMetadata view, boolean 
announceLocally) throws ConfigurationException
-    {
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(view.keyspace);
-        long timestamp = FBUtilities.timestampMicros();
-        Mutation.SimpleBuilder builder = 
SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, ksm.params, timestamp);
-        addViewUpdateToMutationBuilder(view, builder);
         announce(builder, announceLocally);
     }
 
-    private static void addViewUpdateToMutationBuilder(ViewMetadata view, 
Mutation.SimpleBuilder builder)
-    {
-        view.metadata.validate();
-
-        ViewMetadata oldView = Schema.instance.getView(view.keyspace, 
view.name);
-        if (oldView == null)
-            throw new ConfigurationException(String.format("Cannot update non 
existing materialized view '%s' in keyspace '%s'.", view.name, view.keyspace));
-
-        oldView.metadata.validateCompatibility(view.metadata);
-
-        logger.info("Update view '{}/{}' From {} To {}", view.keyspace, 
view.name, oldView, view);
-        SchemaKeyspace.makeUpdateViewMutation(builder, oldView, view);
-    }
-
-    public static void announceTypeUpdate(UserType updatedType, boolean 
announceLocally)
-    {
-        logger.info("Update type '{}.{}' to {}", updatedType.keyspace, 
updatedType.getNameAsString(), updatedType);
-        announceNewType(updatedType, announceLocally);
-    }
-
-    public static void announceKeyspaceDrop(String ksName) throws 
ConfigurationException
-    {
-        announceKeyspaceDrop(ksName, false);
-    }
-
-    public static void announceKeyspaceDrop(String ksName, boolean 
announceLocally) throws ConfigurationException
+    static void announceKeyspaceDrop(String ksName)
     {
         KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksName);
         if (oldKsm == null)
             throw new ConfigurationException(String.format("Cannot drop non 
existing keyspace '%s'.", ksName));
 
         logger.info("Drop Keyspace '{}'", oldKsm.name);
-        announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceTableDrop(String ksName, String cfName) throws 
ConfigurationException
-    {
-        announceTableDrop(ksName, cfName, false);
+        announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, 
FBUtilities.timestampMicros()), false);
     }
 
-    public static void announceTableDrop(String ksName, String cfName, boolean 
announceLocally) throws ConfigurationException
+    public static void announceTableDrop(String ksName, String cfName, boolean 
announceLocally)
     {
         TableMetadata tm = Schema.instance.getTableMetadata(ksName, cfName);
         if (tm == null)
@@ -382,37 +292,6 @@ public class MigrationManager
         announce(SchemaKeyspace.makeDropTableMutation(ksm, tm, 
FBUtilities.timestampMicros()), announceLocally);
     }
 
-    public static void announceViewDrop(String ksName, String viewName, 
boolean announceLocally) throws ConfigurationException
-    {
-        ViewMetadata view = Schema.instance.getView(ksName, viewName);
-        if (view == null)
-            throw new ConfigurationException(String.format("Cannot drop non 
existing materialized view '%s' in keyspace '%s'.", viewName, ksName));
-        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
-
-        logger.info("Drop table '{}/{}'", view.keyspace, view.name);
-        announce(SchemaKeyspace.makeDropViewMutation(ksm, view, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceTypeDrop(UserType droppedType, boolean 
announceLocally)
-    {
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(droppedType.keyspace);
-        announce(SchemaKeyspace.dropTypeFromSchemaMutation(ksm, droppedType, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceFunctionDrop(UDFunction udf, boolean 
announceLocally)
-    {
-        logger.info("Drop scalar function overload '{}' args '{}'", 
udf.name(), udf.argTypes());
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
-        announce(SchemaKeyspace.makeDropFunctionMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceAggregateDrop(UDAggregate udf, boolean 
announceLocally)
-    {
-        logger.info("Drop aggregate function overload '{}' args '{}'", 
udf.name(), udf.argTypes());
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
-        announce(SchemaKeyspace.makeDropAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
     /**
      * actively announce a new version to active hosts via rpc
      * @param schema The schema mutation to be applied
@@ -424,7 +303,7 @@ public class MigrationManager
         if (announceLocally)
             Schema.instance.merge(mutations);
         else
-            FBUtilities.waitOnFuture(announce(mutations));
+            announce(mutations);
     }
 
     private static void pushSchemaMutation(InetAddressAndPort endpoint, 
Collection<Mutation> schema)
@@ -436,38 +315,36 @@ public class MigrationManager
     }
 
     // Returns a future on the local application of the schema
-    private static Future<?> announce(final Collection<Mutation> schema)
+    private static void announce(Collection<Mutation> schema)
     {
-        Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new 
WrappedRunnable()
-        {
-            protected void runMayThrow() throws ConfigurationException
-            {
-                Schema.instance.mergeAndAnnounceVersion(schema);
-            }
-        });
+        Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(() -> 
Schema.instance.mergeAndAnnounceVersion(schema));
 
         for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
-        {
-            // only push schema to nodes with known and equal versions
-            if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) &&
-                    MessagingService.instance().knowsVersion(endpoint) &&
-                    MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version)
+            if (shouldPushSchemaTo(endpoint))
                 pushSchemaMutation(endpoint, schema);
-        }
 
-        return f;
+        FBUtilities.waitOnFuture(f);
     }
 
-    /**
-     * Announce my version passively over gossip.
-     * Used to notify nodes as they arrive in the cluster.
-     *
-     * @param version The schema version to announce
-     */
-    static void passiveAnnounce(UUID version)
+    public static KeyspacesDiff announce(SchemaTransformation transformation, 
boolean locally)
     {
-        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.schema(version));
-        logger.debug("Gossiping my schema version {}", version);
+        long now = FBUtilities.timestampMicros();
+
+        Future<Schema.TransformationResult> future =
+            StageManager.getStage(Stage.MIGRATION).submit(() -> 
Schema.instance.transform(transformation, locally, now));
+
+        Schema.TransformationResult result = Futures.getUnchecked(future);
+        if (!result.success)
+            throw result.exception;
+
+        if (locally || result.diff.isEmpty())
+            return result.diff;
+
+        for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
+            if (shouldPushSchemaTo(endpoint))
+                pushSchemaMutation(endpoint, result.mutations);
+
+        return result.diff;
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to