http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java 
b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
deleted file mode 100644
index 4270a24..0000000
--- a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-public class SchemaCheckVerbHandler implements IVerbHandler
-{
-    private final Logger logger = 
LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
-
-    public void doVerb(MessageIn message, int id)
-    {
-        logger.trace("Received schema check request.");
-        MessageOut<UUID> response = new 
MessageOut<UUID>(MessagingService.Verb.INTERNAL_RESPONSE, 
Schema.instance.getVersion(), UUIDSerializer.serializer);
-        MessagingService.instance().sendReply(response, id, message.from);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java 
b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 729d556..1f937f8 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -21,20 +21,20 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.sstable.metadata.MetadataType;
-import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SerializationHeader
@@ -46,7 +46,7 @@ public class SerializationHeader
     private final AbstractType<?> keyType;
     private final List<AbstractType<?>> clusteringTypes;
 
-    private final PartitionColumns columns;
+    private final RegularAndStaticColumns columns;
     private final EncodingStats stats;
 
     private final Map<ByteBuffer, AbstractType<?>> typeMap;
@@ -54,7 +54,7 @@ public class SerializationHeader
     private SerializationHeader(boolean isForSSTable,
                                 AbstractType<?> keyType,
                                 List<AbstractType<?>> clusteringTypes,
-                                PartitionColumns columns,
+                                RegularAndStaticColumns columns,
                                 EncodingStats stats,
                                 Map<ByteBuffer, AbstractType<?>> typeMap)
     {
@@ -66,12 +66,12 @@ public class SerializationHeader
         this.typeMap = typeMap;
     }
 
-    public static SerializationHeader makeWithoutStats(CFMetaData metadata)
+    public static SerializationHeader makeWithoutStats(TableMetadata metadata)
     {
-        return new SerializationHeader(true, metadata, 
metadata.partitionColumns(), EncodingStats.NO_STATS);
+        return new SerializationHeader(true, metadata, 
metadata.regularAndStaticColumns(), EncodingStats.NO_STATS);
     }
 
-    public static SerializationHeader make(CFMetaData metadata, 
Collection<SSTableReader> sstables)
+    public static SerializationHeader make(TableMetadata metadata, 
Collection<SSTableReader> sstables)
     {
         // The serialization header has to be computed before the start of 
compaction (since it's used to write)
         // the result. This means that when compacting multiple sources, we 
won't have perfectly accurate stats
@@ -84,34 +84,31 @@ public class SerializationHeader
         // our stats merging on the compacted files headers, which as we just 
said can be somewhat inaccurate,
         // but rather on their stats stored in StatsMetadata that are fully 
accurate.
         EncodingStats.Collector stats = new EncodingStats.Collector();
-        PartitionColumns.Builder columns = PartitionColumns.builder();
+        RegularAndStaticColumns.Builder columns = 
RegularAndStaticColumns.builder();
         for (SSTableReader sstable : sstables)
         {
             stats.updateTimestamp(sstable.getMinTimestamp());
             stats.updateLocalDeletionTime(sstable.getMinLocalDeletionTime());
             stats.updateTTL(sstable.getMinTTL());
-            if (sstable.header == null)
-                columns.addAll(metadata.partitionColumns());
-            else
-                columns.addAll(sstable.header.columns());
+            columns.addAll(sstable.header.columns());
         }
         return new SerializationHeader(true, metadata, columns.build(), 
stats.get());
     }
 
     public SerializationHeader(boolean isForSSTable,
-                               CFMetaData metadata,
-                               PartitionColumns columns,
+                               TableMetadata metadata,
+                               RegularAndStaticColumns columns,
                                EncodingStats stats)
     {
         this(isForSSTable,
-             metadata.getKeyValidator(),
+             metadata.partitionKeyType,
              metadata.comparator.subtypes(),
              columns,
              stats,
              null);
     }
 
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
         return columns;
     }
@@ -146,7 +143,7 @@ public class SerializationHeader
         return isStatic ? columns.statics : columns.regulars;
     }
 
-    public AbstractType<?> getType(ColumnDefinition column)
+    public AbstractType<?> getType(ColumnMetadata column)
     {
         return typeMap == null ? column.type : typeMap.get(column.name.bytes);
     }
@@ -240,9 +237,9 @@ public class SerializationHeader
     {
         Map<ByteBuffer, AbstractType<?>> staticColumns = new LinkedHashMap<>();
         Map<ByteBuffer, AbstractType<?>> regularColumns = new 
LinkedHashMap<>();
-        for (ColumnDefinition column : columns.statics)
+        for (ColumnMetadata column : columns.statics)
             staticColumns.put(column.name.bytes, column.type);
-        for (ColumnDefinition column : columns.regulars)
+        for (ColumnMetadata column : columns.regulars)
             regularColumns.put(column.name.bytes, column.type);
         return new Component(keyType, clusteringTypes, staticColumns, 
regularColumns, stats);
     }
@@ -254,7 +251,7 @@ public class SerializationHeader
     }
 
     /**
-     * We need the CFMetadata to properly deserialize a SerializationHeader 
but it's clunky to pass that to
+     * We need the TableMetadata to properly deserialize a SerializationHeader 
but it's clunky to pass that to
      * a SSTable component, so we use this temporary object to delay the 
actual need for the metadata.
      */
     public static class Component extends MetadataComponent
@@ -283,16 +280,16 @@ public class SerializationHeader
             return MetadataType.HEADER;
         }
 
-        public SerializationHeader toHeader(CFMetaData metadata)
+        public SerializationHeader toHeader(TableMetadata metadata)
         {
             Map<ByteBuffer, AbstractType<?>> typeMap = new 
HashMap<>(staticColumns.size() + regularColumns.size());
             typeMap.putAll(staticColumns);
             typeMap.putAll(regularColumns);
 
-            PartitionColumns.Builder builder = PartitionColumns.builder();
+            RegularAndStaticColumns.Builder builder = 
RegularAndStaticColumns.builder();
             for (ByteBuffer name : typeMap.keySet())
             {
-                ColumnDefinition column = metadata.getColumnDefinition(name);
+                ColumnMetadata column = metadata.getColumn(name);
                 if (column == null)
                 {
                     // TODO: this imply we don't read data for a column we 
don't yet know about, which imply this is theoretically
@@ -301,10 +298,10 @@ public class SerializationHeader
                     // improve this.
 
                     // If we don't find the definition, it could be we have 
data for a dropped column, and we shouldn't
-                    // fail deserialization because of that. So we grab a 
"fake" ColumnDefinition that ensure proper
+                    // fail deserialization because of that. So we grab a 
"fake" ColumnMetadata that ensure proper
                     // deserialization. The column will be ignore later on 
anyway.
                     boolean isStatic = staticColumns.containsKey(name);
-                    column = metadata.getDroppedColumnDefinition(name, 
isStatic);
+                    column = metadata.getDroppedColumn(name, isStatic);
                     if (column == null)
                         throw new RuntimeException("Unknown column " + 
UTF8Type.instance.getString(name) + " during deserialization");
                 }
@@ -386,11 +383,11 @@ public class SerializationHeader
             }
         }
 
-        public SerializationHeader deserializeForMessaging(DataInputPlus in, 
CFMetaData metadata, ColumnFilter selection, boolean hasStatic) throws 
IOException
+        public SerializationHeader deserializeForMessaging(DataInputPlus in, 
TableMetadata metadata, ColumnFilter selection, boolean hasStatic) throws 
IOException
         {
             EncodingStats stats = EncodingStats.serializer.deserialize(in);
 
-            AbstractType<?> keyType = metadata.getKeyValidator();
+            AbstractType<?> keyType = metadata.partitionKeyType;
             List<AbstractType<?>> clusteringTypes = 
metadata.comparator.subtypes();
 
             Columns statics, regulars;
@@ -405,7 +402,7 @@ public class SerializationHeader
                 regulars = 
Columns.serializer.deserializeSubset(selection.fetchedColumns().regulars, in);
             }
 
-            return new SerializationHeader(false, keyType, clusteringTypes, 
new PartitionColumns(statics, regulars), stats, null);
+            return new SerializationHeader(false, keyType, clusteringTypes, 
new RegularAndStaticColumns(statics, regulars), stats, null);
         }
 
         public long serializedSizeForMessaging(SerializationHeader header, 
ColumnFilter selection, boolean hasStatic)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/SimpleBuilders.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleBuilders.java 
b/src/java/org/apache/cassandra/db/SimpleBuilders.java
index 6e65743..a212834 100644
--- a/src/java/org/apache/cassandra/db/SimpleBuilders.java
+++ b/src/java/org/apache/cassandra/db/SimpleBuilders.java
@@ -20,9 +20,10 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -43,16 +44,16 @@ public abstract class SimpleBuilders
     {
     }
 
-    private static DecoratedKey makePartitonKey(CFMetaData metadata, Object... 
partitionKey)
+    private static DecoratedKey makePartitonKey(TableMetadata metadata, 
Object... partitionKey)
     {
         if (partitionKey.length == 1 && partitionKey[0] instanceof 
DecoratedKey)
             return (DecoratedKey)partitionKey[0];
 
-        ByteBuffer key = 
CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
-        return metadata.decorateKey(key);
+        ByteBuffer key = 
metadata.partitionKeyAsClusteringComparator().make(partitionKey).serializeAsPartitionKey();
+        return metadata.partitioner.decorateKey(key);
     }
 
-    private static Clustering makeClustering(CFMetaData metadata, Object... 
clusteringColumns)
+    private static Clustering makeClustering(TableMetadata metadata, Object... 
clusteringColumns)
     {
         if (clusteringColumns.length == 1 && clusteringColumns[0] instanceof 
Clustering)
             return (Clustering)clusteringColumns[0];
@@ -61,7 +62,7 @@ public abstract class SimpleBuilders
         {
             // If the table has clustering columns, passing no values is for 
updating the static values, so check we
             // do have some static columns defined.
-            assert metadata.comparator.size() == 0 || 
!metadata.partitionColumns().statics.isEmpty();
+            assert metadata.comparator.size() == 0 || 
!metadata.staticColumns().isEmpty();
             return metadata.comparator.size() == 0 ? Clustering.EMPTY : 
Clustering.STATIC_CLUSTERING;
         }
         else
@@ -107,7 +108,7 @@ public abstract class SimpleBuilders
         private final String keyspaceName;
         private final DecoratedKey key;
 
-        private final Map<UUID, PartitionUpdateBuilder> updateBuilders = new 
HashMap<>();
+        private final Map<TableId, PartitionUpdateBuilder> updateBuilders = 
new HashMap<>();
 
         public MutationBuilder(String keyspaceName, DecoratedKey key)
         {
@@ -115,15 +116,15 @@ public abstract class SimpleBuilders
             this.key = key;
         }
 
-        public PartitionUpdate.SimpleBuilder update(CFMetaData metadata)
+        public PartitionUpdate.SimpleBuilder update(TableMetadata metadata)
         {
-            assert metadata.ksName.equals(keyspaceName);
+            assert metadata.keyspace.equals(keyspaceName);
 
-            PartitionUpdateBuilder builder = updateBuilders.get(metadata.cfId);
+            PartitionUpdateBuilder builder = updateBuilders.get(metadata.id);
             if (builder == null)
             {
                 builder = new PartitionUpdateBuilder(metadata, key);
-                updateBuilders.put(metadata.cfId, builder);
+                updateBuilders.put(metadata.id, builder);
             }
 
             copyParams(builder);
@@ -133,7 +134,7 @@ public abstract class SimpleBuilders
 
         public PartitionUpdate.SimpleBuilder update(String tableName)
         {
-            CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, 
tableName);
+            TableMetadata metadata = 
Schema.instance.getTableMetadata(keyspaceName, tableName);
             assert metadata != null : "Unknown table " + tableName + " in 
keyspace " + keyspaceName;
             return update(metadata);
         }
@@ -154,20 +155,20 @@ public abstract class SimpleBuilders
 
     public static class PartitionUpdateBuilder extends 
AbstractBuilder<PartitionUpdate.SimpleBuilder> implements 
PartitionUpdate.SimpleBuilder
     {
-        private final CFMetaData metadata;
+        private final TableMetadata metadata;
         private final DecoratedKey key;
         private final Map<Clustering, RowBuilder> rowBuilders = new 
HashMap<>();
         private List<RTBuilder> rangeBuilders = null; // We use that rarely, 
so create lazily
 
         private DeletionTime partitionDeletion = DeletionTime.LIVE;
 
-        public PartitionUpdateBuilder(CFMetaData metadata, Object... 
partitionKeyValues)
+        public PartitionUpdateBuilder(TableMetadata metadata, Object... 
partitionKeyValues)
         {
             this.metadata = metadata;
             this.key = makePartitonKey(metadata, partitionKeyValues);
         }
 
-        public CFMetaData metadata()
+        public TableMetadata metadata()
         {
             return metadata;
         }
@@ -206,7 +207,7 @@ public abstract class SimpleBuilders
         public PartitionUpdate build()
         {
             // Collect all updated columns
-            PartitionColumns.Builder columns = PartitionColumns.builder();
+            RegularAndStaticColumns.Builder columns = 
RegularAndStaticColumns.builder();
             for (RowBuilder builder : rowBuilders.values())
                 columns.addAll(builder.columns());
 
@@ -296,15 +297,15 @@ public abstract class SimpleBuilders
 
     public static class RowBuilder extends AbstractBuilder<Row.SimpleBuilder> 
implements Row.SimpleBuilder
     {
-        private final CFMetaData metadata;
+        private final TableMetadata metadata;
 
-        private final Set<ColumnDefinition> columns = new HashSet<>();
+        private final Set<ColumnMetadata> columns = new HashSet<>();
         private final Row.Builder builder;
 
         private boolean initiated;
         private boolean noPrimaryKeyLivenessInfo;
 
-        public RowBuilder(CFMetaData metadata, Object... clusteringColumns)
+        public RowBuilder(TableMetadata metadata, Object... clusteringColumns)
         {
             this.metadata = metadata;
             this.builder = 
BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
@@ -312,7 +313,7 @@ public abstract class SimpleBuilders
             this.builder.newRow(makeClustering(metadata, clusteringColumns));
         }
 
-        Set<ColumnDefinition> columns()
+        Set<ColumnMetadata> columns()
         {
             return columns;
         }
@@ -345,7 +346,7 @@ public abstract class SimpleBuilders
         private Row.SimpleBuilder add(String columnName, Object value, boolean 
overwriteForCollection)
         {
             maybeInit();
-            ColumnDefinition column = getColumn(columnName);
+            ColumnMetadata column = getColumn(columnName);
 
             if (!overwriteForCollection && !(column.type.isMultiCell() && 
column.type.isCollection()))
                 throw new IllegalArgumentException("appendAll() can only be 
called on non-frozen colletions");
@@ -421,16 +422,16 @@ public abstract class SimpleBuilders
             return builder.build();
         }
 
-        private ColumnDefinition getColumn(String columnName)
+        private ColumnMetadata getColumn(String columnName)
         {
-            ColumnDefinition column = metadata.getColumnDefinition(new 
ColumnIdentifier(columnName, true));
+            ColumnMetadata column = metadata.getColumn(new 
ColumnIdentifier(columnName, true));
             assert column != null : "Cannot find column " + columnName;
             assert !column.isPrimaryKeyColumn();
             assert !column.isStatic() || builder.clustering() == 
Clustering.STATIC_CLUSTERING : "Cannot add non-static column to static-row";
             return column;
         }
 
-        private Cell cell(ColumnDefinition column, ByteBuffer value, CellPath 
path)
+        private Cell cell(ColumnMetadata column, ByteBuffer value, CellPath 
path)
         {
             if (value == null)
                 return BufferCell.tombstone(column, timestamp, nowInSec, path);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 9781bb9..cb50c43 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -32,11 +32,9 @@ import org.apache.cassandra.cache.RowCacheKey;
 import org.apache.cassandra.cache.RowCacheSentinel;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.Transformation;
@@ -47,7 +45,9 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
@@ -58,7 +58,6 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
-
 /**
  * A read command that selects a (part of a) single partition.
  */
@@ -73,7 +72,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
     public SinglePartitionReadCommand(boolean isDigest,
                                       int digestVersion,
-                                      CFMetaData metadata,
+                                      TableMetadata metadata,
                                       int nowInSec,
                                       ColumnFilter columnFilter,
                                       RowFilter rowFilter,
@@ -100,7 +99,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      *
      * @return a newly created read command.
      */
-    public static SinglePartitionReadCommand create(CFMetaData metadata,
+    public static SinglePartitionReadCommand create(TableMetadata metadata,
                                                     int nowInSec,
                                                     ColumnFilter columnFilter,
                                                     RowFilter rowFilter,
@@ -122,7 +121,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      *
      * @return a newly created read command. The returned command will use no 
row filter and have no limits.
      */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int 
nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter 
filter)
+    public static SinglePartitionReadCommand create(TableMetadata metadata, 
int nowInSec, DecoratedKey key, ColumnFilter columnFilter, 
ClusteringIndexFilter filter)
     {
         return create(metadata, nowInSec, columnFilter, RowFilter.NONE, 
DataLimits.NONE, key, filter);
     }
@@ -136,7 +135,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      *
      * @return a newly created read command that queries all the rows of 
{@code key}.
      */
-    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData 
metadata, int nowInSec, DecoratedKey key)
+    public static SinglePartitionReadCommand fullPartitionRead(TableMetadata 
metadata, int nowInSec, DecoratedKey key)
     {
         return SinglePartitionReadCommand.create(metadata, nowInSec, key, 
Slices.ALL);
     }
@@ -150,9 +149,9 @@ public class SinglePartitionReadCommand extends ReadCommand
      *
      * @return a newly created read command that queries all the rows of 
{@code key}.
      */
-    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData 
metadata, int nowInSec, ByteBuffer key)
+    public static SinglePartitionReadCommand fullPartitionRead(TableMetadata 
metadata, int nowInSec, ByteBuffer key)
     {
-        return SinglePartitionReadCommand.create(metadata, nowInSec, 
metadata.decorateKey(key), Slices.ALL);
+        return SinglePartitionReadCommand.create(metadata, nowInSec, 
metadata.partitioner.decorateKey(key), Slices.ALL);
     }
 
     /**
@@ -166,7 +165,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      * @return a newly created read command that queries {@code slice} in 
{@code key}. The returned query will
      * query every columns for the table (without limit or row filtering) and 
be in forward order.
      */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int 
nowInSec, DecoratedKey key, Slice slice)
+    public static SinglePartitionReadCommand create(TableMetadata metadata, 
int nowInSec, DecoratedKey key, Slice slice)
     {
         return create(metadata, nowInSec, key, 
Slices.with(metadata.comparator, slice));
     }
@@ -182,7 +181,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      * @return a newly created read command that queries the {@code slices} in 
{@code key}. The returned query will
      * query every columns for the table (without limit or row filtering) and 
be in forward order.
      */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int 
nowInSec, DecoratedKey key, Slices slices)
+    public static SinglePartitionReadCommand create(TableMetadata metadata, 
int nowInSec, DecoratedKey key, Slices slices)
     {
         ClusteringIndexSliceFilter filter = new 
ClusteringIndexSliceFilter(slices, false);
         return SinglePartitionReadCommand.create(metadata, nowInSec, 
ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
@@ -199,9 +198,9 @@ public class SinglePartitionReadCommand extends ReadCommand
      * @return a newly created read command that queries the {@code slices} in 
{@code key}. The returned query will
      * query every columns for the table (without limit or row filtering) and 
be in forward order.
      */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int 
nowInSec, ByteBuffer key, Slices slices)
+    public static SinglePartitionReadCommand create(TableMetadata metadata, 
int nowInSec, ByteBuffer key, Slices slices)
     {
-        return create(metadata, nowInSec, metadata.decorateKey(key), slices);
+        return create(metadata, nowInSec, 
metadata.partitioner.decorateKey(key), slices);
     }
 
     /**
@@ -215,7 +214,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      * @return a newly created read command that queries the {@code names} in 
{@code key}. The returned query will
      * query every columns (without limit or row filtering) and be in forward 
order.
      */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int 
nowInSec, DecoratedKey key, NavigableSet<Clustering> names)
+    public static SinglePartitionReadCommand create(TableMetadata metadata, 
int nowInSec, DecoratedKey key, NavigableSet<Clustering> names)
     {
         ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(names, false);
         return SinglePartitionReadCommand.create(metadata, nowInSec, 
ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
@@ -232,7 +231,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      * @return a newly created read command that queries {@code name} in 
{@code key}. The returned query will
      * query every columns (without limit or row filtering).
      */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int 
nowInSec, DecoratedKey key, Clustering name)
+    public static SinglePartitionReadCommand create(TableMetadata metadata, 
int nowInSec, DecoratedKey key, Clustering name)
     {
         return create(metadata, nowInSec, key, FBUtilities.singleton(name, 
metadata.comparator));
     }
@@ -267,7 +266,7 @@ public class SinglePartitionReadCommand extends ReadCommand
         if (!this.partitionKey().equals(key))
             return false;
 
-        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, 
metadata().getKeyValidator());
+        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, 
metadata().partitionKeyType);
     }
 
     public boolean selectsClustering(DecoratedKey key, Clustering clustering)
@@ -360,7 +359,7 @@ public class SinglePartitionReadCommand extends ReadCommand
         assert !cfs.isIndex(); // CASSANDRA-5732
         assert cfs.isRowCacheEnabled() : String.format("Row cache is not 
enabled on table [%s]", cfs.name);
 
-        RowCacheKey key = new RowCacheKey(metadata().ksAndCFName, 
partitionKey());
+        RowCacheKey key = new RowCacheKey(metadata(), partitionKey());
 
         // Attempt a sentinel-read-cache sequence.  if a write invalidates our 
sentinel, we'll return our
         // (now potentially obsolete) data, but won't cache it. see 
CASSANDRA-3862
@@ -601,9 +600,9 @@ public class SinglePartitionReadCommand extends ReadCommand
                                nonIntersectingSSTables, view.sstables.size(), 
includedDueToTombstones);
 
             if (iterators.isEmpty())
-                return EmptyIterators.unfilteredRow(cfs.metadata, 
partitionKey(), filter.isReversed());
+                return EmptyIterators.unfilteredRow(cfs.metadata(), 
partitionKey(), filter.isReversed());
 
-            StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey());
+            StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
             return withSSTablesIterated(iterators, cfs.metric);
         }
         catch (RuntimeException | Error e)
@@ -676,7 +675,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
     private boolean queriesMulticellType()
     {
-        for (ColumnDefinition column : columnFilter().fetchedColumns())
+        for (ColumnMetadata column : columnFilter().fetchedColumns())
         {
             if (column.type.isMultiCell() || column.type.isCounter())
                 return true;
@@ -777,7 +776,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
         DecoratedKey key = result.partitionKey();
         
cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), 
key.hashCode(), 1);
-        StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey());
+        StorageHook.instance.reportRead(cfs.metadata.id, partitionKey());
 
         // "hoist up" the requested data into a more recent sstable
         if (sstablesIterated > cfs.getMinimumCompactionThreshold()
@@ -824,7 +823,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
         SearchIterator<Clustering, Row> searchIter = 
result.searchIterator(columnFilter(), false);
 
-        PartitionColumns columns = columnFilter().fetchedColumns();
+        RegularAndStaticColumns columns = columnFilter().fetchedColumns();
         NavigableSet<Clustering> clusterings = filter.requestedRows();
 
         // We want to remove rows for which we have values for all requested 
columns. We have to deal with both static and regular rows.
@@ -879,7 +878,7 @@ public class SinglePartitionReadCommand extends ReadCommand
         if (row.primaryKeyLivenessInfo().isEmpty() || 
row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
             return false;
 
-        for (ColumnDefinition column : requestedColumns)
+        for (ColumnMetadata column : requestedColumns)
         {
             Cell cell = row.getCell(column);
             if (cell == null || cell.timestamp() <= sstableTimestamp)
@@ -891,13 +890,12 @@ public class SinglePartitionReadCommand extends 
ReadCommand
     @Override
     public String toString()
     {
-        return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s 
key=%s filter=%s, nowInSec=%d)",
-                             metadata().ksName,
-                             metadata().cfName,
+        return String.format("Read(%s columns=%s rowFilter=%s limits=%s key=%s 
filter=%s, nowInSec=%d)",
+                             metadata().toString(),
                              columnFilter(),
                              rowFilter(),
                              limits(),
-                             
metadata().getKeyValidator().getString(partitionKey().getKey()),
+                             
metadata().partitionKeyType.getString(partitionKey().getKey()),
                              clusteringIndexFilter.toString(metadata()),
                              nowInSec());
     }
@@ -911,8 +909,8 @@ public class SinglePartitionReadCommand extends ReadCommand
     {
         sb.append(" WHERE ");
 
-        
sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append("
 = ");
-        DataRange.appendKeyString(sb, metadata().getKeyValidator(), 
partitionKey().getKey());
+        
sb.append(ColumnMetadata.toCQLString(metadata().partitionKeyColumns())).append("
 = ");
+        DataRange.appendKeyString(sb, metadata().partitionKeyType, 
partitionKey().getKey());
 
         // We put the row filter first because the clustering index filter can 
end by "ORDER BY"
         if (!rowFilter().isEmpty())
@@ -925,13 +923,13 @@ public class SinglePartitionReadCommand extends 
ReadCommand
 
     protected void serializeSelection(DataOutputPlus out, int version) throws 
IOException
     {
-        metadata().getKeyValidator().writeValue(partitionKey().getKey(), out);
+        metadata().partitionKeyType.writeValue(partitionKey().getKey(), out);
         ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), 
out, version);
     }
 
     protected long selectionSerializedSize(int version)
     {
-        return 
metadata().getKeyValidator().writtenLength(partitionKey().getKey())
+        return 
metadata().partitionKeyType.writtenLength(partitionKey().getKey())
              + 
ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), 
version);
     }
 
@@ -974,7 +972,7 @@ public class SinglePartitionReadCommand extends ReadCommand
             return limits;
         }
 
-        public CFMetaData metadata()
+        public TableMetadata metadata()
         {
             return commands.get(0).metadata();
         }
@@ -1046,10 +1044,10 @@ public class SinglePartitionReadCommand extends 
ReadCommand
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean 
isDigest, int digestVersion, CFMetaData metadata, int nowInSec, ColumnFilter 
columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> 
index)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean 
isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter 
columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> 
index)
         throws IOException
         {
-            DecoratedKey key = 
metadata.decorateKey(metadata.getKeyValidator().readValue(in, 
DatabaseDescriptor.getMaxValueSize()));
+            DecoratedKey key = 
metadata.partitioner.decorateKey(metadata.partitionKeyType.readValue(in, 
DatabaseDescriptor.getMaxValueSize()));
             ClusteringIndexFilter filter = 
ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
             return new SinglePartitionReadCommand(isDigest, digestVersion, 
metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java 
b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index 9c199b6..f70e45e 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -30,8 +30,8 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.service.MigrationListener;
-import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaChangeListener;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -46,7 +46,7 @@ import org.apache.cassandra.utils.concurrent.Refs;
  *
  * See CASSANDRA-7688.
  */
-public class SizeEstimatesRecorder extends MigrationListener implements 
Runnable
+public class SizeEstimatesRecorder extends SchemaChangeListener implements 
Runnable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(SizeEstimatesRecorder.class);
 
@@ -54,7 +54,7 @@ public class SizeEstimatesRecorder extends MigrationListener 
implements Runnable
 
     private SizeEstimatesRecorder()
     {
-        MigrationManager.instance.register(this);
+        Schema.instance.registerListener(this);
     }
 
     public void run()
@@ -81,8 +81,8 @@ public class SizeEstimatesRecorder extends MigrationListener 
implements Runnable
                 long passed = System.nanoTime() - start;
                 logger.trace("Spent {} milliseconds on estimating {}.{} size",
                              TimeUnit.NANOSECONDS.toMillis(passed),
-                             table.metadata.ksName,
-                             table.metadata.cfName);
+                             table.metadata.keyspace,
+                             table.metadata.name);
             }
         }
     }
@@ -124,7 +124,7 @@ public class SizeEstimatesRecorder extends 
MigrationListener implements Runnable
         }
 
         // atomically update the estimates.
-        SystemKeyspace.updateSizeEstimates(table.metadata.ksName, 
table.metadata.cfName, estimates);
+        SystemKeyspace.updateSizeEstimates(table.metadata.keyspace, 
table.metadata.name, estimates);
     }
 
     private long estimatePartitionsCount(Collection<SSTableReader> sstables, 
Range<Token> range)
@@ -148,7 +148,7 @@ public class SizeEstimatesRecorder extends 
MigrationListener implements Runnable
     }
 
     @Override
-    public void onDropColumnFamily(String keyspace, String table)
+    public void onDropTable(String keyspace, String table)
     {
         SystemKeyspace.clearSizeEstimates(keyspace, table);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java 
b/src/java/org/apache/cassandra/db/Slice.java
index c3da222..681d79c 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -260,11 +259,6 @@ public class Slice
         return true;
     }
 
-    public String toString(CFMetaData metadata)
-    {
-        return toString(metadata.comparator);
-    }
-
     public String toString(ClusteringComparator comparator)
     {
         StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slices.java 
b/src/java/org/apache/cassandra/db/Slices.java
index b3fd20a..9900112 100644
--- a/src/java/org/apache/cassandra/db/Slices.java
+++ b/src/java/org/apache/cassandra/db/Slices.java
@@ -23,8 +23,8 @@ import java.util.*;
 
 import com.google.common.collect.Iterators;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -140,7 +140,7 @@ public abstract class Slices implements Iterable<Slice>
      */
     public abstract boolean intersects(List<ByteBuffer> minClusteringValues, 
List<ByteBuffer> maxClusteringValues);
 
-    public abstract String toCQLString(CFMetaData metadata);
+    public abstract String toCQLString(TableMetadata metadata);
 
     /**
      * Checks if this <code>Slices</code> is empty.
@@ -323,7 +323,7 @@ public abstract class Slices implements Iterable<Slice>
             return size;
         }
 
-        public Slices deserialize(DataInputPlus in, int version, CFMetaData 
metadata) throws IOException
+        public Slices deserialize(DataInputPlus in, int version, TableMetadata 
metadata) throws IOException
         {
             int size = (int)in.readUnsignedVInt();
 
@@ -548,7 +548,7 @@ public abstract class Slices implements Iterable<Slice>
             return sb.append("}").toString();
         }
 
-        public String toCQLString(CFMetaData metadata)
+        public String toCQLString(TableMetadata metadata)
         {
             StringBuilder sb = new StringBuilder();
 
@@ -572,7 +572,7 @@ public abstract class Slices implements Iterable<Slice>
             boolean needAnd = false;
             for (int i = 0; i < clusteringSize; i++)
             {
-                ColumnDefinition column = metadata.clusteringColumns().get(i);
+                ColumnMetadata column = metadata.clusteringColumns().get(i);
                 List<ComponentOfSlice> componentInfo = columnComponents.get(i);
                 if (componentInfo.isEmpty())
                     break;
@@ -634,7 +634,7 @@ public abstract class Slices implements Iterable<Slice>
             return sb.toString();
         }
 
-        // An somewhat adhoc utility class only used by toCQLString
+        // A somewhat ad hoc utility class only used by toCQLString
         private static class ComponentOfSlice
         {
             public final boolean startInclusive;
@@ -751,7 +751,7 @@ public abstract class Slices implements Iterable<Slice>
             return "ALL";
         }
 
-        public String toCQLString(CFMetaData metadata)
+        public String toCQLString(TableMetadata metadata)
         {
             return "";
         }
@@ -826,7 +826,7 @@ public abstract class Slices implements Iterable<Slice>
             return "NONE";
         }
 
-        public String toCQLString(CFMetaData metadata)
+        public String toCQLString(TableMetadata metadata)
         {
             return "";
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/StorageHook.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/StorageHook.java 
b/src/java/org/apache/cassandra/db/StorageHook.java
index c1e7f66..3df8805 100644
--- a/src/java/org/apache/cassandra/db/StorageHook.java
+++ b/src/java/org/apache/cassandra/db/StorageHook.java
@@ -18,22 +18,21 @@
 
 package org.apache.cassandra.db;
 
-import java.util.UUID;
-
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.FBUtilities;
 
 public interface StorageHook
 {
     public static final StorageHook instance = createHook();
 
-    public void reportWrite(UUID cfid, PartitionUpdate partitionUpdate);
-    public void reportRead(UUID cfid, DecoratedKey key);
+    public void reportWrite(TableId tableId, PartitionUpdate partitionUpdate);
+    public void reportRead(TableId tableId, DecoratedKey key);
     public UnfilteredRowIteratorWithLowerBound 
makeRowIteratorWithLowerBound(ColumnFamilyStore cfs,
                                                                       
DecoratedKey partitionKey,
                                                                       
SSTableReader sstable,
@@ -57,9 +56,9 @@ public interface StorageHook
         {
             return new StorageHook()
             {
-                public void reportWrite(UUID cfid, PartitionUpdate 
partitionUpdate) {}
+                public void reportWrite(TableId tableId, PartitionUpdate 
partitionUpdate) {}
 
-                public void reportRead(UUID cfid, DecoratedKey key) {}
+                public void reportRead(TableId tableId, DecoratedKey key) {}
 
                 public UnfilteredRowIteratorWithLowerBound 
makeRowIteratorWithLowerBound(ColumnFamilyStore cfs, DecoratedKey partitionKey, 
SSTableReader sstable, ClusteringIndexFilter filter, ColumnFilter 
selectedColumns)
                 {

Reply via email to