http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java 
b/src/java/org/apache/cassandra/schema/Indexes.java
index eb49d39..81d400e 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -15,14 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.schema;
 
 import java.util.*;
+import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+import static java.lang.String.format;
 
 import static com.google.common.collect.Iterables.filter;
 
@@ -35,7 +37,7 @@ import static com.google.common.collect.Iterables.filter;
  * support is added for multiple target columns per-index and for indexes with
  * TargetType.ROW
  */
-public class Indexes implements Iterable<IndexMetadata>
+public final class Indexes implements Iterable<IndexMetadata>
 {
     private final ImmutableMap<String, IndexMetadata> indexesByName;
     private final ImmutableMap<UUID, IndexMetadata> indexesById;
@@ -56,11 +58,26 @@ public class Indexes implements Iterable<IndexMetadata>
         return builder().build();
     }
 
+    public static Indexes of(IndexMetadata... indexes)
+    {
+        return builder().add(indexes).build();
+    }
+
+    public static Indexes of(Iterable<IndexMetadata> indexes)
+    {
+        return builder().add(indexes).build();
+    }
+
     public Iterator<IndexMetadata> iterator()
     {
         return indexesByName.values().iterator();
     }
 
+    public Stream<IndexMetadata> stream()
+    {
+        return indexesById.values().stream();
+    }
+
     public int size()
     {
         return indexesByName.size();
@@ -121,7 +138,7 @@ public class Indexes implements Iterable<IndexMetadata>
     public Indexes with(IndexMetadata index)
     {
         if (get(index.name).isPresent())
-            throw new IllegalStateException(String.format("Index %s already 
exists", index.name));
+            throw new IllegalStateException(format("Index %s already exists", 
index.name));
 
         return builder().add(this).add(index).build();
     }
@@ -131,7 +148,7 @@ public class Indexes implements Iterable<IndexMetadata>
      */
     public Indexes without(String name)
     {
-        IndexMetadata index = get(name).orElseThrow(() -> new 
IllegalStateException(String.format("Index %s doesn't exist", name)));
+        IndexMetadata index = get(name).orElseThrow(() -> new 
IllegalStateException(format("Index %s doesn't exist", name)));
         return builder().add(filter(this, v -> v != index)).build();
     }
 
@@ -149,6 +166,25 @@ public class Indexes implements Iterable<IndexMetadata>
         return this == o || (o instanceof Indexes && 
indexesByName.equals(((Indexes) o).indexesByName));
     }
 
+    public void validate(TableMetadata table)
+    {
+        /*
+         * Index name check is duplicated in Keyspaces, for the time being.
+         * The reason for this is that schema altering statements are not 
calling
+         * Keyspaces.validate() as of yet. TODO: remove this once they do (on 
CASSANDRA-9425 completion)
+         */
+        Set<String> indexNames = new HashSet<>();
+        for (IndexMetadata index : indexesByName.values())
+        {
+            if (indexNames.contains(index.name))
+                throw new ConfigurationException(format("Duplicate index name 
%s for table %s", index.name, table));
+
+            indexNames.add(index.name);
+        }
+
+        indexesByName.values().forEach(i -> i.validate(table));
+    }
+
     @Override
     public int hashCode()
     {
@@ -164,7 +200,7 @@ public class Indexes implements Iterable<IndexMetadata>
     public static String getAvailableIndexName(String ksName, String cfName, 
String indexNameRoot)
     {
 
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName);
+        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
         Set<String> existingNames = ksm == null ? new HashSet<>() : 
ksm.existingIndexNames(null);
         String baseName = IndexMetadata.getDefaultIndexName(cfName, 
indexNameRoot);
         String acceptedName = baseName;
@@ -196,6 +232,13 @@ public class Indexes implements Iterable<IndexMetadata>
             return this;
         }
 
+        public Builder add(IndexMetadata... indexes)
+        {
+            for (IndexMetadata index : indexes)
+                add(index);
+            return this;
+        }
+
         public Builder add(Iterable<IndexMetadata> indexes)
         {
             indexes.forEach(this::add);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java 
b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 4fefd44..80a3869 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -27,11 +27,10 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.SchemaConstants;
-import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.exceptions.ConfigurationException;
 
+import static java.lang.String.format;
+
 /**
  * An immutable representation of keyspace metadata (name, params, tables, 
types, and functions).
  */
@@ -94,15 +93,15 @@ public final class KeyspaceMetadata
         return new KeyspaceMetadata(name, params, tables, views, types, 
functions);
     }
 
-    public Iterable<CFMetaData> tablesAndViews()
+    public Iterable<TableMetadata> tablesAndViews()
     {
         return Iterables.concat(tables, views.metadatas());
     }
 
     @Nullable
-    public CFMetaData getTableOrViewNullable(String tableOrViewName)
+    public TableMetadata getTableOrViewNullable(String tableOrViewName)
     {
-        ViewDefinition view = views.getNullable(tableOrViewName);
+        ViewMetadata view = views.getNullable(tableOrViewName);
         return view == null
              ? tables.getNullable(tableOrViewName)
              : view.metadata;
@@ -111,18 +110,18 @@ public final class KeyspaceMetadata
     public Set<String> existingIndexNames(String cfToExclude)
     {
         Set<String> indexNames = new HashSet<>();
-        for (CFMetaData table : tables)
-            if (cfToExclude == null || !table.cfName.equals(cfToExclude))
-                for (IndexMetadata index : table.getIndexes())
+        for (TableMetadata table : tables)
+            if (cfToExclude == null || !table.name.equals(cfToExclude))
+                for (IndexMetadata index : table.indexes)
                     indexNames.add(index.name);
         return indexNames;
     }
 
-    public Optional<CFMetaData> findIndexedTable(String indexName)
+    public Optional<TableMetadata> findIndexedTable(String indexName)
     {
-        for (CFMetaData cfm : tablesAndViews())
-            if (cfm.getIndexes().has(indexName))
-                return Optional.of(cfm);
+        for (TableMetadata table : tablesAndViews())
+            if (table.indexes.has(indexName))
+                return Optional.of(table);
 
         return Optional.empty();
     }
@@ -167,12 +166,28 @@ public final class KeyspaceMetadata
 
     public void validate()
     {
-        if (!CFMetaData.isNameValid(name))
-            throw new ConfigurationException(String.format("Keyspace name must 
not be empty, more than %s characters long, "
-                                                           + "or contain 
non-alphanumeric-underscore characters (got \"%s\")",
-                                                           
SchemaConstants.NAME_LENGTH,
-                                                           name));
+        if (!SchemaConstants.isValidName(name))
+        {
+            throw new ConfigurationException(format("Keyspace name must not be 
empty, more than %s characters long, "
+                                                    + "or contain 
non-alphanumeric-underscore characters (got \"%s\")",
+                                                    
SchemaConstants.NAME_LENGTH,
+                                                    name));
+        }
+
         params.validate(name);
-        tablesAndViews().forEach(CFMetaData::validate);
+
+        tablesAndViews().forEach(TableMetadata::validate);
+
+        Set<String> indexNames = new HashSet<>();
+        for (TableMetadata table : tables)
+        {
+            for (IndexMetadata index : table.indexes)
+            {
+                if (indexNames.contains(index.name))
+                    throw new ConfigurationException(format("Duplicate index 
name %s in keyspace %s", index.name, name));
+
+                indexNames.add(index.name);
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/Keyspaces.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Keyspaces.java 
b/src/java/org/apache/cassandra/schema/Keyspaces.java
index 8c0a63e..1692f88 100644
--- a/src/java/org/apache/cassandra/schema/Keyspaces.java
+++ b/src/java/org/apache/cassandra/schema/Keyspaces.java
@@ -18,9 +18,12 @@
 package org.apache.cassandra.schema;
 
 import java.util.Iterator;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
+import javax.annotation.Nullable;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
@@ -28,10 +31,12 @@ import com.google.common.collect.Maps;
 public final class Keyspaces implements Iterable<KeyspaceMetadata>
 {
     private final ImmutableMap<String, KeyspaceMetadata> keyspaces;
+    private final ImmutableMap<TableId, TableMetadata> tables;
 
     private Keyspaces(Builder builder)
     {
         keyspaces = builder.keyspaces.build();
+        tables = builder.tables.build();
     }
 
     public static Builder builder()
@@ -59,6 +64,23 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
         return keyspaces.values().stream();
     }
 
+    public Set<String> names()
+    {
+        return keyspaces.keySet();
+    }
+
+    @Nullable
+    public KeyspaceMetadata getNullable(String name)
+    {
+        return keyspaces.get(name);
+    }
+
+    @Nullable
+    public TableMetadata getTableOrViewNullable(TableId id)
+    {
+        return tables.get(id);
+    }
+
     public Keyspaces filter(Predicate<KeyspaceMetadata> predicate)
     {
         Builder builder = builder();
@@ -66,6 +88,25 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
         return builder.build();
     }
 
+    /**
+     * Creates a Keyspaces instance with the keyspace with the provided name 
removed
+     */
+    public Keyspaces without(String name)
+    {
+        KeyspaceMetadata keyspace = getNullable(name);
+        if (keyspace == null)
+            throw new IllegalStateException(String.format("Keyspace %s doesn't 
exists", name));
+
+        return builder().add(filter(k -> k != keyspace)).build();
+    }
+
+    public Keyspaces withAddedOrUpdated(KeyspaceMetadata keyspace)
+    {
+        return builder().add(filter(k -> !k.name.equals(keyspace.name)))
+                        .add(keyspace)
+                        .build();
+    }
+
     MapDifference<String, KeyspaceMetadata> diff(Keyspaces other)
     {
         return Maps.difference(keyspaces, other.keyspaces);
@@ -92,6 +133,7 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
     public static final class Builder
     {
         private final ImmutableMap.Builder<String, KeyspaceMetadata> keyspaces 
= new ImmutableMap.Builder<>();
+        private final ImmutableMap.Builder<TableId, TableMetadata> tables = 
new ImmutableMap.Builder<>();
 
         private Builder()
         {
@@ -105,6 +147,10 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
         public Builder add(KeyspaceMetadata keyspace)
         {
             keyspaces.put(keyspace.name, keyspace);
+
+            keyspace.tables.forEach(t -> tables.put(t.id, t));
+            keyspace.views.forEach(v -> tables.put(v.metadata.id, v.metadata));
+
             return this;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java 
b/src/java/org/apache/cassandra/schema/MigrationManager.java
new file mode 100644
index 0000000..d34a096
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.*;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class MigrationManager
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MigrationManager.class);
+
+    public static final MigrationManager instance = new MigrationManager();
+
+    private static final RuntimeMXBean runtimeMXBean = 
ManagementFactory.getRuntimeMXBean();
+
+    private static final int MIGRATION_DELAY_IN_MS = 60000;
+
+    private static final int MIGRATION_TASK_WAIT_IN_SECONDS = 
Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", 
"1"));
+
+    private MigrationManager() {}
+
+    public static void scheduleSchemaPull(InetAddress endpoint, EndpointState 
state)
+    {
+        VersionedValue value = 
state.getApplicationState(ApplicationState.SCHEMA);
+
+        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != 
null)
+            maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
+    }
+
+    /**
+     * If versions differ this node sends request with local migration list to 
the endpoint
+     * and expecting to receive a list of migrations to apply locally.
+     */
+    private static void maybeScheduleSchemaPull(final UUID theirVersion, final 
InetAddress endpoint)
+    {
+        if ((Schema.instance.getVersion() != null && 
Schema.instance.getVersion().equals(theirVersion)) || 
!shouldPullSchemaFrom(endpoint))
+        {
+            logger.debug("Not pulling schema because versions match or 
shouldPullSchemaFrom returned false");
+            return;
+        }
+
+        if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) 
|| runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+        {
+            // If we think we may be bootstrapping or have recently started, 
submit MigrationTask immediately
+            logger.debug("Submitting migration task for {}", endpoint);
+            submitMigrationTask(endpoint);
+        }
+        else
+        {
+            // Include a delay to make sure we have a chance to apply any 
changes being
+            // pushed out simultaneously. See CASSANDRA-5025
+            Runnable runnable = () ->
+            {
+                // grab the latest version of the schema since it may have 
changed again since the initial scheduling
+                EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+                if (epState == null)
+                {
+                    logger.debug("epState vanished for {}, not submitting 
migration task", endpoint);
+                    return;
+                }
+                VersionedValue value = 
epState.getApplicationState(ApplicationState.SCHEMA);
+                UUID currentVersion = UUID.fromString(value.value);
+                if (Schema.instance.getVersion().equals(currentVersion))
+                {
+                    logger.debug("not submitting migration task for {} because 
our versions match", endpoint);
+                    return;
+                }
+                logger.debug("submitting migration task for {}", endpoint);
+                submitMigrationTask(endpoint);
+            };
+            ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 
MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private static Future<?> submitMigrationTask(InetAddress endpoint)
+    {
+        /*
+         * Do not de-ref the future because that causes distributed deadlock 
(CASSANDRA-3832) because we are
+         * running in the gossip stage.
+         */
+        return StageManager.getStage(Stage.MIGRATION).submit(new 
MigrationTask(endpoint));
+    }
+
+    static boolean shouldPullSchemaFrom(InetAddress endpoint)
+    {
+        /*
+         * Don't request schema from nodes with a differnt or unknonw major 
version (may have incompatible schema)
+         * Don't request schema from fat clients
+         */
+        return MessagingService.instance().knowsVersion(endpoint)
+                && MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version
+                && !Gossiper.instance.isGossipOnlyMember(endpoint);
+    }
+
+    public static boolean isReadyForBootstrap()
+    {
+        return MigrationTask.getInflightTasks().isEmpty();
+    }
+
+    public static void waitUntilReadyForBootstrap()
+    {
+        CountDownLatch completionLatch;
+        while ((completionLatch = MigrationTask.getInflightTasks().poll()) != 
null)
+        {
+            try
+            {
+                if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, 
TimeUnit.SECONDS))
+                    logger.error("Migration task failed to complete");
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                logger.error("Migration task was interrupted");
+            }
+        }
+    }
+
+    public static void announceNewKeyspace(KeyspaceMetadata ksm) throws 
ConfigurationException
+    {
+        announceNewKeyspace(ksm, false);
+    }
+
+    public static void announceNewKeyspace(KeyspaceMetadata ksm, boolean 
announceLocally) throws ConfigurationException
+    {
+        announceNewKeyspace(ksm, FBUtilities.timestampMicros(), 
announceLocally);
+    }
+
+    public static void announceNewKeyspace(KeyspaceMetadata ksm, long 
timestamp, boolean announceLocally) throws ConfigurationException
+    {
+        ksm.validate();
+
+        if (Schema.instance.getKeyspaceMetadata(ksm.name) != null)
+            throw new AlreadyExistsException(ksm.name);
+
+        logger.info("Create new Keyspace: {}", ksm);
+        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), 
announceLocally);
+    }
+
+    public static void announceNewTable(TableMetadata cfm) throws 
ConfigurationException
+    {
+        announceNewTable(cfm, false);
+    }
+
+    public static void announceNewTable(TableMetadata cfm, boolean 
announceLocally)
+    {
+        announceNewTable(cfm, announceLocally, true);
+    }
+
+    /**
+     * Announces the table even if the definition is already know locally.
+     * This should generally be avoided but is used internally when we want to 
force the most up to date version of
+     * a system table schema (Note that we don't know if the schema we force 
_is_ the most recent version or not, we
+     * just rely on idempotency to basically ignore that announce if it's not. 
That's why we can't use announceUpdateColumnFamily,
+     * it would for instance delete new columns if this is not called with the 
most up-to-date version)
+     *
+     * Note that this is only safe for system tables where we know the id is 
fixed and will be the same whatever version
+     * of the definition is used.
+     */
+    public static void forceAnnounceNewTable(TableMetadata cfm)
+    {
+        announceNewTable(cfm, false, false);
+    }
+
+    private static void announceNewTable(TableMetadata cfm, boolean 
announceLocally, boolean throwOnDuplicate)
+    {
+        cfm.validate();
+
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(cfm.keyspace);
+        if (ksm == null)
+            throw new ConfigurationException(String.format("Cannot add table 
'%s' to non existing keyspace '%s'.", cfm.name, cfm.keyspace));
+        // If we have a table or a view which has the same name, we can't add 
a new one
+        else if (throwOnDuplicate && ksm.getTableOrViewNullable(cfm.name) != 
null)
+            throw new AlreadyExistsException(cfm.keyspace, cfm.name);
+
+        logger.info("Create new table: {}", cfm);
+        announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceNewView(ViewMetadata view, boolean 
announceLocally) throws ConfigurationException
+    {
+        view.metadata.validate();
+
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(view.keyspace);
+        if (ksm == null)
+            throw new ConfigurationException(String.format("Cannot add table 
'%s' to non existing keyspace '%s'.", view.name, view.keyspace));
+        else if (ksm.getTableOrViewNullable(view.name) != null)
+            throw new AlreadyExistsException(view.keyspace, view.name);
+
+        logger.info("Create new view: {}", view);
+        announce(SchemaKeyspace.makeCreateViewMutation(ksm, view, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceNewType(UserType newType, boolean 
announceLocally)
+    {
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(newType.keyspace);
+        announce(SchemaKeyspace.makeCreateTypeMutation(ksm, newType, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceNewFunction(UDFunction udf, boolean 
announceLocally)
+    {
+        logger.info("Create scalar function '{}'", udf.name());
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
+        announce(SchemaKeyspace.makeCreateFunctionMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceNewAggregate(UDAggregate udf, boolean 
announceLocally)
+    {
+        logger.info("Create aggregate function '{}'", udf.name());
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
+        announce(SchemaKeyspace.makeCreateAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceKeyspaceUpdate(KeyspaceMetadata ksm) throws 
ConfigurationException
+    {
+        announceKeyspaceUpdate(ksm, false);
+    }
+
+    public static void announceKeyspaceUpdate(KeyspaceMetadata ksm, boolean 
announceLocally) throws ConfigurationException
+    {
+        ksm.validate();
+
+        KeyspaceMetadata oldKsm = 
Schema.instance.getKeyspaceMetadata(ksm.name);
+        if (oldKsm == null)
+            throw new ConfigurationException(String.format("Cannot update non 
existing keyspace '%s'.", ksm.name));
+
+        logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, 
ksm);
+        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, 
ksm.params, FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceTableUpdate(TableMetadata tm) throws 
ConfigurationException
+    {
+        announceTableUpdate(tm, false);
+    }
+
+    public static void announceTableUpdate(TableMetadata updated, boolean 
announceLocally) throws ConfigurationException
+    {
+        updated.validate();
+
+        TableMetadata current = 
Schema.instance.getTableMetadata(updated.keyspace, updated.name);
+        if (current == null)
+            throw new ConfigurationException(String.format("Cannot update non 
existing table '%s' in keyspace '%s'.", updated.name, updated.keyspace));
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(current.keyspace);
+
+        current.validateCompatibility(updated);
+
+        logger.info("Update table '{}/{}' From {} To {}", current.keyspace, 
current.name, current, updated);
+        announce(SchemaKeyspace.makeUpdateTableMutation(ksm, current, updated, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceViewUpdate(ViewMetadata view, boolean 
announceLocally) throws ConfigurationException
+    {
+        view.metadata.validate();
+
+        ViewMetadata oldView = Schema.instance.getView(view.keyspace, 
view.name);
+        if (oldView == null)
+            throw new ConfigurationException(String.format("Cannot update non 
existing materialized view '%s' in keyspace '%s'.", view.name, view.keyspace));
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(view.keyspace);
+
+        oldView.metadata.validateCompatibility(view.metadata);
+
+        logger.info("Update view '{}/{}' From {} To {}", view.keyspace, 
view.name, oldView, view);
+        announce(SchemaKeyspace.makeUpdateViewMutation(ksm, oldView, view, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceTypeUpdate(UserType updatedType, boolean 
announceLocally)
+    {
+        logger.info("Update type '{}.{}' to {}", updatedType.keyspace, 
updatedType.getNameAsString(), updatedType);
+        announceNewType(updatedType, announceLocally);
+    }
+
+    public static void announceKeyspaceDrop(String ksName) throws 
ConfigurationException
+    {
+        announceKeyspaceDrop(ksName, false);
+    }
+
+    public static void announceKeyspaceDrop(String ksName, boolean 
announceLocally) throws ConfigurationException
+    {
+        KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksName);
+        if (oldKsm == null)
+            throw new ConfigurationException(String.format("Cannot drop non 
existing keyspace '%s'.", ksName));
+
+        logger.info("Drop Keyspace '{}'", oldKsm.name);
+        announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceTableDrop(String ksName, String cfName) throws 
ConfigurationException
+    {
+        announceTableDrop(ksName, cfName, false);
+    }
+
+    public static void announceTableDrop(String ksName, String cfName, boolean 
announceLocally) throws ConfigurationException
+    {
+        TableMetadata tm = Schema.instance.getTableMetadata(ksName, cfName);
+        if (tm == null)
+            throw new ConfigurationException(String.format("Cannot drop non 
existing table '%s' in keyspace '%s'.", cfName, ksName));
+        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
+
+        logger.info("Drop table '{}/{}'", tm.keyspace, tm.name);
+        announce(SchemaKeyspace.makeDropTableMutation(ksm, tm, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceViewDrop(String ksName, String viewName, 
boolean announceLocally) throws ConfigurationException
+    {
+        ViewMetadata view = Schema.instance.getView(ksName, viewName);
+        if (view == null)
+            throw new ConfigurationException(String.format("Cannot drop non 
existing materialized view '%s' in keyspace '%s'.", viewName, ksName));
+        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
+
+        logger.info("Drop table '{}/{}'", view.keyspace, view.name);
+        announce(SchemaKeyspace.makeDropViewMutation(ksm, view, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceTypeDrop(UserType droppedType, boolean 
announceLocally)
+    {
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(droppedType.keyspace);
+        announce(SchemaKeyspace.dropTypeFromSchemaMutation(ksm, droppedType, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceFunctionDrop(UDFunction udf, boolean 
announceLocally)
+    {
+        logger.info("Drop scalar function overload '{}' args '{}'", 
udf.name(), udf.argTypes());
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
+        announce(SchemaKeyspace.makeDropFunctionMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceAggregateDrop(UDAggregate udf, boolean 
announceLocally)
+    {
+        logger.info("Drop aggregate function overload '{}' args '{}'", 
udf.name(), udf.argTypes());
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
+        announce(SchemaKeyspace.makeDropAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    /**
+     * actively announce a new version to active hosts via rpc
+     * @param schema The schema mutation to be applied
+     */
+    private static void announce(Mutation.SimpleBuilder schema, boolean 
announceLocally)
+    {
+        List<Mutation> mutations = Collections.singletonList(schema.build());
+
+        if (announceLocally)
+            Schema.instance.merge(mutations);
+        else
+            FBUtilities.waitOnFuture(announce(mutations));
+    }
+
+    private static void pushSchemaMutation(InetAddress endpoint, 
Collection<Mutation> schema)
+    {
+        MessageOut<Collection<Mutation>> msg = new 
MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
+                                                                schema,
+                                                                
MigrationsSerializer.instance);
+        MessagingService.instance().sendOneWay(msg, endpoint);
+    }
+
+    // Returns a future on the local application of the schema
+    private static Future<?> announce(final Collection<Mutation> schema)
+    {
+        Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new 
WrappedRunnable()
+        {
+            protected void runMayThrow() throws ConfigurationException
+            {
+                Schema.instance.mergeAndAnnounceVersion(schema);
+            }
+        });
+
+        for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
+        {
+            // only push schema to nodes with known and equal versions
+            if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
+                    MessagingService.instance().knowsVersion(endpoint) &&
+                    MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version)
+                pushSchemaMutation(endpoint, schema);
+        }
+
+        return f;
+    }
+
+    /**
+     * Announce my version passively over gossip.
+     * Used to notify nodes as they arrive in the cluster.
+     *
+     * @param version The schema version to announce
+     */
+    static void passiveAnnounce(UUID version)
+    {
+        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.schema(version));
+        logger.debug("Gossiping my schema version {}", version);
+    }
+
+    /**
+     * Clear all locally stored schema information and reset schema to initial 
state.
+     * Called by user (via JMX) who wants to get rid of schema disagreement.
+     */
+    public static void resetLocalSchema()
+    {
+        logger.info("Starting local schema reset...");
+
+        logger.debug("Truncating schema tables...");
+
+        SchemaKeyspace.truncate();
+
+        logger.debug("Clearing local schema keyspace definitions...");
+
+        Schema.instance.clear();
+
+        Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
+        liveEndpoints.remove(FBUtilities.getBroadcastAddress());
+
+        // force migration if there are nodes around
+        for (InetAddress node : liveEndpoints)
+        {
+            if (shouldPullSchemaFrom(node))
+            {
+                logger.debug("Requesting schema from {}", node);
+                FBUtilities.waitOnFuture(submitMigrationTask(node));
+                break;
+            }
+        }
+
+        logger.info("Local schema reset is complete.");
+    }
+
+    public static class MigrationsSerializer implements 
IVersionedSerializer<Collection<Mutation>>
+    {
+        public static MigrationsSerializer instance = new 
MigrationsSerializer();
+
+        public void serialize(Collection<Mutation> schema, DataOutputPlus out, 
int version) throws IOException
+        {
+            out.writeInt(schema.size());
+            for (Mutation mutation : schema)
+                Mutation.serializer.serialize(mutation, out, version);
+        }
+
+        public Collection<Mutation> deserialize(DataInputPlus in, int version) 
throws IOException
+        {
+            int count = in.readInt();
+            Collection<Mutation> schema = new ArrayList<>(count);
+
+            for (int i = 0; i < count; i++)
+                schema.add(Mutation.serializer.deserialize(in, version));
+
+            return schema;
+        }
+
+        public long serializedSize(Collection<Mutation> schema, int version)
+        {
+            int size = TypeSizes.sizeof(schema.size());
+            for (Mutation mutation : schema)
+                size += Mutation.serializer.serializedSize(mutation, version);
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationTask.java 
b/src/java/org/apache/cassandra/schema/MigrationTask.java
new file mode 100644
index 0000000..a785e17
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/MigrationTask.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+final class MigrationTask extends WrappedRunnable
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MigrationTask.class);
+
+    private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = 
new ConcurrentLinkedQueue<>();
+
+    private static final Set<BootstrapState> monitoringBootstrapStates = 
EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
+
+    private final InetAddress endpoint;
+
+    MigrationTask(InetAddress endpoint)
+    {
+        this.endpoint = endpoint;
+    }
+
+    static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
+    {
+        return inflightTasks;
+    }
+
+    public void runMayThrow() throws Exception
+    {
+        // There is a chance that quite some time could have passed between 
now and the MM#maybeScheduleSchemaPull(),
+        // potentially enough for the endpoint node to restart - which is an 
issue if it does restart upgraded, with
+        // a higher major.
+        if (!MigrationManager.shouldPullSchemaFrom(endpoint))
+        {
+            logger.info("Skipped sending a migration request: node {} has a 
higher major version now.", endpoint);
+            return;
+        }
+
+        if (!FailureDetector.instance.isAlive(endpoint))
+        {
+            logger.debug("Can't send schema pull request: node {} is down.", 
endpoint);
+            return;
+        }
+
+        MessageOut message = new 
MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, 
MigrationManager.MigrationsSerializer.instance);
+
+        final CountDownLatch completionLatch = new CountDownLatch(1);
+
+        IAsyncCallback<Collection<Mutation>> cb = new 
IAsyncCallback<Collection<Mutation>>()
+        {
+            @Override
+            public void response(MessageIn<Collection<Mutation>> message)
+            {
+                try
+                {
+                    Schema.instance.mergeAndAnnounceVersion(message.payload);
+                }
+                catch (ConfigurationException e)
+                {
+                    logger.error("Configuration exception merging remote 
schema", e);
+                }
+                finally
+                {
+                    completionLatch.countDown();
+                }
+            }
+
+            public boolean isLatencyForSnitch()
+            {
+                return false;
+            }
+        };
+
+        // Only save the latches if we need bootstrap or are bootstrapping
+        if 
(monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
+            inflightTasks.offer(completionLatch);
+
+        MessagingService.instance().sendRR(message, endpoint, cb);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java 
b/src/java/org/apache/cassandra/schema/Schema.java
new file mode 100644
index 0000000..5786804
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -0,0 +1,804 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.KeyspaceNotDefinedException;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnknownTableException;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.utils.Pair;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import static java.lang.String.format;
+
+import static com.google.common.collect.Iterables.size;
+
+public final class Schema
+{
+    public static final Schema instance = new Schema();
+
+    private volatile Keyspaces keyspaces = Keyspaces.none();
+
+    // UUID -> mutable metadata ref map. We have to update these in place 
every time a table changes.
+    private final Map<TableId, TableMetadataRef> metadataRefs = new 
NonBlockingHashMap<>();
+
+    // (keyspace name, index name) -> mutable metadata ref map. We have to 
update these in place every time an index changes.
+    private final Map<Pair<String, String>, TableMetadataRef> 
indexMetadataRefs = new NonBlockingHashMap<>();
+
+    // Keyspace objects, one per keyspace. Only one instance should ever exist 
for any given keyspace.
+    private final Map<String, Keyspace> keyspaceInstances = new 
NonBlockingHashMap<>();
+
+    private volatile UUID version;
+
+    private final List<SchemaChangeListener> changeListeners = new 
CopyOnWriteArrayList<>();
+
+    /**
+     * Initialize empty schema object and load the hardcoded system tables
+     */
+    private Schema()
+    {
+        if (DatabaseDescriptor.isDaemonInitialized() || 
DatabaseDescriptor.isToolInitialized())
+        {
+            load(SchemaKeyspace.metadata());
+            load(SystemKeyspace.metadata());
+        }
+    }
+
+    /**
+     * Validates that the provided keyspace is not one of the system keyspace.
+     *
+     * @param keyspace the keyspace name to validate.
+     *
+     * @throws InvalidRequestException if {@code keyspace} is the name of a
+     * system keyspace.
+     */
+    public static void validateKeyspaceNotSystem(String keyspace)
+    {
+        if (SchemaConstants.isSystemKeyspace(keyspace))
+            throw new InvalidRequestException(format("%s keyspace is not 
user-modifiable", keyspace));
+    }
+
+    /**
+     * load keyspace (keyspace) definitions, but do not initialize the 
keyspace instances.
+     * Schema version may be updated as the result.
+     */
+    public void loadFromDisk()
+    {
+        loadFromDisk(true);
+    }
+
+    /**
+     * Load schema definitions from disk.
+     *
+     * @param updateVersion true if schema version needs to be updated
+     */
+    public void loadFromDisk(boolean updateVersion)
+    {
+        load(SchemaKeyspace.fetchNonSystemKeyspaces());
+        if (updateVersion)
+            updateVersion();
+    }
+
+    /**
+     * Load up non-system keyspaces
+     *
+     * @param keyspaceDefs The non-system keyspace definitions
+     */
+    private void load(Iterable<KeyspaceMetadata> keyspaceDefs)
+    {
+        keyspaceDefs.forEach(this::load);
+    }
+
+    /**
+     * Update (or insert) new keyspace definition
+     *
+     * @param ksm The metadata about keyspace
+     */
+    synchronized public void load(KeyspaceMetadata ksm)
+    {
+        KeyspaceMetadata previous = keyspaces.getNullable(ksm.name);
+
+        if (previous == null)
+            loadNew(ksm);
+        else
+            reload(previous, ksm);
+
+        keyspaces = keyspaces.withAddedOrUpdated(ksm);
+    }
+
+    private void loadNew(KeyspaceMetadata ksm)
+    {
+        ksm.tablesAndViews()
+           .forEach(metadata -> metadataRefs.put(metadata.id, new 
TableMetadataRef(metadata)));
+
+        ksm.tables
+           .indexTables()
+           .forEach((name, metadata) -> 
indexMetadataRefs.put(Pair.create(ksm.name, name), new 
TableMetadataRef(metadata)));
+    }
+
+    private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated)
+    {
+        Keyspace keyspace = getKeyspaceInstance(updated.name);
+        if (keyspace != null)
+            keyspace.setMetadata(updated);
+
+        MapDifference<TableId, TableMetadata> tablesDiff = 
previous.tables.diff(updated.tables);
+        MapDifference<TableId, ViewMetadata> viewsDiff = 
previous.views.diff(updated.views);
+        MapDifference<String, TableMetadata> indexesDiff = 
previous.tables.indexesDiff(updated.tables);
+
+        // clean up after removed entries
+
+        tablesDiff.entriesOnlyOnLeft()
+                  .values()
+                  .forEach(table -> metadataRefs.remove(table.id));
+
+        viewsDiff.entriesOnlyOnLeft()
+                 .values()
+                 .forEach(view -> metadataRefs.remove(view.metadata.id));
+
+        indexesDiff.entriesOnlyOnLeft()
+                   .values()
+                   .forEach(indexTable -> 
indexMetadataRefs.remove(Pair.create(indexTable.keyspace, 
indexTable.indexName().get())));
+
+        // load up new entries
+
+        tablesDiff.entriesOnlyOnRight()
+                  .values()
+                  .forEach(table -> metadataRefs.put(table.id, new 
TableMetadataRef(table)));
+
+        viewsDiff.entriesOnlyOnRight()
+                 .values()
+                 .forEach(view -> metadataRefs.put(view.metadata.id, new 
TableMetadataRef(view.metadata)));
+
+        indexesDiff.entriesOnlyOnRight()
+                   .values()
+                   .forEach(indexTable -> 
indexMetadataRefs.put(Pair.create(indexTable.keyspace, 
indexTable.indexName().get()), new TableMetadataRef(indexTable)));
+
+        // refresh refs to updated ones
+
+        tablesDiff.entriesDiffering()
+                  .values()
+                  .forEach(diff -> 
metadataRefs.get(diff.rightValue().id).set(diff.rightValue()));
+
+        viewsDiff.entriesDiffering()
+                 .values()
+                 .forEach(diff -> 
metadataRefs.get(diff.rightValue().metadata.id).set(diff.rightValue().metadata));
+
+        indexesDiff.entriesDiffering()
+                   .values()
+                   .stream()
+                   .map(MapDifference.ValueDifference::rightValue)
+                   .forEach(indexTable -> 
indexMetadataRefs.get(Pair.create(indexTable.keyspace, 
indexTable.indexName().get())).set(indexTable));
+    }
+
+    public void registerListener(SchemaChangeListener listener)
+    {
+        changeListeners.add(listener);
+    }
+
+    @SuppressWarnings("unused")
+    public void unregisterListener(SchemaChangeListener listener)
+    {
+        changeListeners.remove(listener);
+    }
+
+    /**
+     * Get keyspace instance by name
+     *
+     * @param keyspaceName The name of the keyspace
+     *
+     * @return Keyspace object or null if keyspace was not found
+     */
+    public Keyspace getKeyspaceInstance(String keyspaceName)
+    {
+        return keyspaceInstances.get(keyspaceName);
+    }
+
+    public ColumnFamilyStore getColumnFamilyStoreInstance(TableId id)
+    {
+        TableMetadata metadata = getTableMetadata(id);
+        if (metadata == null)
+            return null;
+
+        Keyspace instance = getKeyspaceInstance(metadata.keyspace);
+        if (instance == null)
+            return null;
+
+        return instance.hasColumnFamilyStore(metadata.id)
+             ? instance.getColumnFamilyStore(metadata.id)
+             : null;
+    }
+
+    /**
+     * Store given Keyspace instance to the schema
+     *
+     * @param keyspace The Keyspace instance to store
+     *
+     * @throws IllegalArgumentException if Keyspace is already stored
+     */
+    public void storeKeyspaceInstance(Keyspace keyspace)
+    {
+        if (keyspaceInstances.containsKey(keyspace.getName()))
+            throw new IllegalArgumentException(String.format("Keyspace %s was 
already initialized.", keyspace.getName()));
+
+        keyspaceInstances.put(keyspace.getName(), keyspace);
+    }
+
+    /**
+     * Remove keyspace from schema
+     *
+     * @param keyspaceName The name of the keyspace to remove
+     *
+     * @return removed keyspace instance or null if it wasn't found
+     */
+    public Keyspace removeKeyspaceInstance(String keyspaceName)
+    {
+        return keyspaceInstances.remove(keyspaceName);
+    }
+
+    /**
+     * Remove keyspace definition from system
+     *
+     * @param ksm The keyspace definition to remove
+     */
+    synchronized void unload(KeyspaceMetadata ksm)
+    {
+        keyspaces = keyspaces.without(ksm.name);
+
+        ksm.tablesAndViews()
+           .forEach(t -> metadataRefs.remove(t.id));
+
+        ksm.tables
+           .indexTables()
+           .keySet()
+           .forEach(name -> indexMetadataRefs.remove(Pair.create(ksm.name, 
name)));
+    }
+
+    public int getNumberOfTables()
+    {
+        return keyspaces.stream().mapToInt(k -> 
size(k.tablesAndViews())).sum();
+    }
+
+    public ViewMetadata getView(String keyspaceName, String viewName)
+    {
+        assert keyspaceName != null;
+        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
+        return (ksm == null) ? null : ksm.views.getNullable(viewName);
+    }
+
+    /**
+     * Get metadata about keyspace by its name
+     *
+     * @param keyspaceName The name of the keyspace
+     *
+     * @return The keyspace metadata or null if it wasn't found
+     */
+    public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName)
+    {
+        assert keyspaceName != null;
+        return keyspaces.getNullable(keyspaceName);
+    }
+
+    private Set<String> getNonSystemKeyspacesSet()
+    {
+        return Sets.difference(keyspaces.names(), 
SchemaConstants.SYSTEM_KEYSPACE_NAMES);
+    }
+
+    /**
+     * @return collection of the non-system keyspaces (note that this count as 
system only the
+     * non replicated keyspaces, so keyspace like system_traces which are 
replicated are actually
+     * returned. See getUserKeyspace() below if you don't want those)
+     */
+    public ImmutableList<String> getNonSystemKeyspaces()
+    {
+        return ImmutableList.copyOf(getNonSystemKeyspacesSet());
+    }
+
+    /**
+     * @return a collection of keyspaces that do not use LocalStrategy for 
replication
+     */
+    public List<String> getNonLocalStrategyKeyspaces()
+    {
+        return keyspaces.stream()
+                        .filter(keyspace -> keyspace.params.replication.klass 
!= LocalStrategy.class)
+                        .map(keyspace -> keyspace.name)
+                        .collect(Collectors.toList());
+    }
+
+    /**
+     * @return collection of the user defined keyspaces
+     */
+    public List<String> getUserKeyspaces()
+    {
+        return 
ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), 
SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES));
+    }
+
+    /**
+     * Get metadata about keyspace inner ColumnFamilies
+     *
+     * @param keyspaceName The name of the keyspace
+     *
+     * @return metadata about ColumnFamilies the belong to the given keyspace
+     */
+    public Iterable<TableMetadata> getTablesAndViews(String keyspaceName)
+    {
+        assert keyspaceName != null;
+        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
+        assert ksm != null;
+        return ksm.tablesAndViews();
+    }
+
+    /**
+     * @return collection of the all keyspace names registered in the system 
(system and non-system)
+     */
+    public Set<String> getKeyspaces()
+    {
+        return keyspaces.names();
+    }
+
+    /* TableMetadata/Ref query/control methods */
+
+    /**
+     * Given a keyspace name and table/view name, get the table metadata
+     * reference. If the keyspace name or table/view name is not present
+     * this method returns null.
+     *
+     * @return TableMetadataRef object or null if it wasn't found
+     */
+    public TableMetadataRef getTableMetadataRef(String keyspace, String table)
+    {
+        TableMetadata tm = getTableMetadata(keyspace, table);
+        return tm == null
+             ? null
+             : metadataRefs.get(tm.id);
+    }
+
+    public TableMetadataRef getIndexTableMetadataRef(String keyspace, String 
index)
+    {
+        return indexMetadataRefs.get(Pair.create(keyspace, index));
+    }
+
+    /**
+     * Get Table metadata by its identifier
+     *
+     * @param id table or view identifier
+     *
+     * @return metadata about Table or View
+     */
+    public TableMetadataRef getTableMetadataRef(TableId id)
+    {
+        return metadataRefs.get(id);
+    }
+
+    public TableMetadataRef getTableMetadataRef(Descriptor descriptor)
+    {
+        return getTableMetadataRef(descriptor.ksname, descriptor.cfname);
+    }
+
+    /**
+     * Given a keyspace name and table name, get the table
+     * meta data. If the keyspace name or table name is not valid
+     * this function returns null.
+     *
+     * @param keyspace The keyspace name
+     * @param table The table name
+     *
+     * @return TableMetadata object or null if it wasn't found
+     */
+    public TableMetadata getTableMetadata(String keyspace, String table)
+    {
+        assert keyspace != null;
+        assert table != null;
+
+        KeyspaceMetadata ksm = keyspaces.getNullable(keyspace);
+        return ksm == null
+             ? null
+             : ksm.getTableOrViewNullable(table);
+    }
+
+    public TableMetadata getTableMetadata(TableId id)
+    {
+        return keyspaces.getTableOrViewNullable(id);
+    }
+
+    public TableMetadata validateTable(String keyspaceName, String tableName)
+    {
+        if (tableName.isEmpty())
+            throw new InvalidRequestException("non-empty table is required");
+
+        KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
+        if (keyspace == null)
+            throw new KeyspaceNotDefinedException(format("keyspace %s does not 
exist", keyspaceName));
+
+        TableMetadata metadata = keyspace.getTableOrViewNullable(tableName);
+        if (metadata == null)
+            throw new InvalidRequestException(format("table %s does not 
exist", tableName));
+
+        return metadata;
+    }
+
+    public TableMetadata getTableMetadata(Descriptor descriptor)
+    {
+        return getTableMetadata(descriptor.ksname, descriptor.cfname);
+    }
+
+    /**
+     * @throws UnknownTableException if the table couldn't be found in the 
metadata
+     */
+    public TableMetadata getExistingTableMetadata(TableId id) throws 
UnknownTableException
+    {
+        TableMetadata metadata = getTableMetadata(id);
+        if (metadata != null)
+            return metadata;
+
+        String message =
+            String.format("Couldn't find table with id %s. If a table was just 
created, this is likely due to the schema"
+                          + "not being fully propagated.  Please wait for 
schema agreement on table creation.",
+                          id);
+        throw new UnknownTableException(message, id);
+    }
+
+    /* Function helpers */
+
+    /**
+     * Get all function overloads with the specified name
+     *
+     * @param name fully qualified function name
+     * @return an empty list if the keyspace or the function name are not 
found;
+     *         a non-empty collection of {@link Function} otherwise
+     */
+    public Collection<Function> getFunctions(FunctionName name)
+    {
+        if (!name.hasKeyspace())
+            throw new IllegalArgumentException(String.format("Function name 
must be fully quallified: got %s", name));
+
+        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
+        return ksm == null
+             ? Collections.emptyList()
+             : ksm.functions.get(name);
+    }
+
+    /**
+     * Find the function with the specified name
+     *
+     * @param name fully qualified function name
+     * @param argTypes function argument types
+     * @return an empty {@link Optional} if the keyspace or the function name 
are not found;
+     *         a non-empty optional of {@link Function} otherwise
+     */
+    public Optional<Function> findFunction(FunctionName name, 
List<AbstractType<?>> argTypes)
+    {
+        if (!name.hasKeyspace())
+            throw new IllegalArgumentException(String.format("Function name 
must be fully quallified: got %s", name));
+
+        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
+        return ksm == null
+             ? Optional.empty()
+             : ksm.functions.find(name, argTypes);
+    }
+
+    /* Version control */
+
+    /**
+     * @return current schema version
+     */
+    public UUID getVersion()
+    {
+        return version;
+    }
+
+    /**
+     * Read schema from system keyspace and calculate MD5 digest of every row, 
resulting digest
+     * will be converted into UUID which would act as content-based version of 
the schema.
+     */
+    public void updateVersion()
+    {
+        version = SchemaKeyspace.calculateSchemaDigest();
+        SystemKeyspace.updateSchemaVersion(version);
+    }
+
+    /*
+     * Like updateVersion, but also announces via gossip
+     */
+    public void updateVersionAndAnnounce()
+    {
+        updateVersion();
+        MigrationManager.passiveAnnounce(version);
+    }
+
+    /**
+     * Clear all KS/CF metadata and reset version.
+     */
+    public synchronized void clear()
+    {
+        getNonSystemKeyspaces().forEach(k -> unload(getKeyspaceMetadata(k)));
+        updateVersionAndAnnounce();
+    }
+
+    /**
+     * Merge remote schema in form of mutations with local and mutate ks/cf 
metadata objects
+     * (which also involves fs operations on add/drop ks/cf)
+     *
+     * @param mutations the schema changes to apply
+     *
+     * @throws ConfigurationException If one of metadata attributes has 
invalid value
+     */
+    synchronized void mergeAndAnnounceVersion(Collection<Mutation> mutations)
+    {
+        merge(mutations);
+        updateVersionAndAnnounce();
+    }
+
+    synchronized void merge(Collection<Mutation> mutations)
+    {
+        // only compare the keyspaces affected by this set of schema mutations
+        Set<String> affectedKeyspaces = 
SchemaKeyspace.affectedKeyspaces(mutations);
+
+        // fetch the current state of schema for the affected keyspaces only
+        Keyspaces before = keyspaces.filter(k -> 
affectedKeyspaces.contains(k.name));
+
+        // apply the schema mutations
+        SchemaKeyspace.applyChanges(mutations);
+
+        // apply the schema mutations and fetch the new versions of the 
altered keyspaces
+        Keyspaces after = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);
+
+        MapDifference<String, KeyspaceMetadata> keyspacesDiff = 
before.diff(after);
+
+        // dropped keyspaces
+        keyspacesDiff.entriesOnlyOnLeft().values().forEach(this::dropKeyspace);
+
+        // new keyspaces
+        
keyspacesDiff.entriesOnlyOnRight().values().forEach(this::createKeyspace);
+
+        // updated keyspaces
+        keyspacesDiff.entriesDiffering().entrySet().forEach(diff -> 
alterKeyspace(diff.getValue().leftValue(), diff.getValue().rightValue()));
+    }
+
+    private void alterKeyspace(KeyspaceMetadata before, KeyspaceMetadata after)
+    {
+        // calculate the deltas
+        MapDifference<TableId, TableMetadata> tablesDiff = 
before.tables.diff(after.tables);
+        MapDifference<TableId, ViewMetadata> viewsDiff = 
before.views.diff(after.views);
+        MapDifference<ByteBuffer, UserType> typesDiff = 
before.types.diff(after.types);
+        MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = 
before.functions.udfsDiff(after.functions);
+        MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff 
= before.functions.udasDiff(after.functions);
+
+        // drop tables and views
+        viewsDiff.entriesOnlyOnLeft().values().forEach(this::dropView);
+        tablesDiff.entriesOnlyOnLeft().values().forEach(this::dropTable);
+
+        load(after);
+
+        // add tables and views
+        tablesDiff.entriesOnlyOnRight().values().forEach(this::createTable);
+        viewsDiff.entriesOnlyOnRight().values().forEach(this::createView);
+
+        // update tables and views
+        tablesDiff.entriesDiffering().values().forEach(diff -> 
alterTable(diff.rightValue()));
+        viewsDiff.entriesDiffering().values().forEach(diff -> 
alterView(diff.rightValue()));
+
+        // deal with all removed, added, and altered views
+        Keyspace.open(before.name).viewManager.reload();
+
+        // notify on everything dropped
+        
udasDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropAggregate);
+        
udfsDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropFunction);
+        viewsDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropView);
+        tablesDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropTable);
+        typesDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropType);
+
+        // notify on everything created
+        
typesDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateType);
+        
tablesDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateTable);
+        
viewsDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateView);
+        
udfsDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateFunction);
+        
udasDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateAggregate);
+
+        // notify on everything altered
+        if (!before.params.equals(after.params))
+            notifyAlterKeyspace(after);
+        typesDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterType(diff.rightValue()));
+        tablesDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterTable(diff.leftValue(), diff.rightValue()));
+        viewsDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterView(diff.leftValue(), diff.rightValue()));
+        udfsDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterFunction(diff.rightValue()));
+        udasDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterAggregate(diff.rightValue()));
+    }
+
+    private void createKeyspace(KeyspaceMetadata keyspace)
+    {
+        load(keyspace);
+        Keyspace.open(keyspace.name);
+
+        notifyCreateKeyspace(keyspace);
+        keyspace.types.forEach(this::notifyCreateType);
+        keyspace.tables.forEach(this::notifyCreateTable);
+        keyspace.views.forEach(this::notifyCreateView);
+        keyspace.functions.udfs().forEach(this::notifyCreateFunction);
+        keyspace.functions.udas().forEach(this::notifyCreateAggregate);
+    }
+
+    private void dropKeyspace(KeyspaceMetadata keyspace)
+    {
+        keyspace.views.forEach(this::dropView);
+        keyspace.tables.forEach(this::dropTable);
+
+        // remove the keyspace from the static instances.
+        Keyspace.clear(keyspace.name);
+        unload(keyspace);
+        Keyspace.writeOrder.awaitNewBarrier();
+
+        keyspace.functions.udas().forEach(this::notifyDropAggregate);
+        keyspace.functions.udfs().forEach(this::notifyDropFunction);
+        keyspace.views.forEach(this::notifyDropView);
+        keyspace.tables.forEach(this::notifyDropTable);
+        keyspace.types.forEach(this::notifyDropType);
+        notifyDropKeyspace(keyspace);
+    }
+
+    private void dropView(ViewMetadata metadata)
+    {
+        dropTable(metadata.metadata);
+    }
+
+    private void dropTable(TableMetadata metadata)
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name);
+        assert cfs != null;
+        // make sure all the indexes are dropped, or else.
+        cfs.indexManager.markAllIndexesRemoved();
+        
CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata),
 true);
+        if (DatabaseDescriptor.isAutoSnapshot())
+            
cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, 
ColumnFamilyStore.SNAPSHOT_DROP_PREFIX));
+        
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
+        Keyspace.open(metadata.keyspace).dropCf(metadata.id);
+    }
+
+    private void createTable(TableMetadata table)
+    {
+        Keyspace.open(table.keyspace).initCf(metadataRefs.get(table.id), true);
+    }
+
+    private void createView(ViewMetadata view)
+    {
+        
Keyspace.open(view.keyspace).initCf(metadataRefs.get(view.metadata.id), true);
+    }
+
+    private void alterTable(TableMetadata updated)
+    {
+        
Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload();
+    }
+
+    private void alterView(ViewMetadata updated)
+    {
+        
Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload();
+    }
+
+    private void notifyCreateKeyspace(KeyspaceMetadata ksm)
+    {
+        changeListeners.forEach(l -> l.onCreateKeyspace(ksm.name));
+    }
+
+    private void notifyCreateTable(TableMetadata metadata)
+    {
+        changeListeners.forEach(l -> l.onCreateTable(metadata.keyspace, 
metadata.name));
+    }
+
+    private void notifyCreateView(ViewMetadata view)
+    {
+        changeListeners.forEach(l -> l.onCreateView(view.keyspace, view.name));
+    }
+
+    private void notifyCreateType(UserType ut)
+    {
+        changeListeners.forEach(l -> l.onCreateType(ut.keyspace, 
ut.getNameAsString()));
+    }
+
+    private void notifyCreateFunction(UDFunction udf)
+    {
+        changeListeners.forEach(l -> l.onCreateFunction(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
+    }
+
+    private void notifyCreateAggregate(UDAggregate udf)
+    {
+        changeListeners.forEach(l -> l.onCreateAggregate(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
+    }
+
+    private void notifyAlterKeyspace(KeyspaceMetadata ksm)
+    {
+        changeListeners.forEach(l -> l.onAlterKeyspace(ksm.name));
+    }
+
+    private void notifyAlterTable(TableMetadata current, TableMetadata updated)
+    {
+        boolean changeAffectedPreparedStatements = 
current.changeAffectsPreparedStatements(updated);
+        changeListeners.forEach(l -> l.onAlterTable(updated.keyspace, 
updated.name, changeAffectedPreparedStatements));
+    }
+
+    private void notifyAlterView(ViewMetadata current, ViewMetadata updated)
+    {
+        boolean changeAffectedPreparedStatements = 
current.metadata.changeAffectsPreparedStatements(updated.metadata);
+        changeListeners.forEach(l ->l.onAlterView(updated.keyspace, 
updated.name, changeAffectedPreparedStatements));
+    }
+
+    private void notifyAlterType(UserType ut)
+    {
+        changeListeners.forEach(l -> l.onAlterType(ut.keyspace, 
ut.getNameAsString()));
+    }
+
+    private void notifyAlterFunction(UDFunction udf)
+    {
+        changeListeners.forEach(l -> l.onAlterFunction(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
+    }
+
+    private void notifyAlterAggregate(UDAggregate udf)
+    {
+        changeListeners.forEach(l -> l.onAlterAggregate(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
+    }
+
+    private void notifyDropKeyspace(KeyspaceMetadata ksm)
+    {
+        changeListeners.forEach(l -> l.onDropKeyspace(ksm.name));
+    }
+
+    private void notifyDropTable(TableMetadata metadata)
+    {
+        changeListeners.forEach(l -> l.onDropTable(metadata.keyspace, 
metadata.name));
+    }
+
+    private void notifyDropView(ViewMetadata view)
+    {
+        changeListeners.forEach(l -> l.onDropView(view.keyspace, view.name));
+    }
+
+    private void notifyDropType(UserType ut)
+    {
+        changeListeners.forEach(l -> l.onDropType(ut.keyspace, 
ut.getNameAsString()));
+    }
+
+    private void notifyDropFunction(UDFunction udf)
+    {
+        changeListeners.forEach(l -> l.onDropFunction(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
+    }
+
+    private void notifyDropAggregate(UDAggregate udf)
+    {
+        changeListeners.forEach(l -> l.onDropAggregate(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaChangeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaChangeListener.java 
b/src/java/org/apache/cassandra/schema/SchemaChangeListener.java
new file mode 100644
index 0000000..4390309
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaChangeListener.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.List;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public abstract class SchemaChangeListener
+{
+    public void onCreateKeyspace(String keyspace)
+    {
+    }
+
+    public void onCreateTable(String keyspace, String table)
+    {
+    }
+
+    public void onCreateView(String keyspace, String view)
+    {
+        onCreateTable(keyspace, view);
+    }
+
+    public void onCreateType(String keyspace, String type)
+    {
+    }
+
+    public void onCreateFunction(String keyspace, String function, 
List<AbstractType<?>> argumentTypes)
+    {
+    }
+
+    public void onCreateAggregate(String keyspace, String aggregate, 
List<AbstractType<?>> argumentTypes)
+    {
+    }
+
+    public void onAlterKeyspace(String keyspace)
+    {
+    }
+
+    // the boolean flag indicates whether the change that triggered this event 
may have a substantive
+    // impact on statements using the column family.
+    public void onAlterTable(String keyspace, String table, boolean 
affectsStatements)
+    {
+    }
+
+    public void onAlterView(String keyspace, String view, boolean 
affectsStataments)
+    {
+        onAlterTable(keyspace, view, affectsStataments);
+    }
+
+    public void onAlterType(String keyspace, String type)
+    {
+    }
+
+    public void onAlterFunction(String keyspace, String function, 
List<AbstractType<?>> argumentTypes)
+    {
+    }
+
+    public void onAlterAggregate(String keyspace, String aggregate, 
List<AbstractType<?>> argumentTypes)
+    {
+    }
+
+    public void onDropKeyspace(String keyspace)
+    {
+    }
+
+    public void onDropTable(String keyspace, String table)
+    {
+    }
+
+    public void onDropView(String keyspace, String view)
+    {
+        onDropTable(keyspace, view);
+    }
+
+    public void onDropType(String keyspace, String type)
+    {
+    }
+
+    public void onDropFunction(String keyspace, String function, 
List<AbstractType<?>> argumentTypes)
+    {
+    }
+
+    public void onDropAggregate(String keyspace, String aggregate, 
List<AbstractType<?>> argumentTypes)
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java 
b/src/java/org/apache/cassandra/schema/SchemaConstants.java
new file mode 100644
index 0000000..5340f69
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.ImmutableSet;
+
+public final class SchemaConstants
+{
+    public static final Pattern PATTERN_WORD_CHARS = Pattern.compile("\\w+");
+
+    public static final String SYSTEM_KEYSPACE_NAME = "system";
+    public static final String SCHEMA_KEYSPACE_NAME = "system_schema";
+
+    public static final String TRACE_KEYSPACE_NAME = "system_traces";
+    public static final String AUTH_KEYSPACE_NAME = "system_auth";
+    public static final String DISTRIBUTED_KEYSPACE_NAME = 
"system_distributed";
+
+    /* system keyspace names (the ones with LocalStrategy replication 
strategy) */
+    public static final Set<String> SYSTEM_KEYSPACE_NAMES = 
ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME);
+
+    /* replicate system keyspace names (the ones with a "true" replication 
strategy) */
+    public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = 
ImmutableSet.of(TRACE_KEYSPACE_NAME,
+                                                                               
        AUTH_KEYSPACE_NAME,
+                                                                               
        DISTRIBUTED_KEYSPACE_NAME);
+    /**
+     * longest permissible KS or CF name.  Our main concern is that filename 
not be more than 255 characters;
+     * the filename will contain both the KS and CF names. Since 
non-schema-name components only take up
+     * ~64 characters, we could allow longer names than this, but on Windows, 
the entire path should be not greater than
+     * 255 characters, so a lower limit here helps avoid problems.  See 
CASSANDRA-4110.
+     */
+    public static final int NAME_LENGTH = 48;
+
+    // 59adb24e-f3cd-3e02-97f0-5b395827453f
+    public static final UUID emptyVersion;
+
+    public static boolean isValidName(String name)
+    {
+        return name != null && !name.isEmpty() && name.length() <= NAME_LENGTH 
&& PATTERN_WORD_CHARS.matcher(name).matches();
+    }
+
+    static
+    {
+        try
+        {
+            emptyVersion = 
UUID.nameUUIDFromBytes(MessageDigest.getInstance("MD5").digest());
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            throw new AssertionError();
+        }
+    }
+
+    /**
+     * @return whether or not the keyspace is a really system one (w/ 
LocalStrategy, unmodifiable, hardcoded)
+     */
+    public static boolean isSystemKeyspace(String keyspaceName)
+    {
+        return SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase());
+    }
+}

Reply via email to