Merge branch 'cassandra-3.11' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8406b206
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8406b206
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8406b206

Branch: refs/heads/trunk
Commit: 8406b206a6416c36d924eb7accdff7dbb4b7173b
Parents: e38fddf f5e8d16
Author: Aleksey Yeschenko <alek...@yeschenko.com>
Authored: Tue Oct 17 13:40:24 2017 +0100
Committer: Aleksey Yeschenko <alek...@yeschenko.com>
Committed: Tue Oct 17 13:40:24 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/AlterKeyspaceStatement.java |  2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  2 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |  4 +--
 .../org/apache/cassandra/db/ReadCommand.java    |  2 +-
 .../io/sstable/format/SSTableReader.java        |  2 +-
 .../org/apache/cassandra/schema/Schema.java     |  6 ++--
 .../cassandra/schema/SchemaConstants.java       | 20 +++++++++----
 .../apache/cassandra/schema/SchemaKeyspace.java |  6 ++--
 .../org/apache/cassandra/schema/TableId.java    |  3 +-
 .../apache/cassandra/schema/TableMetadata.java  |  2 +-
 .../apache/cassandra/service/ClientState.java   | 31 +++++++++++---------
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../cassandra/service/StorageService.java       | 16 ++++++++--
 .../cassandra/tools/nodetool/Cleanup.java       |  2 +-
 15 files changed, 61 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
index 0de5b2a,76c8d2f..b1ab138
--- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
@@@ -58,10 -54,10 +58,10 @@@ public class AlterKeyspaceStatement ext
  
      public void validate(ClientState state) throws RequestValidationException
      {
 -        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(name);
 +        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(name);
          if (ksm == null)
              throw new InvalidRequestException("Unknown keyspace " + name);
-         if (SchemaConstants.isSystemKeyspace(ksm.name))
+         if (SchemaConstants.isLocalSystemKeyspace(ksm.name))
              throw new InvalidRequestException("Cannot alter system keyspace");
  
          attrs.validate();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 67270ad,c9514ca..f095458
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1818,7 -1797,7 +1818,7 @@@ public class ColumnFamilyStore implemen
                  }
  
                  writeSnapshotManifest(filesJSONArr, snapshotName);
-                 if (!SchemaConstants.SYSTEM_KEYSPACE_NAMES.contains(metadata.keyspace) && !SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(metadata.keyspace))
 -                if (!SchemaConstants.isLocalSystemKeyspace(metadata.ksName) && !SchemaConstants.isReplicatedSystemKeyspace(metadata.ksName))
++                if (!SchemaConstants.isLocalSystemKeyspace(metadata.keyspace) && !SchemaConstants.isReplicatedSystemKeyspace(metadata.keyspace))
                      writeSnapshotSchema(snapshotName);
              }
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index f07f7f1,ab8779e..feaee7c
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -408,7 -464,7 +408,7 @@@ public abstract class ReadCommand exten
              private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
              private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
  
-             private final boolean respectTombstoneThresholds = !SchemaConstants.isSystemKeyspace(ReadCommand.this.metadata().keyspace);
 -            private final boolean respectTombstoneThresholds = !SchemaConstants.isLocalSystemKeyspace(ReadCommand.this.metadata().ksName);
++            private final boolean respectTombstoneThresholds = !SchemaConstants.isLocalSystemKeyspace(ReadCommand.this.metadata().keyspace);
              private final boolean enforceStrictLiveness = metadata.enforceStrictLiveness();
  
              private int liveRows = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/Schema.java
index 2319858,0000000..e79e3bd
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@@ -1,821 -1,0 +1,821 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.MapDifference;
 +import com.google.common.collect.Sets;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.functions.*;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.KeyspaceNotDefinedException;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.UserType;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.exceptions.UnknownTableException;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.locator.LocalStrategy;
 +import org.apache.cassandra.utils.Pair;
 +import org.cliffc.high_scale_lib.NonBlockingHashMap;
 +
 +import static java.lang.String.format;
 +
 +import static com.google.common.collect.Iterables.size;
 +
 +public final class Schema
 +{
 +    public static final Schema instance = new Schema();
 +
 +    private volatile Keyspaces keyspaces = Keyspaces.none();
 +
 +    // UUID -> mutable metadata ref map. We have to update these in place 
every time a table changes.
 +    private final Map<TableId, TableMetadataRef> metadataRefs = new 
NonBlockingHashMap<>();
 +
 +    // (keyspace name, index name) -> mutable metadata ref map. We have to 
update these in place every time an index changes.
 +    private final Map<Pair<String, String>, TableMetadataRef> 
indexMetadataRefs = new NonBlockingHashMap<>();
 +
 +    // Keyspace objects, one per keyspace. Only one instance should ever 
exist for any given keyspace.
 +    private final Map<String, Keyspace> keyspaceInstances = new 
NonBlockingHashMap<>();
 +
 +    private volatile UUID version;
 +
 +    private final List<SchemaChangeListener> changeListeners = new 
CopyOnWriteArrayList<>();
 +
 +    /**
 +     * Initialize empty schema object and load the hardcoded system tables
 +     */
 +    private Schema()
 +    {
 +        if (DatabaseDescriptor.isDaemonInitialized() || 
DatabaseDescriptor.isToolInitialized())
 +        {
 +            load(SchemaKeyspace.metadata());
 +            load(SystemKeyspace.metadata());
 +        }
 +    }
 +
 +    /**
 +     * Validates that the provided keyspace is not one of the system keyspace.
 +     *
 +     * @param keyspace the keyspace name to validate.
 +     *
 +     * @throws InvalidRequestException if {@code keyspace} is the name of a
 +     * system keyspace.
 +     */
 +    public static void validateKeyspaceNotSystem(String keyspace)
 +    {
-         if (SchemaConstants.isSystemKeyspace(keyspace))
++        if (SchemaConstants.isLocalSystemKeyspace(keyspace))
 +            throw new InvalidRequestException(format("%s keyspace is not 
user-modifiable", keyspace));
 +    }
 +
 +    /**
 +     * load keyspace (keyspace) definitions, but do not initialize the 
keyspace instances.
 +     * Schema version may be updated as the result.
 +     */
 +    public void loadFromDisk()
 +    {
 +        loadFromDisk(true);
 +    }
 +
 +    /**
 +     * Load schema definitions from disk.
 +     *
 +     * @param updateVersion true if schema version needs to be updated
 +     */
 +    public void loadFromDisk(boolean updateVersion)
 +    {
 +        load(SchemaKeyspace.fetchNonSystemKeyspaces());
 +        if (updateVersion)
 +            updateVersion();
 +    }
 +
 +    /**
 +     * Load up non-system keyspaces
 +     *
 +     * @param keyspaceDefs The non-system keyspace definitions
 +     */
 +    private void load(Iterable<KeyspaceMetadata> keyspaceDefs)
 +    {
 +        keyspaceDefs.forEach(this::load);
 +    }
 +
 +    /**
 +     * Update (or insert) new keyspace definition
 +     *
 +     * @param ksm The metadata about keyspace
 +     */
 +    synchronized public void load(KeyspaceMetadata ksm)
 +    {
 +        KeyspaceMetadata previous = keyspaces.getNullable(ksm.name);
 +
 +        if (previous == null)
 +            loadNew(ksm);
 +        else
 +            reload(previous, ksm);
 +
 +        keyspaces = keyspaces.withAddedOrUpdated(ksm);
 +    }
 +
 +    private void loadNew(KeyspaceMetadata ksm)
 +    {
 +        ksm.tablesAndViews()
 +           .forEach(metadata -> metadataRefs.put(metadata.id, new 
TableMetadataRef(metadata)));
 +
 +        ksm.tables
 +           .indexTables()
 +           .forEach((name, metadata) -> 
indexMetadataRefs.put(Pair.create(ksm.name, name), new 
TableMetadataRef(metadata)));
 +    }
 +
 +    private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated)
 +    {
 +        Keyspace keyspace = getKeyspaceInstance(updated.name);
 +        if (keyspace != null)
 +            keyspace.setMetadata(updated);
 +
 +        MapDifference<TableId, TableMetadata> tablesDiff = 
previous.tables.diff(updated.tables);
 +        MapDifference<TableId, ViewMetadata> viewsDiff = 
previous.views.diff(updated.views);
 +        MapDifference<String, TableMetadata> indexesDiff = 
previous.tables.indexesDiff(updated.tables);
 +
 +        // clean up after removed entries
 +
 +        tablesDiff.entriesOnlyOnLeft()
 +                  .values()
 +                  .forEach(table -> metadataRefs.remove(table.id));
 +
 +        viewsDiff.entriesOnlyOnLeft()
 +                 .values()
 +                 .forEach(view -> metadataRefs.remove(view.metadata.id));
 +
 +        indexesDiff.entriesOnlyOnLeft()
 +                   .values()
 +                   .forEach(indexTable -> 
indexMetadataRefs.remove(Pair.create(indexTable.keyspace, 
indexTable.indexName().get())));
 +
 +        // load up new entries
 +
 +        tablesDiff.entriesOnlyOnRight()
 +                  .values()
 +                  .forEach(table -> metadataRefs.put(table.id, new 
TableMetadataRef(table)));
 +
 +        viewsDiff.entriesOnlyOnRight()
 +                 .values()
 +                 .forEach(view -> metadataRefs.put(view.metadata.id, new 
TableMetadataRef(view.metadata)));
 +
 +        indexesDiff.entriesOnlyOnRight()
 +                   .values()
 +                   .forEach(indexTable -> 
indexMetadataRefs.put(Pair.create(indexTable.keyspace, 
indexTable.indexName().get()), new TableMetadataRef(indexTable)));
 +
 +        // refresh refs to updated ones
 +
 +        tablesDiff.entriesDiffering()
 +                  .values()
 +                  .forEach(diff -> 
metadataRefs.get(diff.rightValue().id).set(diff.rightValue()));
 +
 +        viewsDiff.entriesDiffering()
 +                 .values()
 +                 .forEach(diff -> 
metadataRefs.get(diff.rightValue().metadata.id).set(diff.rightValue().metadata));
 +
 +        indexesDiff.entriesDiffering()
 +                   .values()
 +                   .stream()
 +                   .map(MapDifference.ValueDifference::rightValue)
 +                   .forEach(indexTable -> 
indexMetadataRefs.get(Pair.create(indexTable.keyspace, 
indexTable.indexName().get())).set(indexTable));
 +    }
 +
 +    public void registerListener(SchemaChangeListener listener)
 +    {
 +        changeListeners.add(listener);
 +    }
 +
 +    @SuppressWarnings("unused")
 +    public void unregisterListener(SchemaChangeListener listener)
 +    {
 +        changeListeners.remove(listener);
 +    }
 +
 +    /**
 +     * Get keyspace instance by name
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return Keyspace object or null if keyspace was not found
 +     */
 +    public Keyspace getKeyspaceInstance(String keyspaceName)
 +    {
 +        return keyspaceInstances.get(keyspaceName);
 +    }
 +
 +    public ColumnFamilyStore getColumnFamilyStoreInstance(TableId id)
 +    {
 +        TableMetadata metadata = getTableMetadata(id);
 +        if (metadata == null)
 +            return null;
 +
 +        Keyspace instance = getKeyspaceInstance(metadata.keyspace);
 +        if (instance == null)
 +            return null;
 +
 +        return instance.hasColumnFamilyStore(metadata.id)
 +             ? instance.getColumnFamilyStore(metadata.id)
 +             : null;
 +    }
 +
 +    /**
 +     * Store given Keyspace instance to the schema
 +     *
 +     * @param keyspace The Keyspace instance to store
 +     *
 +     * @throws IllegalArgumentException if Keyspace is already stored
 +     */
 +    public void storeKeyspaceInstance(Keyspace keyspace)
 +    {
 +        if (keyspaceInstances.containsKey(keyspace.getName()))
 +            throw new IllegalArgumentException(String.format("Keyspace %s was 
already initialized.", keyspace.getName()));
 +
 +        keyspaceInstances.put(keyspace.getName(), keyspace);
 +    }
 +
 +    /**
 +     * Remove keyspace from schema
 +     *
 +     * @param keyspaceName The name of the keyspace to remove
 +     *
 +     * @return removed keyspace instance or null if it wasn't found
 +     */
 +    public Keyspace removeKeyspaceInstance(String keyspaceName)
 +    {
 +        return keyspaceInstances.remove(keyspaceName);
 +    }
 +
 +    /**
 +     * Remove keyspace definition from system
 +     *
 +     * @param ksm The keyspace definition to remove
 +     */
 +    synchronized void unload(KeyspaceMetadata ksm)
 +    {
 +        keyspaces = keyspaces.without(ksm.name);
 +
 +        ksm.tablesAndViews()
 +           .forEach(t -> metadataRefs.remove(t.id));
 +
 +        ksm.tables
 +           .indexTables()
 +           .keySet()
 +           .forEach(name -> indexMetadataRefs.remove(Pair.create(ksm.name, 
name)));
 +    }
 +
 +    public int getNumberOfTables()
 +    {
 +        return keyspaces.stream().mapToInt(k -> 
size(k.tablesAndViews())).sum();
 +    }
 +
 +    public ViewMetadata getView(String keyspaceName, String viewName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
 +        return (ksm == null) ? null : ksm.views.getNullable(viewName);
 +    }
 +
 +    /**
 +     * Get metadata about keyspace by its name
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return The keyspace metadata or null if it wasn't found
 +     */
 +    public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName)
 +    {
 +        assert keyspaceName != null;
 +        return keyspaces.getNullable(keyspaceName);
 +    }
 +
 +    private Set<String> getNonSystemKeyspacesSet()
 +    {
-         return Sets.difference(keyspaces.names(), 
SchemaConstants.SYSTEM_KEYSPACE_NAMES);
++        return Sets.difference(keyspaces.names(), 
SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES);
 +    }
 +
 +    /**
 +     * @return collection of the non-system keyspaces (note that this count 
as system only the
 +     * non replicated keyspaces, so keyspace like system_traces which are 
replicated are actually
 +     * returned. See getUserKeyspace() below if you don't want those)
 +     */
 +    public ImmutableList<String> getNonSystemKeyspaces()
 +    {
 +        return ImmutableList.copyOf(getNonSystemKeyspacesSet());
 +    }
 +
 +    /**
 +     * @return a collection of keyspaces that do not use LocalStrategy for 
replication
 +     */
 +    public List<String> getNonLocalStrategyKeyspaces()
 +    {
 +        return keyspaces.stream()
 +                        .filter(keyspace -> keyspace.params.replication.klass 
!= LocalStrategy.class)
 +                        .map(keyspace -> keyspace.name)
 +                        .collect(Collectors.toList());
 +    }
 +
 +    /**
 +     * @return collection of the user defined keyspaces
 +     */
 +    public List<String> getUserKeyspaces()
 +    {
 +        return 
ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), 
SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES));
 +    }
 +
 +    /**
 +     * Get metadata about keyspace inner ColumnFamilies
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return metadata about ColumnFamilies the belong to the given keyspace
 +     */
 +    public Iterable<TableMetadata> getTablesAndViews(String keyspaceName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
 +        assert ksm != null;
 +        return ksm.tablesAndViews();
 +    }
 +
 +    /**
 +     * @return collection of the all keyspace names registered in the system 
(system and non-system)
 +     */
 +    public Set<String> getKeyspaces()
 +    {
 +        return keyspaces.names();
 +    }
 +
 +    /* TableMetadata/Ref query/control methods */
 +
 +    /**
 +     * Given a keyspace name and table/view name, get the table metadata
 +     * reference. If the keyspace name or table/view name is not present
 +     * this method returns null.
 +     *
 +     * @return TableMetadataRef object or null if it wasn't found
 +     */
 +    public TableMetadataRef getTableMetadataRef(String keyspace, String table)
 +    {
 +        TableMetadata tm = getTableMetadata(keyspace, table);
 +        return tm == null
 +             ? null
 +             : metadataRefs.get(tm.id);
 +    }
 +
 +    public TableMetadataRef getIndexTableMetadataRef(String keyspace, String 
index)
 +    {
 +        return indexMetadataRefs.get(Pair.create(keyspace, index));
 +    }
 +
 +    /**
 +     * Get Table metadata by its identifier
 +     *
 +     * @param id table or view identifier
 +     *
 +     * @return metadata about Table or View
 +     */
 +    public TableMetadataRef getTableMetadataRef(TableId id)
 +    {
 +        return metadataRefs.get(id);
 +    }
 +
 +    public TableMetadataRef getTableMetadataRef(Descriptor descriptor)
 +    {
 +        return getTableMetadataRef(descriptor.ksname, descriptor.cfname);
 +    }
 +
 +    /**
 +     * Given a keyspace name and table name, get the table
 +     * meta data. If the keyspace name or table name is not valid
 +     * this function returns null.
 +     *
 +     * @param keyspace The keyspace name
 +     * @param table The table name
 +     *
 +     * @return TableMetadata object or null if it wasn't found
 +     */
 +    public TableMetadata getTableMetadata(String keyspace, String table)
 +    {
 +        assert keyspace != null;
 +        assert table != null;
 +
 +        KeyspaceMetadata ksm = keyspaces.getNullable(keyspace);
 +        return ksm == null
 +             ? null
 +             : ksm.getTableOrViewNullable(table);
 +    }
 +
 +    public TableMetadata getTableMetadata(TableId id)
 +    {
 +        return keyspaces.getTableOrViewNullable(id);
 +    }
 +
 +    public TableMetadata validateTable(String keyspaceName, String tableName)
 +    {
 +        if (tableName.isEmpty())
 +            throw new InvalidRequestException("non-empty table is required");
 +
 +        KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
 +        if (keyspace == null)
 +            throw new KeyspaceNotDefinedException(format("keyspace %s does 
not exist", keyspaceName));
 +
 +        TableMetadata metadata = keyspace.getTableOrViewNullable(tableName);
 +        if (metadata == null)
 +            throw new InvalidRequestException(format("table %s does not 
exist", tableName));
 +
 +        return metadata;
 +    }
 +
 +    public TableMetadata getTableMetadata(Descriptor descriptor)
 +    {
 +        return getTableMetadata(descriptor.ksname, descriptor.cfname);
 +    }
 +
 +    /**
 +     * @throws UnknownTableException if the table couldn't be found in the 
metadata
 +     */
 +    public TableMetadata getExistingTableMetadata(TableId id) throws 
UnknownTableException
 +    {
 +        TableMetadata metadata = getTableMetadata(id);
 +        if (metadata != null)
 +            return metadata;
 +
 +        String message =
 +            String.format("Couldn't find table with id %s. If a table was 
just created, this is likely due to the schema"
 +                          + "not being fully propagated.  Please wait for 
schema agreement on table creation.",
 +                          id);
 +        throw new UnknownTableException(message, id);
 +    }
 +
 +    /* Function helpers */
 +
 +    /**
 +     * Get all function overloads with the specified name
 +     *
 +     * @param name fully qualified function name
 +     * @return an empty list if the keyspace or the function name are not 
found;
 +     *         a non-empty collection of {@link Function} otherwise
 +     */
 +    public Collection<Function> getFunctions(FunctionName name)
 +    {
 +        if (!name.hasKeyspace())
 +            throw new IllegalArgumentException(String.format("Function name 
must be fully qualified: got %s", name));
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
 +        return ksm == null
 +             ? Collections.emptyList()
 +             : ksm.functions.get(name);
 +    }
 +
 +    /**
 +     * Find the function with the specified name
 +     *
 +     * @param name fully qualified function name
 +     * @param argTypes function argument types
 +     * @return an empty {@link Optional} if the keyspace or the function name 
are not found;
 +     *         a non-empty optional of {@link Function} otherwise
 +     */
 +    public Optional<Function> findFunction(FunctionName name, 
List<AbstractType<?>> argTypes)
 +    {
 +        if (!name.hasKeyspace())
 +            throw new IllegalArgumentException(String.format("Function name 
must be fully quallified: got %s", name));
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
 +        return ksm == null
 +             ? Optional.empty()
 +             : ksm.functions.find(name, argTypes);
 +    }
 +
 +    /* Version control */
 +
 +    /**
 +     * @return current schema version
 +     */
 +    public UUID getVersion()
 +    {
 +        return version;
 +    }
 +
 +    /**
 +     * Read schema from system keyspace and calculate MD5 digest of every 
row, resulting digest
 +     * will be converted into UUID which would act as content-based version 
of the schema.
 +     */
 +    public void updateVersion()
 +    {
 +        version = SchemaKeyspace.calculateSchemaDigest();
 +        SystemKeyspace.updateSchemaVersion(version);
 +    }
 +
 +    /*
 +     * Like updateVersion, but also announces via gossip
 +     */
 +    public void updateVersionAndAnnounce()
 +    {
 +        updateVersion();
 +        MigrationManager.passiveAnnounce(version);
 +    }
 +
 +    /**
 +     * Clear all KS/CF metadata and reset version.
 +     */
 +    public synchronized void clear()
 +    {
 +        getNonSystemKeyspaces().forEach(k -> unload(getKeyspaceMetadata(k)));
 +        updateVersionAndAnnounce();
 +    }
 +
 +    /*
 +     * Reload schema from local disk. Useful if a user made changes to schema 
tables by hand, or has suspicion that
 +     * in-memory representation got out of sync somehow with what's on disk.
 +     */
 +    public synchronized void reloadSchemaAndAnnounceVersion()
 +    {
-         Keyspaces before = keyspaces.filter(k -> 
!SchemaConstants.isSystemKeyspace(k.name));
++        Keyspaces before = keyspaces.filter(k -> 
!SchemaConstants.isLocalSystemKeyspace(k.name));
 +        Keyspaces after = SchemaKeyspace.fetchNonSystemKeyspaces();
 +        merge(before, after);
 +        updateVersionAndAnnounce();
 +    }
 +
 +    /**
 +     * Merge remote schema in form of mutations with local and mutate ks/cf 
metadata objects
 +     * (which also involves fs operations on add/drop ks/cf)
 +     *
 +     * @param mutations the schema changes to apply
 +     *
 +     * @throws ConfigurationException If one of metadata attributes has 
invalid value
 +     */
 +    synchronized void mergeAndAnnounceVersion(Collection<Mutation> mutations)
 +    {
 +        merge(mutations);
 +        updateVersionAndAnnounce();
 +    }
 +
 +    synchronized void merge(Collection<Mutation> mutations)
 +    {
 +        // only compare the keyspaces affected by this set of schema mutations
 +        Set<String> affectedKeyspaces = 
SchemaKeyspace.affectedKeyspaces(mutations);
 +
 +        // fetch the current state of schema for the affected keyspaces only
 +        Keyspaces before = keyspaces.filter(k -> 
affectedKeyspaces.contains(k.name));
 +
 +        // apply the schema mutations
 +        SchemaKeyspace.applyChanges(mutations);
 +
 +        // apply the schema mutations and fetch the new versions of the 
altered keyspaces
 +        Keyspaces after = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);
 +
 +        merge(before, after);
 +    }
 +
 +    private synchronized void merge(Keyspaces before, Keyspaces after)
 +    {
 +        MapDifference<String, KeyspaceMetadata> keyspacesDiff = 
before.diff(after);
 +
 +        // dropped keyspaces
 +        
keyspacesDiff.entriesOnlyOnLeft().values().forEach(this::dropKeyspace);
 +
 +        // new keyspaces
 +        
keyspacesDiff.entriesOnlyOnRight().values().forEach(this::createKeyspace);
 +
 +        // updated keyspaces
 +        keyspacesDiff.entriesDiffering().entrySet().forEach(diff -> 
alterKeyspace(diff.getValue().leftValue(), diff.getValue().rightValue()));
 +    }
 +
 +    private void alterKeyspace(KeyspaceMetadata before, KeyspaceMetadata 
after)
 +    {
 +        // calculate the deltas
 +        MapDifference<TableId, TableMetadata> tablesDiff = 
before.tables.diff(after.tables);
 +        MapDifference<TableId, ViewMetadata> viewsDiff = 
before.views.diff(after.views);
 +        MapDifference<ByteBuffer, UserType> typesDiff = 
before.types.diff(after.types);
 +        MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff 
= before.functions.udfsDiff(after.functions);
 +        MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff 
= before.functions.udasDiff(after.functions);
 +
 +        // drop tables and views
 +        viewsDiff.entriesOnlyOnLeft().values().forEach(this::dropView);
 +        tablesDiff.entriesOnlyOnLeft().values().forEach(this::dropTable);
 +
 +        load(after);
 +
 +        // add tables and views
 +        tablesDiff.entriesOnlyOnRight().values().forEach(this::createTable);
 +        viewsDiff.entriesOnlyOnRight().values().forEach(this::createView);
 +
 +        // update tables and views
 +        tablesDiff.entriesDiffering().values().forEach(diff -> 
alterTable(diff.rightValue()));
 +        viewsDiff.entriesDiffering().values().forEach(diff -> 
alterView(diff.rightValue()));
 +
 +        // deal with all removed, added, and altered views
 +        Keyspace.open(before.name).viewManager.reload();
 +
 +        // notify on everything dropped
 +        
udasDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropAggregate);
 +        
udfsDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropFunction);
 +        viewsDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropView);
 +        
tablesDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropTable);
 +        typesDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropType);
 +
 +        // notify on everything created
 +        
typesDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateType);
 +        
tablesDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateTable);
 +        
viewsDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateView);
 +        
udfsDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateFunction);
 +        
udasDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateAggregate);
 +
 +        // notify on everything altered
 +        if (!before.params.equals(after.params))
 +            notifyAlterKeyspace(after);
 +        typesDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterType(diff.rightValue()));
 +        tablesDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterTable(diff.leftValue(), diff.rightValue()));
 +        viewsDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterView(diff.leftValue(), diff.rightValue()));
 +        udfsDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterFunction(diff.rightValue()));
 +        udasDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterAggregate(diff.rightValue()));
 +    }
 +
 +    /**
 +     * Handles a newly created keyspace: hands its metadata to {@code load}, opens the
 +     * keyspace, then notifies the change listeners about the keyspace itself followed by
 +     * each of its types, tables, views, functions and aggregates (in that order).
 +     */
 +    private void createKeyspace(KeyspaceMetadata keyspace)
 +    {
 +        load(keyspace);
 +        Keyspace.open(keyspace.name);
 +
 +        notifyCreateKeyspace(keyspace);
 +        keyspace.types.forEach(type -> notifyCreateType(type));
 +        keyspace.tables.forEach(table -> notifyCreateTable(table));
 +        keyspace.views.forEach(view -> notifyCreateView(view));
 +        keyspace.functions.udfs().forEach(function -> notifyCreateFunction(function));
 +        keyspace.functions.udas().forEach(aggregate -> notifyCreateAggregate(aggregate));
 +    }
 +
 +    /**
 +     * Handles a dropped keyspace. Its views and tables are dropped first; the keyspace is
 +     * then cleared, unloaded, and {@code Keyspace.writeOrder.awaitNewBarrier()} is called.
 +     * Only afterwards are the listeners notified: aggregates, functions, views, tables and
 +     * types first, the keyspace itself last.
 +     */
 +    private void dropKeyspace(KeyspaceMetadata keyspace)
 +    {
 +        keyspace.views.forEach(view -> dropView(view));
 +        keyspace.tables.forEach(table -> dropTable(table));
 +
 +        // remove the keyspace from the static instances.
 +        Keyspace.clear(keyspace.name);
 +        unload(keyspace);
 +        Keyspace.writeOrder.awaitNewBarrier();
 +
 +        keyspace.functions.udas().forEach(aggregate -> notifyDropAggregate(aggregate));
 +        keyspace.functions.udfs().forEach(function -> notifyDropFunction(function));
 +        keyspace.views.forEach(view -> notifyDropView(view));
 +        keyspace.tables.forEach(table -> notifyDropTable(table));
 +        keyspace.types.forEach(type -> notifyDropType(type));
 +        notifyDropKeyspace(keyspace);
 +    }
 +
 +    /** Drops a view by dropping the table metadata it carries. */
 +    private void dropView(ViewMetadata metadata)
 +    {
 +        TableMetadata viewTable = metadata.metadata;
 +        dropTable(viewTable);
 +    }
 +
 +    /**
 +     * Tears down a dropped table: marks all of its indexes as removed, interrupts the
 +     * compactions running for it, takes a snapshot when auto-snapshot is enabled, asks the
 +     * commit log to recycle all segments for the table id, and finally drops the table
 +     * from its keyspace.
 +     */
 +    private void dropTable(TableMetadata metadata)
 +    {
 +        ColumnFamilyStore store = Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name);
 +        assert store != null;
 +
 +        // make sure all the indexes are dropped, or else.
 +        store.indexManager.markAllIndexesRemoved();
 +        CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata), true);
 +
 +        if (DatabaseDescriptor.isAutoSnapshot())
 +        {
 +            String snapshotName = Keyspace.getTimestampedSnapshotNameWithPrefix(store.name, ColumnFamilyStore.SNAPSHOT_DROP_PREFIX);
 +            store.snapshot(snapshotName);
 +        }
 +
 +        CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
 +        Keyspace.open(metadata.keyspace).dropCf(metadata.id);
 +    }
 +
 +    /** Initializes the column family for a new table in its keyspace, using the table's entry in {@code metadataRefs}. */
 +    private void createTable(TableMetadata table)
 +    {
 +        Keyspace owner = Keyspace.open(table.keyspace);
 +        owner.initCf(metadataRefs.get(table.id), true);
 +    }
 +
 +    /** Initializes the column family for a new view in its keyspace, using the entry of the view's table in {@code metadataRefs}. */
 +    private void createView(ViewMetadata view)
 +    {
 +        Keyspace owner = Keyspace.open(view.keyspace);
 +        owner.initCf(metadataRefs.get(view.metadata.id), true);
 +    }
 +
 +    /** Reloads the column family store of an altered table. */
 +    private void alterTable(TableMetadata updated)
 +    {
 +        ColumnFamilyStore store = Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name);
 +        store.reload();
 +    }
 +
 +    /** Reloads the column family store of an altered view. */
 +    private void alterView(ViewMetadata updated)
 +    {
 +        ColumnFamilyStore store = Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name);
 +        store.reload();
 +    }
 +
 +    /** Calls {@code onCreateKeyspace} on every listener in {@code changeListeners}. */
 +    private void notifyCreateKeyspace(KeyspaceMetadata ksm)
 +    {
 +        changeListeners.forEach(listener -> listener.onCreateKeyspace(ksm.name));
 +    }
 +
 +    /** Calls {@code onCreateTable} on every listener in {@code changeListeners}. */
 +    private void notifyCreateTable(TableMetadata metadata)
 +    {
 +        changeListeners.forEach(listener -> listener.onCreateTable(metadata.keyspace, metadata.name));
 +    }
 +
 +    /** Calls {@code onCreateView} on every listener in {@code changeListeners}. */
 +    private void notifyCreateView(ViewMetadata view)
 +    {
 +        changeListeners.forEach(listener -> listener.onCreateView(view.keyspace, view.name));
 +    }
 +
 +    /** Calls {@code onCreateType} on every listener in {@code changeListeners}. */
 +    private void notifyCreateType(UserType ut)
 +    {
 +        changeListeners.forEach(listener -> listener.onCreateType(ut.keyspace, ut.getNameAsString()));
 +    }
 +
 +    /** Calls {@code onCreateFunction} on every listener in {@code changeListeners}. */
 +    private void notifyCreateFunction(UDFunction udf)
 +    {
 +        changeListeners.forEach(listener -> listener.onCreateFunction(udf.name().keyspace, udf.name().name, udf.argTypes()));
 +    }
 +
 +    /** Calls {@code onCreateAggregate} on every listener in {@code changeListeners}. */
 +    private void notifyCreateAggregate(UDAggregate udf)
 +    {
 +        changeListeners.forEach(listener -> listener.onCreateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
 +    }
 +
 +    /** Calls {@code onAlterKeyspace} on every listener in {@code changeListeners}. */
 +    private void notifyAlterKeyspace(KeyspaceMetadata ksm)
 +    {
 +        changeListeners.forEach(listener -> listener.onAlterKeyspace(ksm.name));
 +    }
 +
 +    /**
 +     * Calls {@code onAlterTable} on every listener in {@code changeListeners}. Whether the
 +     * change affects prepared statements is computed once, from {@code current} against
 +     * {@code updated}, before any listener is called.
 +     */
 +    private void notifyAlterTable(TableMetadata current, TableMetadata updated)
 +    {
 +        final boolean affectsPrepared = current.changeAffectsPreparedStatements(updated);
 +        changeListeners.forEach(listener -> listener.onAlterTable(updated.keyspace, updated.name, affectsPrepared));
 +    }
 +
 +    /**
 +     * Calls {@code onAlterView} on every listener in {@code changeListeners}. Whether the
 +     * change affects prepared statements is computed once, from the table metadata of
 +     * {@code current} against that of {@code updated}, before any listener is called.
 +     */
 +    private void notifyAlterView(ViewMetadata current, ViewMetadata updated)
 +    {
 +        final boolean affectsPrepared = current.metadata.changeAffectsPreparedStatements(updated.metadata);
 +        changeListeners.forEach(listener -> listener.onAlterView(updated.keyspace, updated.name, affectsPrepared));
 +    }
 +
 +    /** Calls {@code onAlterType} on every listener in {@code changeListeners}. */
 +    private void notifyAlterType(UserType ut)
 +    {
 +        changeListeners.forEach(listener -> listener.onAlterType(ut.keyspace, ut.getNameAsString()));
 +    }
 +
 +    /** Calls {@code onAlterFunction} on every listener in {@code changeListeners}. */
 +    private void notifyAlterFunction(UDFunction udf)
 +    {
 +        changeListeners.forEach(listener -> listener.onAlterFunction(udf.name().keyspace, udf.name().name, udf.argTypes()));
 +    }
 +
 +    /** Calls {@code onAlterAggregate} on every listener in {@code changeListeners}. */
 +    private void notifyAlterAggregate(UDAggregate udf)
 +    {
 +        changeListeners.forEach(listener -> listener.onAlterAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
 +    }
 +
 +    /** Calls {@code onDropKeyspace} on every listener in {@code changeListeners}. */
 +    private void notifyDropKeyspace(KeyspaceMetadata ksm)
 +    {
 +        changeListeners.forEach(listener -> listener.onDropKeyspace(ksm.name));
 +    }
 +
 +    /** Calls {@code onDropTable} on every listener in {@code changeListeners}. */
 +    private void notifyDropTable(TableMetadata metadata)
 +    {
 +        changeListeners.forEach(listener -> listener.onDropTable(metadata.keyspace, metadata.name));
 +    }
 +
 +    /** Calls {@code onDropView} on every listener in {@code changeListeners}. */
 +    private void notifyDropView(ViewMetadata view)
 +    {
 +        changeListeners.forEach(listener -> listener.onDropView(view.keyspace, view.name));
 +    }
 +
 +    /** Calls {@code onDropType} on every listener in {@code changeListeners}. */
 +    private void notifyDropType(UserType ut)
 +    {
 +        changeListeners.forEach(listener -> listener.onDropType(ut.keyspace, ut.getNameAsString()));
 +    }
 +
 +    /** Calls {@code onDropFunction} on every listener in {@code changeListeners}. */
 +    private void notifyDropFunction(UDFunction udf)
 +    {
 +        changeListeners.forEach(listener -> listener.onDropFunction(udf.name().keyspace, udf.name().name, udf.argTypes()));
 +    }
 +
 +    /** Calls {@code onDropAggregate} on every listener in {@code changeListeners}. */
 +    private void notifyDropAggregate(UDAggregate udf)
 +    {
 +        changeListeners.forEach(listener -> listener.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/schema/SchemaConstants.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/SchemaConstants.java
index 818c371,0000000..d92f09c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/SchemaConstants.java
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@@ -1,83 -1,0 +1,91 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.schema;
 +
 +import java.security.MessageDigest;
 +import java.security.NoSuchAlgorithmException;
 +import java.util.*;
 +import java.util.regex.Pattern;
 +
 +import com.google.common.collect.ImmutableSet;
 +
 +/**
 + * Names and limits shared by the schema code: the system keyspace names, the maximum
 + * length of a keyspace/table name, and the schema version of an empty schema.
 + */
 +public final class SchemaConstants
 +{
 +    public static final Pattern PATTERN_WORD_CHARS = Pattern.compile("\\w+");
 +
 +    public static final String SYSTEM_KEYSPACE_NAME = "system";
 +    public static final String SCHEMA_KEYSPACE_NAME = "system_schema";
 +
 +    public static final String TRACE_KEYSPACE_NAME = "system_traces";
 +    public static final String AUTH_KEYSPACE_NAME = "system_auth";
 +    public static final String DISTRIBUTED_KEYSPACE_NAME = "system_distributed";
 +
 +    /* local system keyspace names (the ones with LocalStrategy replication strategy) */
-     public static final Set<String> SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME);
++    public static final Set<String> LOCAL_SYSTEM_KEYSPACE_NAMES =
++        ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME);
 +
 +    /* replicated system keyspace names (the ones with a "true" replication strategy) */
-     public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(TRACE_KEYSPACE_NAME,
-                                                                                        AUTH_KEYSPACE_NAME,
-                                                                                        DISTRIBUTED_KEYSPACE_NAME);
++    public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES =
++        ImmutableSet.of(TRACE_KEYSPACE_NAME, AUTH_KEYSPACE_NAME, DISTRIBUTED_KEYSPACE_NAME);
 +    /**
 +     * longest permissible KS or CF name.  Our main concern is that filename not be more than 255 characters;
 +     * the filename will contain both the KS and CF names. Since non-schema-name components only take up
 +     * ~64 characters, we could allow longer names than this, but on Windows, the entire path should be not greater than
 +     * 255 characters, so a lower limit here helps avoid problems.  See CASSANDRA-4110.
 +     */
 +    public static final int NAME_LENGTH = 48;
 +
 +    // 59adb24e-f3cd-3e02-97f0-5b395827453f
 +    public static final UUID emptyVersion;
 +
 +    public static final List<String> LEGACY_AUTH_TABLES = Arrays.asList("credentials", "users", "permissions");
 +
 +    /**
 +     * @return true if {@code name} is non-null, non-empty, at most {@link #NAME_LENGTH} characters long
 +     *         and made up of word characters only (see {@link #PATTERN_WORD_CHARS})
 +     */
 +    public static boolean isValidName(String name)
 +    {
 +        return name != null && !name.isEmpty() && name.length() <= NAME_LENGTH && PATTERN_WORD_CHARS.matcher(name).matches();
 +    }
 +
 +    static
 +    {
 +        try
 +        {
 +            // name-based UUID of the MD5 digest of no input at all
 +            emptyVersion = UUID.nameUUIDFromBytes(MessageDigest.getInstance("MD5").digest());
 +        }
 +        catch (NoSuchAlgorithmException e)
 +        {
 +            // not expected to happen; keep the cause so the failure is diagnosable if it ever does
 +            throw new AssertionError(e);
 +        }
 +    }
 +
 +    /**
 +     * @return whether or not the keyspace is really a system one (w/ LocalStrategy, unmodifiable, hardcoded)
 +     */
-     public static boolean isSystemKeyspace(String keyspaceName)
++    public static boolean isLocalSystemKeyspace(String keyspaceName)
 +    {
-         return SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase());
++        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
++        return LOCAL_SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase(Locale.ROOT));
++    }
++
++    /**
++     * @return whether or not the keyspace is a replicated system ks (system_auth, system_traces, system_distributed)
++     */
++    public static boolean isReplicatedSystemKeyspace(String keyspaceName)
++    {
++        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
++        return REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase(Locale.ROOT));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index b7de321,7834b12..91583bc
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@@ -881,9 -905,9 +881,9 @@@ public final class SchemaKeyspac
       * Fetching schema
       */
  
 -    public static Keyspaces fetchNonSystemKeyspaces()
 +    static Keyspaces fetchNonSystemKeyspaces()
      {
-         return fetchKeyspacesWithout(SchemaConstants.SYSTEM_KEYSPACE_NAMES);
+         return 
fetchKeyspacesWithout(SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES);
      }
  
      private static Keyspaces fetchKeyspacesWithout(Set<String> 
excludedKeyspaceNames)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/schema/TableId.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/TableId.java
index 4b2592e,0000000..95256fe
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/TableId.java
+++ b/src/java/org/apache/cassandra/schema/TableId.java
@@@ -1,118 -1,0 +1,117 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.nio.charset.StandardCharsets;
 +import java.util.UUID;
 +
 +import org.apache.commons.lang3.ArrayUtils;
 +
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.UUIDGen;
 +
 +/**
 + * The unique identifier of a table.
 + * <p>
 + * This is essentially a UUID, but we wrap it as it's used quite a bit in the code and having a nicely named class makes
 + * the code more readable.
 + */
 +public class TableId
 +{
 +    private final UUID id;
 +
 +    private TableId(UUID id)
 +    {
 +        this.id = id;
 +    }
 +
 +    /** Wraps an existing UUID. */
 +    public static TableId fromUUID(UUID id)
 +    {
 +        return new TableId(id);
 +    }
 +
 +    /** Creates a new id from {@code UUIDGen.getTimeUUID()}. */
 +    public static TableId generate()
 +    {
 +        return new TableId(UUIDGen.getTimeUUID());
 +    }
 +
 +    /** Parses the standard textual UUID representation, as accepted by {@link UUID#fromString(String)}. */
 +    public static TableId fromString(String idString)
 +    {
 +        return new TableId(UUID.fromString(idString));
 +    }
 +
 +    /**
 +     * Creates the UUID of a system table.
 +     *
 +     * This is deterministically based on the table name as system tables are hardcoded and initialized independently
 +     * on each node (they don't go through a CREATE), but we still want them to have the same ID everywhere.
 +     *
 +     * We shouldn't use this for any other table.
 +     */
 +    public static TableId forSystemTable(String keyspace, String table)
 +    {
-         assert SchemaConstants.SYSTEM_KEYSPACE_NAMES.contains(keyspace)
-                || SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(keyspace);
++        assert SchemaConstants.isLocalSystemKeyspace(keyspace) || SchemaConstants.isReplicatedSystemKeyspace(keyspace);
 +        // Encode with a fixed charset: the platform default can differ from node to node,
 +        // which would defeat the purpose of a deterministic id.
 +        byte[] nameBytes = ArrayUtils.addAll(keyspace.getBytes(StandardCharsets.UTF_8),
 +                                             table.getBytes(StandardCharsets.UTF_8));
 +        return new TableId(UUID.nameUUIDFromBytes(nameBytes));
 +    }
 +
 +    /** @return the id's bytes rendered as a hex string via {@code ByteBufferUtil.bytesToHex} */
 +    public String toHexString()
 +    {
 +        return ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(id));
 +    }
 +
 +    /** @return the wrapped UUID */
 +    public UUID asUUID()
 +    {
 +        return id;
 +    }
 +
 +    @Override
 +    public final int hashCode()
 +    {
 +        return id.hashCode();
 +    }
 +
 +    @Override
 +    public final boolean equals(Object o)
 +    {
 +        return this == o || (o instanceof TableId && this.id.equals(((TableId) o).id));
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return id.toString();
 +    }
 +
 +    /** Writes the id as two longs: most significant bits first, then least significant bits. */
 +    public void serialize(DataOutput out) throws IOException
 +    {
 +        out.writeLong(id.getMostSignificantBits());
 +        out.writeLong(id.getLeastSignificantBits());
 +    }
 +
 +    /** @return the number of bytes written by {@link #serialize(DataOutput)}: two longs, i.e. 16 */
 +    public int serializedSize()
 +    {
 +        return 16;
 +    }
 +
 +    /** Reads back an id written by {@link #serialize(DataOutput)}. */
 +    public static TableId deserialize(DataInput in) throws IOException
 +    {
 +        return new TableId(new UUID(in.readLong(), in.readLong()));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/schema/TableMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/TableMetadata.java
index d50a9bb,0000000..cf4d6bd
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@@ -1,969 -1,0 +1,969 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.Objects;
 +
 +import com.google.common.base.MoreObjects;
 +import com.google.common.collect.*;
 +
 +import org.apache.cassandra.auth.DataResource;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.utils.AbstractIterator;
 +import org.github.jamm.Unmetered;
 +
 +import static java.lang.String.format;
 +import static java.util.stream.Collectors.toList;
 +import static java.util.stream.Collectors.toSet;
 +
 +import static com.google.common.collect.Iterables.transform;
 +import static org.apache.cassandra.schema.IndexMetadata.isNameValid;
 +
 +@Unmetered
 +public final class TableMetadata
 +{
 +    /** Table flags, convertible to and from their lower-case string names. */
 +    public enum Flag
 +    {
 +        SUPER, COUNTER, DENSE, COMPOUND;
 +
 +        /** Upper-cases each string and resolves it with {@code Flag.valueOf}. */
 +        public static Set<Flag> fromStringSet(Set<String> strings)
 +        {
 +            return strings.stream()
 +                          .map(string -> Flag.valueOf(string.toUpperCase()))
 +                          .collect(toSet());
 +        }
 +
 +        /** Renders each flag as its lower-cased {@code toString()}. */
 +        public static Set<String> toStringSet(Set<Flag> flags)
 +        {
 +            return flags.stream()
 +                        .map(flag -> flag.toString().toLowerCase())
 +                        .collect(toSet());
 +        }
 +    }
 +
 +    public final String keyspace;
 +    public final String name;
 +    public final TableId id;
 +
 +    public final IPartitioner partitioner;
 +    public final TableParams params;
 +    public final ImmutableSet<Flag> flags;
 +
 +    private final boolean isView;
 +    private final String indexName; // derived from table name
 +
 +    /*
 +     * All CQL3 column definitions are stored in the columns map.
 +     * On top of that, we keep a separate collection for each kind of definition, to
 +     * 1) allow easy access to each kind and
 +     * 2) for the partition key and clustering key ones, keep those lists ordered by the "component index" of the elements.
 +     */
 +    public final ImmutableMap<ByteBuffer, DroppedColumn> droppedColumns;
 +    final ImmutableMap<ByteBuffer, ColumnMetadata> columns;
 +
 +    private final ImmutableList<ColumnMetadata> partitionKeyColumns;
 +    private final ImmutableList<ColumnMetadata> clusteringColumns;
 +    private final RegularAndStaticColumns regularAndStaticColumns;
 +
 +    public final Indexes indexes;
 +    public final Triggers triggers;
 +
 +    // derived automatically from flags and columns
 +    public final AbstractType<?> partitionKeyType;
 +    public final ClusteringComparator comparator;
 +
 +    /*
 +     * For dense tables, this aliases the single non-PK column the table contains (since it can only have one). We keep
 +     * that as a convenience to access that column more easily (but we could replace calls by regularAndStaticColumns().iterator().next()
 +     * for those tables in practice).
 +     */
 +    public final ColumnMetadata compactValueColumn;
 +
 +    // performance hacks; TODO see if all are really necessary
 +    public final DataResource resource;
 +
 +    /**
 +     * Builds the metadata from the state accumulated in {@code builder}. Note that the
 +     * builder's partition key and clustering column lists are sorted in place before
 +     * being copied.
 +     */
 +    private TableMetadata(Builder builder)
 +    {
 +        this.keyspace = builder.keyspace;
 +        this.name = builder.name;
 +        this.id = builder.id;
 +
 +        this.partitioner = builder.partitioner;
 +        this.params = builder.params.build();
 +        this.flags = Sets.immutableEnumSet(builder.flags);
 +        this.isView = builder.isView;
 +
 +        // the index name is whatever follows the first '.' of the table name, if there is one
 +        int separator = name.indexOf('.');
 +        this.indexName = separator >= 0 ? name.substring(separator + 1) : null;
 +
 +        this.droppedColumns = ImmutableMap.copyOf(builder.droppedColumns);
 +
 +        Collections.sort(builder.partitionKeyColumns);
 +        this.partitionKeyColumns = ImmutableList.copyOf(builder.partitionKeyColumns);
 +
 +        Collections.sort(builder.clusteringColumns);
 +        this.clusteringColumns = ImmutableList.copyOf(builder.clusteringColumns);
 +
 +        this.regularAndStaticColumns = RegularAndStaticColumns.builder().addAll(builder.regularAndStaticColumns).build();
 +        this.columns = ImmutableMap.copyOf(builder.columns);
 +
 +        this.indexes = builder.indexes;
 +        this.triggers = builder.triggers;
 +
 +        // a single partition key column is used as is; several are wrapped in a CompositeType
 +        if (partitionKeyColumns.size() == 1)
 +            this.partitionKeyType = partitionKeyColumns.get(0).type;
 +        else
 +            this.partitionKeyType = CompositeType.getInstance(transform(partitionKeyColumns, t -> t.type));
 +
 +        this.comparator = new ClusteringComparator(transform(clusteringColumns, c -> c.type));
 +
 +        if (isCompactTable())
 +            this.compactValueColumn = CompactTables.getCompactValueColumn(regularAndStaticColumns, isSuper());
 +        else
 +            this.compactValueColumn = null;
 +
 +        this.resource = DataResource.table(keyspace, name);
 +    }
 +
 +    /** Starts a builder for the given keyspace and table names. */
 +    public static Builder builder(String keyspace, String table)
 +    {
 +        Builder fresh = new Builder(keyspace, table);
 +        return fresh;
 +    }
 +
 +    /** Starts a builder for the given keyspace and table names with an explicit table id. */
 +    public static Builder builder(String keyspace, String table, TableId id)
 +    {
 +        Builder fresh = new Builder(keyspace, table, id);
 +        return fresh;
 +    }
 +
 +    /**
 +     * Returns a builder pre-populated from this metadata: same keyspace, name and id, plus
 +     * the partitioner, params, flags, view marker, columns, dropped columns, indexes and triggers.
 +     */
 +    public Builder unbuild()
 +    {
 +        return TableMetadata.builder(this.keyspace, this.name, this.id)
 +                            .partitioner(this.partitioner)
 +                            .params(this.params)
 +                            .flags(this.flags)
 +                            .isView(this.isView)
 +                            .addColumns(columns())
 +                            .droppedColumns(this.droppedColumns)
 +                            .indexes(this.indexes)
 +                            .triggers(this.triggers);
 +    }
 +
 +    /** @return the view marker this metadata was built with */
 +    public boolean isView()
 +    {
 +        return this.isView;
 +    }
 +
 +    /** @return true if an index name could be derived from the table name */
 +    public boolean isIndex()
 +    {
 +        return this.indexName != null;
 +    }
 +
 +    /** @return the index name derived from the table name, or an empty Optional when there is none */
 +    public Optional<String> indexName()
 +    {
 +        if (indexName == null)
 +            return Optional.empty();
 +
 +        return Optional.of(indexName);
 +    }
 +
 +    /*
 +     * A CF is called dense when each component of its comparator is a clustering column,
 +     * i.e. no component is used to store regular column names. In other words,
 +     * non-composite static "thrift" and CQL3 CFs are *not* dense.
 +     */
 +    public boolean isDense()
 +    {
 +        return this.flags.contains(Flag.DENSE);
 +    }
 +
 +    /** @return true if the COMPOUND flag is set */
 +    public boolean isCompound()
 +    {
 +        return this.flags.contains(Flag.COMPOUND);
 +    }
 +
 +    /** @return true if the SUPER flag is set */
 +    public boolean isSuper()
 +    {
 +        return this.flags.contains(Flag.SUPER);
 +    }
 +
 +    /** @return true if the COUNTER flag is set */
 +    public boolean isCounter()
 +    {
 +        return this.flags.contains(Flag.COUNTER);
 +    }
 +
 +    /** @return true for a table that is compound but neither super nor dense */
 +    public boolean isCQLTable()
 +    {
 +        if (isSuper() || isDense())
 +            return false;
 +
 +        return isCompound();
 +    }
 +
 +    /** @return true for every table that is not a CQL table (see {@link #isCQLTable()}) */
 +    public boolean isCompactTable()
 +    {
 +        boolean cqlTable = isCQLTable();
 +        return !cqlTable;
 +    }
 +
 +    /** @return true for a table that is neither super, nor dense, nor compound */
 +    public boolean isStaticCompactTable()
 +    {
 +        return !(isSuper() || isDense() || isCompound());
 +    }
 +
 +    /** @return the values of the columns map */
 +    public ImmutableCollection<ColumnMetadata> columns()
 +    {
 +        return this.columns.values();
 +    }
 +
 +    /** @return the partition key columns followed by the clustering columns */
 +    public Iterable<ColumnMetadata> primaryKeyColumns()
 +    {
 +        return Iterables.concat(this.partitionKeyColumns, this.clusteringColumns);
 +    }
 +
 +    /** @return the partition key columns, in the sorted order established at construction */
 +    public ImmutableList<ColumnMetadata> partitionKeyColumns()
 +    {
 +        return this.partitionKeyColumns;
 +    }
 +
 +    /** @return the clustering columns, in the sorted order established at construction */
 +    public ImmutableList<ColumnMetadata> clusteringColumns()
 +    {
 +        return this.clusteringColumns;
 +    }
 +
 +    /** @return the regular and static columns of the table */
 +    public RegularAndStaticColumns regularAndStaticColumns()
 +    {
 +        return this.regularAndStaticColumns;
 +    }
 +
 +    /** @return the {@code regulars} part of the regular and static columns */
 +    public Columns regularColumns()
 +    {
 +        return this.regularAndStaticColumns.regulars;
 +    }
 +
 +    /** @return the {@code statics} part of the regular and static columns */
 +    public Columns staticColumns()
 +    {
 +        return this.regularAndStaticColumns.statics;
 +    }
 +
 +    /*
 +     * An iterator over all column definitions that respects the order of a SELECT *:
 +     * partition key columns, then clustering columns, then the remaining columns.
 +     * This also "hides" the clustering/regular columns of a non-CQL3 non-dense table
 +     * for backward compatibility's sake.
 +     */
 +    public Iterator<ColumnMetadata> allColumnsInSelectOrder()
 +    {
 +        final boolean staticCompact = isStaticCompactTable();
 +        final boolean withoutNonPkColumns = isCompactTable() && CompactTables.hasEmptyCompactValue(this);
 +
 +        final Iterator<ColumnMetadata> partitionKeys = partitionKeyColumns.iterator();
 +
 +        // static compact tables do not expose their clustering columns
 +        final Iterator<ColumnMetadata> clusterings;
 +        if (staticCompact)
 +            clusterings = Collections.emptyIterator();
 +        else
 +            clusterings = clusteringColumns.iterator();
 +
 +        final Iterator<ColumnMetadata> remaining;
 +        if (withoutNonPkColumns)
 +            remaining = Collections.emptyIterator();
 +        else if (staticCompact)
 +            remaining = staticColumns().selectOrderIterator();
 +        else
 +            remaining = regularAndStaticColumns.selectOrderIterator();
 +
 +        return new AbstractIterator<ColumnMetadata>()
 +        {
 +            protected ColumnMetadata computeNext()
 +            {
 +                if (partitionKeys.hasNext())
 +                    return partitionKeys.next();
 +
 +                if (clusterings.hasNext())
 +                    return clusterings.next();
 +
 +                if (remaining.hasNext())
 +                    return remaining.next();
 +
 +                return endOfData();
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Looks up a column by its CQL identifier.
 +     *
 +     * @return the ColumnMetadata stored under the bytes of {@code name}, or {@code null} if there is none
 +     */
 +    public ColumnMetadata getColumn(ColumnIdentifier name)
 +    {
 +        ByteBuffer key = name.bytes;
 +        return columns.get(key);
 +    }
 +
 +    /*
 +     * In general it is preferable to work with ColumnIdentifier, to make it clear
 +     * that we are talking about a CQL column and not a cell name. There are however
 +     * a few cases where all we have is a ByteBuffer (when dealing with IndexExpression
 +     * for instance), hence this overload.
 +     */
 +    public ColumnMetadata getColumn(ByteBuffer name)
 +    {
 +        return this.columns.get(name);
 +    }
 +
 +    /** @return the column recorded as dropped under {@code name}, or {@code null} if there is none */
 +    public ColumnMetadata getDroppedColumn(ByteBuffer name)
 +    {
 +        DroppedColumn entry = droppedColumns.get(name);
 +        if (entry == null)
 +            return null;
 +
 +        return entry.column;
 +    }
 +
 +    /**
 +     * Returns a "fake" ColumnMetadata corresponding to the dropped column {@code name},
 +     * or {@code null} if there is no such dropped column. When a static column is asked
 +     * for but the recorded one is not static, a static column of the same type is returned instead.
 +     *
 +     * @param name - the column name
 +     * @param isStatic - whether the column was a static column, if known
 +     */
 +    public ColumnMetadata getDroppedColumn(ByteBuffer name, boolean isStatic)
 +    {
 +        DroppedColumn entry = droppedColumns.get(name);
 +        if (entry == null)
 +            return null;
 +
 +        ColumnMetadata recorded = entry.column;
 +        boolean needsStaticVariant = isStatic && !recorded.isStatic();
 +        return needsStaticVariant
 +             ? ColumnMetadata.staticColumn(this, name, recorded.type)
 +             : recorded;
 +    }
 +
 +    /** @return true if the table has at least one static column */
 +    public boolean hasStaticColumns()
 +    {
 +        boolean noStatics = staticColumns().isEmpty();
 +        return !noStatics;
 +    }
 +
 +    /**
 +     * Checks this metadata for internal consistency, reporting problems through
 +     * {@code except(...)}. Checked in order: keyspace and table names, the params,
 +     * counters among the partition key columns, mixing of counter and non-counter columns,
 +     * a missing partition key, a compact table without clustering columns, and indexes on a
 +     * super column family. The index definitions are validated last.
 +     */
 +    public void validate()
 +    {
 +        if (!isNameValid(keyspace))
 +        {
 +            except("Keyspace name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")",
 +                   SchemaConstants.NAME_LENGTH, keyspace);
 +        }
 +
 +        if (!isNameValid(name))
 +        {
 +            except("Table name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")",
 +                   SchemaConstants.NAME_LENGTH, name);
 +        }
 +
 +        params.validate();
 +
 +        boolean counterInPartitionKey = partitionKeyColumns.stream().anyMatch(column -> column.type.isCounter());
 +        if (counterInPartitionKey)
 +            except("PRIMARY KEY columns cannot contain counters");
 +
 +        // Mixing counter with non counter columns is not supported (#2614)
 +        boolean counterTable = isCounter();
 +        for (ColumnMetadata column : regularAndStaticColumns)
 +        {
 +            if (counterTable)
 +            {
 +                if (!column.type.isCounter() && !CompactTables.isSuperColumnMapColumn(column))
 +                    except("Cannot have a non counter column (\"%s\") in a counter table", column.name);
 +            }
 +            else if (column.type.isCounter())
 +            {
 +                except("Cannot have a counter column (\"%s\") in a non counter column table", column.name);
 +            }
 +        }
 +
 +        // All tables should have a partition key
 +        if (partitionKeyColumns.isEmpty())
 +            except("Missing partition keys for table %s", toString());
 +
 +        // A compact table should always have a clustering
 +        if (isCompactTable() && clusteringColumns.isEmpty())
 +            except("For table %s, isDense=%b, isCompound=%b, clustering=%s", toString(), isDense(), isCompound(), clusteringColumns);
 +
 +        if (!indexes.isEmpty() && isSuper())
 +            except("Secondary indexes are not supported on super column families");
 +
 +        indexes.validate(this);
 +    }
 +
 +    /**
 +     * Checks that {@code other} is compatible with this metadata, reporting any mismatch
 +     * through {@code except(...)}. Nothing is checked when this metadata describes an index.
 +     * <p>
 +     * Compared in order: keyspace, table name, table id, flags, then the number and types of
 +     * the partition key columns, the number and types of the clustering columns, and finally
 +     * the type of every regular/static column of {@code other} that also exists here.
 +     *
 +     * @param other the metadata to compare against this one
 +     */
 +    void validateCompatibility(TableMetadata other)
 +    {
 +        if (isIndex())
 +            return;
 +
 +        if (!other.keyspace.equals(keyspace))
 +            except("Keyspace mismatch (found %s; expected %s)", other.keyspace, keyspace);
 +
 +        if (!other.name.equals(name))
 +            except("Table mismatch (found %s; expected %s)", other.name, name);
 +
 +        if (!other.id.equals(id))
 +            except("Table ID mismatch (found %s; expected %s)", other.id, id);
 +
 +        if (!other.flags.equals(flags))
 +            except("Table type mismatch (found %s; expected %s)", other.flags, flags);
 +
 +        if (other.partitionKeyColumns.size() != partitionKeyColumns.size())
 +            except("Partition keys of different length (found %s; expected %s)", other.partitionKeyColumns.size(), partitionKeyColumns.size());
 +
 +        for (int i = 0; i < partitionKeyColumns.size(); i++)
 +            if (!other.partitionKeyColumns.get(i).type.isCompatibleWith(partitionKeyColumns.get(i).type))
 +                except("Partition key column mismatch (found %s; expected %s)", other.partitionKeyColumns.get(i).type, partitionKeyColumns.get(i).type);
 +
 +        if (other.clusteringColumns.size() != clusteringColumns.size())
 +            except("Clustering columns of different length (found %s; expected %s)", other.clusteringColumns.size(), clusteringColumns.size());
 +
 +        for (int i = 0; i < clusteringColumns.size(); i++)
 +            if (!other.clusteringColumns.get(i).type.isCompatibleWith(clusteringColumns.get(i).type))
 +                except("Clustering column mismatch (found %s; expected %s)", other.clusteringColumns.get(i).type, clusteringColumns.get(i).type);
 +
 +        // columns that exist only in 'other' are not an incompatibility; only type clashes are
 +        for (ColumnMetadata otherColumn : other.regularAndStaticColumns)
 +        {
 +            ColumnMetadata column = getColumn(otherColumn.name);
 +            if (column != null && !otherColumn.type.isCompatibleWith(column.type))
 +                except("Column mismatch (found %s; expected %s)", otherColumn, column);
 +        }
 +    }
 +
 +    /** @return a new ClusteringComparator built from the types of the partition key columns, in order */
 +    public ClusteringComparator partitionKeyAsClusteringComparator()
 +    {
 +        return new ClusteringComparator(partitionKeyColumns.stream()
 +                                                           .map(column -> column.type)
 +                                                           .collect(toList()));
 +    }
 +
 +    /**
 +     * The type to use to compare column names in "static compact"
 +     * tables or superColumn ones.
 +     * <p>
 +     * This exists because for historical reasons, "static compact" tables as
 +     * well as super column ones can have non-UTF8 column names.
 +     * <p>
 +     * This method should only be called for superColumn tables and "static
 +     * compact" ones. For any other table, all column names are UTF8.
 +     */
 +    public AbstractType<?> staticCompactOrSuperTableColumnNameType()
 +    {
 +        if (isSuper())
 +        {
 +            assert compactValueColumn != null && compactValueColumn.type 
instanceof MapType;
 +            return ((MapType) compactValueColumn.type).nameComparator();
 +        }
 +
 +        assert isStaticCompactTable();
 +        return clusteringColumns.get(0).type;
 +    }
 +
 +    public AbstractType<?> columnDefinitionNameComparator(ColumnMetadata.Kind 
kind)
 +    {
 +        return (isSuper() && kind == ColumnMetadata.Kind.REGULAR) || 
(isStaticCompactTable() && kind == ColumnMetadata.Kind.STATIC)
 +             ? staticCompactOrSuperTableColumnNameType()
 +             : UTF8Type.instance;
 +    }
 +
 +    /**
 +     * Generate a table name for an index corresponding to the given column.
 +     * This is NOT the same as the index's name! This is only used in sstable 
filenames and is not exposed to users.
 +     *
 +     * @param info A definition of the column with index
 +     *
 +     * @return name of the index table
 +     */
 +    public String indexTableName(IndexMetadata info)
 +    {
 +        // TODO simplify this when info.index_name is guaranteed to be set
 +        return name + Directories.SECONDARY_INDEX_NAME_SEPARATOR + info.name;
 +    }
 +
 +    /**
 +     * @return true if the change as made impacts queries/updates on the 
table,
 +     *         e.g. any columns or indexes were added, removed, or altered; 
otherwise, false is returned.
 +     *         Used to determine whether prepared statements against this 
table need to be re-prepared.
 +     */
 +    boolean changeAffectsPreparedStatements(TableMetadata updated)
 +    {
 +        return !partitionKeyColumns.equals(updated.partitionKeyColumns)
 +            || !clusteringColumns.equals(updated.clusteringColumns)
 +            || 
!regularAndStaticColumns.equals(updated.regularAndStaticColumns)
 +            || !indexes.equals(updated.indexes)
 +            || params.defaultTimeToLive != updated.params.defaultTimeToLive
 +            || params.gcGraceSeconds != updated.params.gcGraceSeconds;
 +    }
 +
 +    /**
 +     * There are a couple of places in the code where we need a TableMetadata 
object and don't have one readily available
 +     * and know that only the keyspace and name matter. This creates such 
"fake" metadata. Use only if you know what
 +     * you're doing.
 +     */
 +    public static TableMetadata minimal(String keyspace, String name)
 +    {
 +        return TableMetadata.builder(keyspace, name)
 +                            .addPartitionKeyColumn("key", BytesType.instance)
 +                            .build();
 +    }
 +
 +    public TableMetadata updateIndexTableMetadata(TableParams baseTableParams)
 +    {
 +        TableParams.Builder builder =
 +            baseTableParams.unbuild()
 +                           .readRepairChance(0.0)
 +                           .dcLocalReadRepairChance(0.0)
 +                           .gcGraceSeconds(0);
 +
 +        // Depends on parent's cache setting, turn on its index table's cache.
 +        // Row caching is never enabled; see CASSANDRA-5732
 +        builder.caching(baseTableParams.caching.cacheKeys() ? 
CachingParams.CACHE_KEYS : CachingParams.CACHE_NOTHING);
 +
 +        return unbuild().params(builder.build()).build();
 +    }
 +
 +    private void except(String format, Object... args)
 +    {
 +        throw new ConfigurationException(keyspace + "." + name + ": " 
+format(format, args));
 +    }
 +
 +    @Override
 +    public boolean equals(Object o)
 +    {
 +        if (this == o)
 +            return true;
 +
 +        if (!(o instanceof TableMetadata))
 +            return false;
 +
 +        TableMetadata tm = (TableMetadata) o;
 +
 +        return keyspace.equals(tm.keyspace)
 +            && name.equals(tm.name)
 +            && id.equals(tm.id)
 +            && partitioner.equals(tm.partitioner)
 +            && params.equals(tm.params)
 +            && flags.equals(tm.flags)
 +            && isView == tm.isView
 +            && columns.equals(tm.columns)
 +            && droppedColumns.equals(tm.droppedColumns)
 +            && indexes.equals(tm.indexes)
 +            && triggers.equals(tm.triggers);
 +    }
 +
 +    @Override
 +    public int hashCode()
 +    {
 +        return Objects.hash(keyspace, name, id, partitioner, params, flags, 
isView, columns, droppedColumns, indexes, triggers);
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("%s.%s", ColumnIdentifier.maybeQuote(keyspace), 
ColumnIdentifier.maybeQuote(name));
 +    }
 +
 +    public String toDebugString()
 +    {
 +        return MoreObjects.toStringHelper(this)
 +                          .add("keyspace", keyspace)
 +                          .add("table", name)
 +                          .add("id", id)
 +                          .add("partitioner", partitioner)
 +                          .add("params", params)
 +                          .add("flags", flags)
 +                          .add("isView", isView)
 +                          .add("columns", columns())
 +                          .add("droppedColumns", droppedColumns.values())
 +                          .add("indexes", indexes)
 +                          .add("triggers", triggers)
 +                          .toString();
 +    }
 +
 +    public static final class Builder
 +    {
 +        final String keyspace;
 +        final String name;
 +
 +        private TableId id;
 +
 +        private IPartitioner partitioner;
 +        private TableParams.Builder params = TableParams.builder();
 +
 +        // Setting compound as default as "normal" CQL tables are compound 
and that's what we want by default
 +        private Set<Flag> flags = EnumSet.of(Flag.COMPOUND);
 +        private Triggers triggers = Triggers.none();
 +        private Indexes indexes = Indexes.none();
 +
 +        private final Map<ByteBuffer, DroppedColumn> droppedColumns = new 
HashMap<>();
 +        private final Map<ByteBuffer, ColumnMetadata> columns = new 
HashMap<>();
 +        private final List<ColumnMetadata> partitionKeyColumns = new 
ArrayList<>();
 +        private final List<ColumnMetadata> clusteringColumns = new 
ArrayList<>();
 +        private final List<ColumnMetadata> regularAndStaticColumns = new 
ArrayList<>();
 +
 +        private boolean isView;
 +
 +        private Builder(String keyspace, String name, TableId id)
 +        {
 +            this.keyspace = keyspace;
 +            this.name = name;
 +            this.id = id;
 +        }
 +
 +        private Builder(String keyspace, String name)
 +        {
 +            this.keyspace = keyspace;
 +            this.name = name;
 +        }
 +
 +        public TableMetadata build()
 +        {
 +            if (partitioner == null)
 +                partitioner = DatabaseDescriptor.getPartitioner();
 +
 +            if (id == null)
 +                id = TableId.generate();
 +
 +            return new TableMetadata(this);
 +        }
 +
 +        public Builder id(TableId val)
 +        {
 +            id = val;
 +            return this;
 +        }
 +
 +        public Builder partitioner(IPartitioner val)
 +        {
 +            partitioner = val;
 +            return this;
 +        }
 +
 +        public Builder params(TableParams val)
 +        {
 +            params = val.unbuild();
 +            return this;
 +        }
 +
 +        public Builder bloomFilterFpChance(double val)
 +        {
 +            params.bloomFilterFpChance(val);
 +            return this;
 +        }
 +
 +        public Builder caching(CachingParams val)
 +        {
 +            params.caching(val);
 +            return this;
 +        }
 +
 +        public Builder comment(String val)
 +        {
 +            params.comment(val);
 +            return this;
 +        }
 +
 +        public Builder compaction(CompactionParams val)
 +        {
 +            params.compaction(val);
 +            return this;
 +        }
 +
 +        public Builder compression(CompressionParams val)
 +        {
 +            params.compression(val);
 +            return this;
 +        }
 +
 +        public Builder dcLocalReadRepairChance(double val)
 +        {
 +            params.dcLocalReadRepairChance(val);
 +            return this;
 +        }
 +
 +        public Builder defaultTimeToLive(int val)
 +        {
 +            params.defaultTimeToLive(val);
 +            return this;
 +        }
 +
 +        public Builder gcGraceSeconds(int val)
 +        {
 +            params.gcGraceSeconds(val);
 +            return this;
 +        }
 +
 +        public Builder maxIndexInterval(int val)
 +        {
 +            params.maxIndexInterval(val);
 +            return this;
 +        }
 +
 +        public Builder memtableFlushPeriod(int val)
 +        {
 +            params.memtableFlushPeriodInMs(val);
 +            return this;
 +        }
 +
 +        public Builder minIndexInterval(int val)
 +        {
 +            params.minIndexInterval(val);
 +            return this;
 +        }
 +
 +        public Builder readRepairChance(double val)
 +        {
 +            params.readRepairChance(val);
 +            return this;
 +        }
 +
 +        public Builder crcCheckChance(double val)
 +        {
 +            params.crcCheckChance(val);
 +            return this;
 +        }
 +
 +        public Builder speculativeRetry(SpeculativeRetryParam val)
 +        {
 +            params.speculativeRetry(val);
 +            return this;
 +        }
 +
 +        public Builder extensions(Map<String, ByteBuffer> val)
 +        {
 +            params.extensions(val);
 +            return this;
 +        }
 +
 +        public Builder isView(boolean val)
 +        {
 +            isView = val;
 +            return this;
 +        }
 +
 +        public Builder flags(Set<Flag> val)
 +        {
 +            flags = val;
 +            return this;
 +        }
 +
 +        public Builder isSuper(boolean val)
 +        {
 +            return flag(Flag.SUPER, val);
 +        }
 +
 +        public Builder isCounter(boolean val)
 +        {
 +            return flag(Flag.COUNTER, val);
 +        }
 +
 +        public Builder isDense(boolean val)
 +        {
 +            return flag(Flag.DENSE, val);
 +        }
 +
 +        public Builder isCompound(boolean val)
 +        {
 +            return flag(Flag.COMPOUND, val);
 +        }
 +
 +        private Builder flag(Flag flag, boolean set)
 +        {
 +            if (set) flags.add(flag); else flags.remove(flag);
 +            return this;
 +        }
 +
 +        public Builder triggers(Triggers val)
 +        {
 +            triggers = val;
 +            return this;
 +        }
 +
 +        public Builder indexes(Indexes val)
 +        {
 +            indexes = val;
 +            return this;
 +        }
 +
 +        public Builder addPartitionKeyColumn(String name, AbstractType type)
 +        {
 +            return addPartitionKeyColumn(ColumnIdentifier.getInterned(name, 
false), type);
 +        }
 +
 +        public Builder addPartitionKeyColumn(ColumnIdentifier name, 
AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, 
type, partitionKeyColumns.size(), ColumnMetadata.Kind.PARTITION_KEY));
 +        }
 +
 +        public Builder addClusteringColumn(String name, AbstractType type)
 +        {
 +            return addClusteringColumn(ColumnIdentifier.getInterned(name, 
false), type);
 +        }
 +
 +        public Builder addClusteringColumn(ColumnIdentifier name, 
AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, 
type, clusteringColumns.size(), ColumnMetadata.Kind.CLUSTERING));
 +        }
 +
 +        public Builder addRegularColumn(String name, AbstractType type)
 +        {
 +            return addRegularColumn(ColumnIdentifier.getInterned(name, 
false), type);
 +        }
 +
 +        public Builder addRegularColumn(ColumnIdentifier name, AbstractType 
type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, 
type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.REGULAR));
 +        }
 +
 +        public Builder addStaticColumn(String name, AbstractType type)
 +        {
 +            return addStaticColumn(ColumnIdentifier.getInterned(name, false), 
type);
 +        }
 +
 +        public Builder addStaticColumn(ColumnIdentifier name, AbstractType 
type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, 
type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.STATIC));
 +        }
 +
 +        public Builder addColumn(ColumnMetadata column)
 +        {
 +            if (columns.containsKey(column.name.bytes))
 +                throw new IllegalArgumentException();
 +
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    partitionKeyColumns.add(column);
 +                    Collections.sort(partitionKeyColumns);
 +                    break;
 +                case CLUSTERING:
 +                    column.type.checkComparable();
 +                    clusteringColumns.add(column);
 +                    Collections.sort(clusteringColumns);
 +                    break;
 +                default:
 +                    regularAndStaticColumns.add(column);
 +            }
 +
 +            columns.put(column.name.bytes, column);
 +
 +            return this;
 +        }
 +
 +        public Builder addColumns(Iterable<ColumnMetadata> columns)
 +        {
 +            columns.forEach(this::addColumn);
 +            return this;
 +        }
 +
 +        public Builder droppedColumns(Map<ByteBuffer, DroppedColumn> 
droppedColumns)
 +        {
 +            this.droppedColumns.clear();
 +            this.droppedColumns.putAll(droppedColumns);
 +            return this;
 +        }
 +
 +        /**
 +         * Records a deprecated column for a system table.
 +         */
 +        public Builder recordDeprecatedSystemColumn(String name, 
AbstractType<?> type)
 +        {
 +            // As we play fast and loose with the removal timestamp, make 
sure this isn't misused for a non system table.
-             assert SchemaConstants.isSystemKeyspace(keyspace);
++            assert SchemaConstants.isLocalSystemKeyspace(keyspace);
 +            recordColumnDrop(ColumnMetadata.regularColumn(keyspace, 
this.name, name, type), Long.MAX_VALUE);
 +            return this;
 +        }
 +
 +        public Builder recordColumnDrop(ColumnMetadata column, long 
timeMicros)
 +        {
 +            droppedColumns.put(column.name.bytes, new DroppedColumn(column, 
timeMicros));
 +            return this;
 +        }
 +
 +        public Iterable<ColumnMetadata> columns()
 +        {
 +            return columns.values();
 +        }
 +
 +        public Set<String> columnNames()
 +        {
 +            return columns.values().stream().map(c -> 
c.name.toString()).collect(toSet());
 +        }
 +
 +        public ColumnMetadata getColumn(ColumnIdentifier identifier)
 +        {
 +            return columns.get(identifier.bytes);
 +        }
 +
 +        public ColumnMetadata getColumn(ByteBuffer name)
 +        {
 +            return columns.get(name);
 +        }
 +
 +        public boolean hasRegularColumns()
 +        {
 +            return 
regularAndStaticColumns.stream().anyMatch(ColumnMetadata::isRegular);
 +        }
 +
 +        /*
 +         * The following methods all assume a Builder with valid set of 
partition key, clustering, regular and static columns.
 +         */
 +
 +        public Builder removeRegularOrStaticColumn(ColumnIdentifier 
identifier)
 +        {
 +            ColumnMetadata column = columns.get(identifier.bytes);
 +            if (column == null || column.isPrimaryKeyColumn())
 +                throw new IllegalArgumentException();
 +
 +            columns.remove(identifier.bytes);
 +            regularAndStaticColumns.remove(column);
 +
 +            return this;
 +        }
 +
 +        public Builder renamePrimaryKeyColumn(ColumnIdentifier from, 
ColumnIdentifier to)
 +        {
 +            if (columns.containsKey(to.bytes))
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata column = columns.get(from.bytes);
 +            if (column == null || !column.isPrimaryKeyColumn())
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata newColumn = column.withNewName(to);
 +            if (column.isPartitionKey())
 +                partitionKeyColumns.set(column.position(), newColumn);
 +            else
 +                clusteringColumns.set(column.position(), newColumn);
 +
 +            columns.remove(from.bytes);
 +            columns.put(to.bytes, newColumn);
 +
 +            return this;
 +        }
 +
 +        public Builder alterColumnType(ColumnIdentifier name, AbstractType<?> 
type)
 +        {
 +            ColumnMetadata column = columns.get(name.bytes);
 +            if (column == null)
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata newColumn = column.withNewType(type);
 +
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    partitionKeyColumns.set(column.position(), newColumn);
 +                    break;
 +                case CLUSTERING:
 +                    clusteringColumns.set(column.position(), newColumn);
 +                    break;
 +                case REGULAR:
 +                case STATIC:
 +                    regularAndStaticColumns.remove(column);
 +                    regularAndStaticColumns.add(newColumn);
 +                    break;
 +            }
 +
 +            columns.put(column.name.bytes, newColumn);
 +
 +            return this;
 +        }
 +    }
 +    
 +    /**
 +     * A table with strict liveness filters/ignores rows without PK liveness 
info,
 +     * effectively tying the row liveness to its primary key liveness.
 +     *
 +     * Currently this is only used by views with normal base column as PK 
column
 +     * so updates to other columns do not make the row live when the base 
column
 +     * is not live. See CASSANDRA-11500.
 +     */
 +    public boolean enforceStrictLiveness()
 +    {
 +        return isView && 
Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ClientState.java
index 80fcd33,2511779..e41cc4f
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@@ -55,7 -55,7 +55,7 @@@ public class ClientStat
  
      private static final Set<IResource> READABLE_SYSTEM_RESOURCES = new 
HashSet<>();
      private static final Set<IResource> PROTECTED_AUTH_RESOURCES = new 
HashSet<>();
-     private static final Set<String> ALTERABLE_SYSTEM_KEYSPACES = new 
HashSet<>();
 -    private static final Set<IResource> DROPPABLE_SYSTEM_AUTH_TABLES = new 
HashSet<>();
++
      static
      {
          // We want these system cfs to be always readable to authenticated 
users since many tools rely on them
@@@ -72,10 -72,10 +72,6 @@@
              
PROTECTED_AUTH_RESOURCES.addAll(DatabaseDescriptor.getAuthorizer().protectedResources());
              
PROTECTED_AUTH_RESOURCES.addAll(DatabaseDescriptor.getRoleManager().protectedResources());
          }
--
-         // allow users with sufficient privileges to alter KS level options 
on AUTH_KS and TRACING_KS
-         ALTERABLE_SYSTEM_KEYSPACES.add(SchemaConstants.AUTH_KEYSPACE_NAME);
-         ALTERABLE_SYSTEM_KEYSPACES.add(SchemaConstants.TRACE_KEYSPACE_NAME);
 -        
DROPPABLE_SYSTEM_AUTH_TABLES.add(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME,
 PasswordAuthenticator.LEGACY_CREDENTIALS_TABLE));
 -        
DROPPABLE_SYSTEM_AUTH_TABLES.add(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME,
 CassandraRoleManager.LEGACY_USERS_TABLE));
 -        
DROPPABLE_SYSTEM_AUTH_TABLES.add(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME,
 CassandraAuthorizer.USER_PERMISSIONS));
      }
  
      // Current user for the session
@@@ -384,20 -360,25 +386,21 @@@
  
      private void preventSystemKSSchemaModification(String keyspace, 
DataResource resource, Permission perm) throws UnauthorizedException
      {
-         // we only care about schema modification.
-         if (!((perm == Permission.ALTER) || (perm == Permission.DROP) || 
(perm == Permission.CREATE)))
+         // we only care about DDL statements
+         if (perm != Permission.ALTER && perm != Permission.DROP && perm != 
Permission.CREATE)
              return;
  
-         // prevent system keyspace modification
-         if (SchemaConstants.isSystemKeyspace(keyspace))
+         // prevent ALL local system keyspace modification
+         if (SchemaConstants.isLocalSystemKeyspace(keyspace))
              throw new UnauthorizedException(keyspace + " keyspace is not 
user-modifiable.");
  
-         // allow users with sufficient privileges to alter KS level options 
on AUTH_KS and
-         // TRACING_KS, but not to drop any tables
-         if 
(ALTERABLE_SYSTEM_KEYSPACES.contains(resource.getKeyspace().toLowerCase())
-            && ((perm == Permission.ALTER && !resource.isKeyspaceLevel())
-                || perm == Permission.DROP))
+         if (SchemaConstants.isReplicatedSystemKeyspace(keyspace))
          {
+             // allow users with sufficient privileges to alter replication 
params of replicated system keyspaces
+             if (perm == Permission.ALTER && resource.isKeyspaceLevel())
+                 return;
+ 
 -            // allow users with sufficient privileges to drop legacy tables 
in replicated system keyspaces
 -            if (perm == Permission.DROP && 
DROPPABLE_SYSTEM_AUTH_TABLES.contains(resource))
 -                return;
 -
+             // prevent all other modifications of replicated system keyspaces
              throw new UnauthorizedException(String.format("Cannot %s %s", 
perm, resource));
          }
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 49bb4b6,e67d46e..48d1f3f
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1577,7 -1552,7 +1577,7 @@@ public class StorageProxy implements St
      private static boolean systemKeyspaceQuery(List<? extends ReadCommand> 
cmds)
      {
          for (ReadCommand cmd : cmds)
-             if (!SchemaConstants.isSystemKeyspace(cmd.metadata().keyspace))
 -            if (!SchemaConstants.isLocalSystemKeyspace(cmd.metadata().ksName))
++            if 
(!SchemaConstants.isLocalSystemKeyspace(cmd.metadata().keyspace))
                  return false;
          return true;
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8406b206/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
----------------------------------------------------------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to