http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java 
b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index deb7747..4eba4d8 100644
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.index.sasi;
 
-import java.io.File;
 import java.io.FileWriter;
 import java.io.Writer;
 import java.nio.ByteBuffer;
@@ -33,13 +32,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.cql3.statements.IndexTarget;
-import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
@@ -61,16 +59,10 @@ import org.apache.cassandra.index.sasi.memory.IndexMemtable;
 import org.apache.cassandra.index.sasi.plan.QueryController;
 import org.apache.cassandra.index.sasi.plan.QueryPlan;
 import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -102,13 +94,13 @@ public class SASIIndexTest
     public static void loadSchema() throws ConfigurationException
     {
         SchemaLoader.loadSchema();
-        MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(KS_NAME,
-                                                                     
KeyspaceParams.simpleTransient(1),
-                                                                     
Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME),
-                                                                               
SchemaLoader.clusteringSASICFMD(KS_NAME, CLUSTERING_CF_NAME_1),
-                                                                               
SchemaLoader.clusteringSASICFMD(KS_NAME, CLUSTERING_CF_NAME_2, "location"),
-                                                                               
SchemaLoader.staticSASICFMD(KS_NAME, STATIC_CF_NAME),
-                                                                               
SchemaLoader.fullTextSearchSASICFMD(KS_NAME, FTS_CF_NAME))));
+        SchemaLoader.createKeyspace(KS_NAME,
+                                    KeyspaceParams.simpleTransient(1),
+                                    SchemaLoader.sasiCFMD(KS_NAME, CF_NAME),
+                                    SchemaLoader.clusteringSASICFMD(KS_NAME, 
CLUSTERING_CF_NAME_1),
+                                    SchemaLoader.clusteringSASICFMD(KS_NAME, 
CLUSTERING_CF_NAME_2, "location"),
+                                    SchemaLoader.staticSASICFMD(KS_NAME, 
STATIC_CF_NAME),
+                                    
SchemaLoader.fullTextSearchSASICFMD(KS_NAME, FTS_CF_NAME));
     }
 
     @Before
@@ -771,25 +763,25 @@ public class SASIIndexTest
         ColumnFamilyStore store = 
Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 
         Mutation rm1 = new Mutation(KS_NAME, 
decoratedKey(AsciiType.instance.decompose("key1")));
-        rm1.add(PartitionUpdate.singleRowUpdate(store.metadata,
+        rm1.add(PartitionUpdate.singleRowUpdate(store.metadata(),
                                                 rm1.key(),
-                                                
buildRow(buildCell(store.metadata,
+                                                
buildRow(buildCell(store.metadata(),
                                                                    
UTF8Type.instance.decompose("/data/output/id"),
                                                                    
AsciiType.instance.decompose("jason"),
                                                                    
System.currentTimeMillis()))));
 
         Mutation rm2 = new Mutation(KS_NAME, 
decoratedKey(AsciiType.instance.decompose("key2")));
-        rm2.add(PartitionUpdate.singleRowUpdate(store.metadata,
+        rm2.add(PartitionUpdate.singleRowUpdate(store.metadata(),
                                                 rm2.key(),
-                                                
buildRow(buildCell(store.metadata,
+                                                
buildRow(buildCell(store.metadata(),
                                                                    
UTF8Type.instance.decompose("/data/output/id"),
                                                                    
AsciiType.instance.decompose("pavel"),
                                                                    
System.currentTimeMillis()))));
 
         Mutation rm3 = new Mutation(KS_NAME, 
decoratedKey(AsciiType.instance.decompose("key3")));
-        rm3.add(PartitionUpdate.singleRowUpdate(store.metadata,
+        rm3.add(PartitionUpdate.singleRowUpdate(store.metadata(),
                                                 rm3.key(),
-                                                
buildRow(buildCell(store.metadata,
+                                                
buildRow(buildCell(store.metadata(),
                                                                    
UTF8Type.instance.decompose("/data/output/id"),
                                                                    
AsciiType.instance.decompose("Aleksey"),
                                                                    
System.currentTimeMillis()))));
@@ -822,14 +814,14 @@ public class SASIIndexTest
         Assert.assertTrue(rows.toString(), rows.isEmpty());
 
         // now let's trigger index rebuild and check if we got the data back
-        
store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName("data_output_id"));
+        
store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName(store.name
 + "_data_output_id"));
 
         rows = getIndexed(store, 10, buildExpression(dataOutputId, 
Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
         Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { 
"key1", "key2" }, rows.toArray(new String[rows.size()])));
 
         // also let's try to build an index for column which has no data to 
make sure that doesn't fail
-        
store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName("first_name"));
-        
store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName("data_output_id"));
+        
store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName(store.name
 + "_first_name"));
+        
store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName(store.name
 + "_data_output_id"));
 
         rows = getIndexed(store, 10, buildExpression(dataOutputId, 
Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
         Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { 
"key1", "key2" }, rows.toArray(new String[rows.size()])));
@@ -1307,14 +1299,14 @@ public class SASIIndexTest
         ColumnFamilyStore store = loadData(data1, true);
 
         RowFilter filter = RowFilter.create();
-        filter.add(store.metadata.getColumnDefinition(firstName), 
Operator.LIKE_CONTAINS, AsciiType.instance.fromString("a"));
+        filter.add(store.metadata().getColumn(firstName), 
Operator.LIKE_CONTAINS, AsciiType.instance.fromString("a"));
 
-        ReadCommand command = new PartitionRangeReadCommand(store.metadata,
+        ReadCommand command = new PartitionRangeReadCommand(store.metadata(),
                                                             
FBUtilities.nowInSeconds(),
-                                                            
ColumnFilter.all(store.metadata),
+                                                            
ColumnFilter.all(store.metadata()),
                                                             filter,
                                                             DataLimits.NONE,
-                                                            
DataRange.allData(store.metadata.partitioner),
+                                                            
DataRange.allData(store.metadata().partitioner),
                                                             Optional.empty());
 
         try
@@ -1602,7 +1594,7 @@ public class SASIIndexTest
         };
 
         // first let's check that we get 'false' for 'isLiteral' if we don't 
set the option with special comparator
-        ColumnDefinition columnA = ColumnDefinition.regularDef(KS_NAME, 
CF_NAME, "special-A", stringType);
+        ColumnMetadata columnA = ColumnMetadata.regularColumn(KS_NAME, 
CF_NAME, "special-A", stringType);
 
         ColumnIndex indexA = new ColumnIndex(UTF8Type.instance, columnA, 
IndexMetadata.fromSchemaMetadata("special-index-A", IndexMetadata.Kind.CUSTOM, 
new HashMap<String, String>()
         {{
@@ -1613,7 +1605,7 @@ public class SASIIndexTest
         Assert.assertEquals(false, indexA.isLiteral());
 
         // now let's double-check that we do get 'true' when we set it
-        ColumnDefinition columnB = ColumnDefinition.regularDef(KS_NAME, 
CF_NAME, "special-B", stringType);
+        ColumnMetadata columnB = ColumnMetadata.regularColumn(KS_NAME, 
CF_NAME, "special-B", stringType);
 
         ColumnIndex indexB = new ColumnIndex(UTF8Type.instance, columnB, 
IndexMetadata.fromSchemaMetadata("special-index-B", IndexMetadata.Kind.CUSTOM, 
new HashMap<String, String>()
         {{
@@ -1625,7 +1617,7 @@ public class SASIIndexTest
         Assert.assertEquals(true, indexB.isLiteral());
 
         // and finally we should also get a 'true' if it's built-in 
UTF-8/ASCII comparator
-        ColumnDefinition columnC = ColumnDefinition.regularDef(KS_NAME, 
CF_NAME, "special-C", UTF8Type.instance);
+        ColumnMetadata columnC = ColumnMetadata.regularColumn(KS_NAME, 
CF_NAME, "special-C", UTF8Type.instance);
 
         ColumnIndex indexC = new ColumnIndex(UTF8Type.instance, columnC, 
IndexMetadata.fromSchemaMetadata("special-index-C", IndexMetadata.Kind.CUSTOM, 
new HashMap<String, String>()
         {{
@@ -1635,7 +1627,7 @@ public class SASIIndexTest
         Assert.assertEquals(true, indexC.isIndexed());
         Assert.assertEquals(true, indexC.isLiteral());
 
-        ColumnDefinition columnD = ColumnDefinition.regularDef(KS_NAME, 
CF_NAME, "special-D", AsciiType.instance);
+        ColumnMetadata columnD = ColumnMetadata.regularColumn(KS_NAME, 
CF_NAME, "special-D", AsciiType.instance);
 
         ColumnIndex indexD = new ColumnIndex(UTF8Type.instance, columnD, 
IndexMetadata.fromSchemaMetadata("special-index-D", IndexMetadata.Kind.CUSTOM, 
new HashMap<String, String>()
         {{
@@ -1646,7 +1638,7 @@ public class SASIIndexTest
         Assert.assertEquals(true, indexD.isLiteral());
 
         // and the option should supersede the comparator type
-        ColumnDefinition columnE = ColumnDefinition.regularDef(KS_NAME, 
CF_NAME, "special-E", UTF8Type.instance);
+        ColumnMetadata columnE = ColumnMetadata.regularColumn(KS_NAME, 
CF_NAME, "special-E", UTF8Type.instance);
 
         ColumnIndex indexE = new ColumnIndex(UTF8Type.instance, columnE, 
IndexMetadata.fromSchemaMetadata("special-index-E", IndexMetadata.Kind.CUSTOM, 
new HashMap<String, String>()
         {{
@@ -1880,7 +1872,7 @@ public class SASIIndexTest
         store.forceBlockingFlush();
 
         SSTable ssTable = store.getSSTables(SSTableSet.LIVE).iterator().next();
-        Path path = 
FileSystems.getDefault().getPath(ssTable.getFilename().replace("-Data", 
"-SI_age"));
+        Path path = 
FileSystems.getDefault().getPath(ssTable.getFilename().replace("-Data", "-SI_" 
+ CLUSTERING_CF_NAME_1 + "_age"));
 
         // Overwrite index file with garbage
         Writer writer = new FileWriter(path.toFile(), false);
@@ -1892,7 +1884,7 @@ public class SASIIndexTest
         Assert.assertTrue(executeCQL(CLUSTERING_CF_NAME_1, "SELECT * FROM 
%s.%s WHERE age = 27 AND name = 'Pavel'").isEmpty());
 
         // Rebuild index
-        store.rebuildSecondaryIndex("age");
+        store.rebuildSecondaryIndex(CLUSTERING_CF_NAME_1 + "_age");
 
         long size2 = Files.readAttributes(path, 
BasicFileAttributes.class).size();
         // Make sure that garbage was overwritten
@@ -1917,7 +1909,7 @@ public class SASIIndexTest
             // invalid index mode
             SASIIndex.validateOptions(new HashMap<String, String>()
                                       {{ put("target", "address"); put("mode", 
"NORMAL"); }},
-                                      store.metadata);
+                                      store.metadata());
             Assert.fail();
         }
         catch (ConfigurationException e)
@@ -1930,7 +1922,7 @@ public class SASIIndexTest
             // invalid SPARSE on the literal index
             SASIIndex.validateOptions(new HashMap<String, String>()
                                       {{ put("target", "address"); put("mode", 
"SPARSE"); }},
-                                      store.metadata);
+                                      store.metadata());
             Assert.fail();
         }
         catch (ConfigurationException e)
@@ -1943,7 +1935,7 @@ public class SASIIndexTest
             // invalid SPARSE on the explicitly literal index
             SASIIndex.validateOptions(new HashMap<String, String>()
                                       {{ put("target", "height"); put("mode", 
"SPARSE"); put("is_literal", "true"); }},
-                    store.metadata);
+                                      store.metadata());
             Assert.fail();
         }
         catch (ConfigurationException e)
@@ -1956,7 +1948,7 @@ public class SASIIndexTest
             //  SPARSE with analyzer
             SASIIndex.validateOptions(new HashMap<String, String>()
                                       {{ put("target", "height"); put("mode", 
"SPARSE"); put("analyzed", "true"); }},
-                                      store.metadata);
+                                      store.metadata());
             Assert.fail();
         }
         catch (ConfigurationException e)
@@ -2229,12 +2221,12 @@ public class SASIIndexTest
             put("key1", Pair.create("Pavel", 14));
         }}, false);
 
-        ColumnIndex index = ((SASIIndex) 
store.indexManager.getIndexByName("first_name")).getIndex();
+        ColumnIndex index = ((SASIIndex) 
store.indexManager.getIndexByName(store.name + "_first_name")).getIndex();
         IndexMemtable beforeFlushMemtable = index.getCurrentMemtable();
 
-        PartitionRangeReadCommand command = new 
PartitionRangeReadCommand(store.metadata,
+        PartitionRangeReadCommand command = new 
PartitionRangeReadCommand(store.metadata(),
                                                                           
FBUtilities.nowInSeconds(),
-                                                                          
ColumnFilter.all(store.metadata),
+                                                                          
ColumnFilter.all(store.metadata()),
                                                                           
RowFilter.NONE,
                                                                           
DataLimits.NONE,
                                                                           
DataRange.allData(store.getPartitioner()),
@@ -2322,7 +2314,7 @@ public class SASIIndexTest
 
     private static Set<String> getIndexed(ColumnFamilyStore store, int 
maxResults, Expression... expressions)
     {
-        return getIndexed(store, ColumnFilter.all(store.metadata), maxResults, 
expressions);
+        return getIndexed(store, ColumnFilter.all(store.metadata()), 
maxResults, expressions);
     }
 
     private static Set<String> getIndexed(ColumnFamilyStore store, 
ColumnFilter columnFilter, int maxResults, Expression... expressions)
@@ -2341,7 +2333,7 @@ public class SASIIndexTest
         do
         {
             count = 0;
-            currentPage = getIndexed(store, ColumnFilter.all(store.metadata), 
lastKey, pageSize, expressions);
+            currentPage = getIndexed(store, 
ColumnFilter.all(store.metadata()), lastKey, pageSize, expressions);
             if (currentPage == null)
                 break;
 
@@ -2370,9 +2362,9 @@ public class SASIIndexTest
 
         RowFilter filter = RowFilter.create();
         for (Expression e : expressions)
-            filter.add(store.metadata.getColumnDefinition(e.name), e.op, 
e.value);
+            filter.add(store.metadata().getColumn(e.name), e.op, e.value);
 
-        ReadCommand command = new PartitionRangeReadCommand(store.metadata,
+        ReadCommand command = new PartitionRangeReadCommand(store.metadata(),
                                                             
FBUtilities.nowInSeconds(),
                                                             columnFilter,
                                                             filter,
@@ -2473,13 +2465,13 @@ public class SASIIndexTest
 
     private static Cell buildCell(ByteBuffer name, ByteBuffer value, long 
timestamp)
     {
-        CFMetaData cfm = 
Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).metadata;
-        return BufferCell.live(cfm.getColumnDefinition(name), timestamp, 
value);
+        TableMetadata cfm = 
Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).metadata();
+        return BufferCell.live(cfm.getColumn(name), timestamp, value);
     }
 
-    private static Cell buildCell(CFMetaData cfm, ByteBuffer name, ByteBuffer 
value, long timestamp)
+    private static Cell buildCell(TableMetadata cfm, ByteBuffer name, 
ByteBuffer value, long timestamp)
     {
-        ColumnDefinition column = cfm.getColumnDefinition(name);
+        ColumnMetadata column = cfm.getColumn(name);
         assert column != null;
         return BufferCell.live(column, timestamp, value);
     }
@@ -2491,14 +2483,14 @@ public class SASIIndexTest
 
     private static void update(Mutation rm, ByteBuffer name, ByteBuffer value, 
long timestamp)
     {
-        CFMetaData metadata = 
Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).metadata;
+        TableMetadata metadata = 
Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).metadata();
         rm.add(PartitionUpdate.singleRowUpdate(metadata, rm.key(), 
buildRow(buildCell(metadata, name, value, timestamp))));
     }
 
 
     private static void update(Mutation rm, List<Cell> cells)
     {
-        CFMetaData metadata = 
Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).metadata;
+        TableMetadata metadata = 
Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).metadata();
         rm.add(PartitionUpdate.singleRowUpdate(metadata, rm.key(), 
buildRow(cells)));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java 
b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
index d940186..97b3433 100644
--- 
a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
+++ 
b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
@@ -24,8 +24,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -43,10 +41,12 @@ import 
org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
-import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import com.google.common.util.concurrent.Futures;
@@ -67,7 +67,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         SchemaLoader.loadSchema();
         MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(KS_NAME,
                                                                      
KeyspaceParams.simpleTransient(1),
-                                                                     
Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME))));
+                                                                     
Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME).build())));
     }
 
     @Test
@@ -78,9 +78,9 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         final long timestamp = System.currentTimeMillis();
 
         ColumnFamilyStore cfs = 
Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
-        ColumnDefinition column = 
cfs.metadata.getColumnDefinition(UTF8Type.instance.decompose("age"));
+        ColumnMetadata column = 
cfs.metadata().getColumn(UTF8Type.instance.decompose("age"));
 
-        SASIIndex sasi = (SASIIndex) cfs.indexManager.getIndexByName("age");
+        SASIIndex sasi = (SASIIndex) cfs.indexManager.getIndexByName(cfs.name 
+ "_age");
 
         File directory = cfs.getDirectories().getDirectoryForNewSSTables();
         Descriptor descriptor = cfs.newSSTableDescriptor(directory);
@@ -91,7 +91,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         for (int i = 0; i < maxKeys; i++)
         {
             ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, i));
-            expectedKeys.put(cfs.metadata.partitioner.decorateKey(key),
+            expectedKeys.put(cfs.metadata().partitioner.decorateKey(key),
                              BTreeRow.singleCellRow(Clustering.EMPTY,
                                                     BufferCell.live(column, 
timestamp, Int32Type.instance.decompose(i))));
         }
@@ -136,7 +136,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
 
         OnDiskIndex index = new OnDiskIndex(new File(indexFile), 
Int32Type.instance, keyPosition -> {
             ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, 
keyPosition));
-            return cfs.metadata.partitioner.decorateKey(key);
+            return cfs.metadata().partitioner.decorateKey(key);
         });
 
         Assert.assertEquals(0, UTF8Type.instance.compare(index.minKey(), 
ByteBufferUtil.bytes(String.format(keyFormat, 0))));
@@ -170,9 +170,9 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         final String columnName = "timestamp";
 
         ColumnFamilyStore cfs = 
Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
-        ColumnDefinition column = 
cfs.metadata.getColumnDefinition(UTF8Type.instance.decompose(columnName));
+        ColumnMetadata column = 
cfs.metadata().getColumn(UTF8Type.instance.decompose(columnName));
 
-        SASIIndex sasi = (SASIIndex) 
cfs.indexManager.getIndexByName(columnName);
+        SASIIndex sasi = (SASIIndex) cfs.indexManager.getIndexByName(cfs.name 
+ "_" + columnName);
 
         File directory = cfs.getDirectories().getDirectoryForNewSSTables();
         Descriptor descriptor = cfs.newSSTableDescriptor(directory);
@@ -183,7 +183,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         indexWriter.begin();
         indexWriter.indexes.put(column, indexWriter.newIndex(sasi.getIndex()));
 
-        populateSegment(cfs.metadata, indexWriter.getIndex(column), new 
HashMap<Long, Set<Integer>>()
+        populateSegment(cfs.metadata(), indexWriter.getIndex(column), new 
HashMap<Long, Set<Integer>>()
         {{
             put(now,     new HashSet<>(Arrays.asList(0, 1)));
             put(now + 1, new HashSet<>(Arrays.asList(2, 3)));
@@ -201,7 +201,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         // now let's test multiple correct segments which yield an incorrect final 
segment
         for (int i = 0; i < 3; i++)
         {
-            populateSegment(cfs.metadata, index, new HashMap<Long, 
Set<Integer>>()
+            populateSegment(cfs.metadata(), index, new HashMap<Long, 
Set<Integer>>()
             {{
                 put(now,     new HashSet<>(Arrays.asList(random.nextInt(), 
random.nextInt(), random.nextInt())));
                 put(now + 1, new HashSet<>(Arrays.asList(random.nextInt(), 
random.nextInt(), random.nextInt())));
@@ -236,7 +236,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         Assert.assertFalse(new File(index.outputFile).exists());
     }
 
-    private static void populateSegment(CFMetaData metadata, 
PerSSTableIndexWriter.Index index, Map<Long, Set<Integer>> data)
+    private static void populateSegment(TableMetadata metadata, 
PerSSTableIndexWriter.Index index, Map<Long, Set<Integer>> data)
     {
         for (Map.Entry<Long, Set<Integer>> value : data.entrySet())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java 
b/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
index e388cd4..8273dec 100644
--- a/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
@@ -25,8 +25,8 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.RowFilter;
@@ -37,10 +37,7 @@ import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Tables;
-import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.FBUtilities;
 
 import org.junit.*;
@@ -61,11 +58,11 @@ public class OperationTest extends SchemaLoader
     {
         System.setProperty("cassandra.config", "cassandra-murmur.yaml");
         SchemaLoader.loadSchema();
-        MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(KS_NAME,
-                                                                     
KeyspaceParams.simpleTransient(1),
-                                                                     
Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME),
-                                                                               
SchemaLoader.clusteringSASICFMD(KS_NAME, CLUSTERING_CF_NAME),
-                                                                               
SchemaLoader.staticSASICFMD(KS_NAME, STATIC_CF_NAME))));
+        SchemaLoader.createKeyspace(KS_NAME,
+                                    KeyspaceParams.simpleTransient(1),
+                                    SchemaLoader.sasiCFMD(KS_NAME, CF_NAME),
+                                    SchemaLoader.clusteringSASICFMD(KS_NAME, 
CLUSTERING_CF_NAME),
+                                    SchemaLoader.staticSASICFMD(KS_NAME, 
STATIC_CF_NAME));
 
         BACKEND = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
         CLUSTERING_BACKEND = 
Keyspace.open(KS_NAME).getColumnFamilyStore(CLUSTERING_CF_NAME);
@@ -78,7 +75,7 @@ public class OperationTest extends SchemaLoader
     public void beforeTest()
     {
         controller = new QueryController(BACKEND,
-                                         
PartitionRangeReadCommand.allDataRead(BACKEND.metadata, 
FBUtilities.nowInSeconds()),
+                                         
PartitionRangeReadCommand.allDataRead(BACKEND.metadata(), 
FBUtilities.nowInSeconds()),
                                          TimeUnit.SECONDS.toMillis(10));
     }
 
@@ -91,9 +88,9 @@ public class OperationTest extends SchemaLoader
     @Test
     public void testAnalyze() throws Exception
     {
-        final ColumnDefinition firstName = 
getColumn(UTF8Type.instance.decompose("first_name"));
-        final ColumnDefinition age = 
getColumn(UTF8Type.instance.decompose("age"));
-        final ColumnDefinition comment = 
getColumn(UTF8Type.instance.decompose("comment"));
+        final ColumnMetadata firstName = 
getColumn(UTF8Type.instance.decompose("first_name"));
+        final ColumnMetadata age = 
getColumn(UTF8Type.instance.decompose("age"));
+        final ColumnMetadata comment = 
getColumn(UTF8Type.instance.decompose("comment"));
 
         // age != 5 AND age > 1 AND age != 6 AND age <= 10
         Map<Expression.Op, Expression> expressions = 
convert(Operation.analyzeGroup(controller, OperationType.AND,
@@ -184,8 +181,8 @@ public class OperationTest extends SchemaLoader
                             }}, expressions.get(Expression.Op.EQ));
 
         // comment = 'soft eng' and comment != 'likes do'
-        ListMultimap<ColumnDefinition, Expression> e = 
Operation.analyzeGroup(controller, OperationType.OR,
-                                                    Arrays.asList(new 
SimpleExpression(comment, Operator.LIKE_MATCHES, 
UTF8Type.instance.decompose("soft eng")),
+        ListMultimap<ColumnMetadata, Expression> e = 
Operation.analyzeGroup(controller, OperationType.OR,
+                                                                            
Arrays.asList(new SimpleExpression(comment, Operator.LIKE_MATCHES, 
UTF8Type.instance.decompose("soft eng")),
                                                                   new 
SimpleExpression(comment, Operator.NEQ, UTF8Type.instance.decompose("likes 
do"))));
 
         List<Expression> expectedExpressions = new ArrayList<Expression>(2)
@@ -274,8 +271,8 @@ public class OperationTest extends SchemaLoader
     @Test
     public void testSatisfiedBy() throws Exception
     {
-        final ColumnDefinition timestamp = 
getColumn(UTF8Type.instance.decompose("timestamp"));
-        final ColumnDefinition age = 
getColumn(UTF8Type.instance.decompose("age"));
+        final ColumnMetadata timestamp = 
getColumn(UTF8Type.instance.decompose("timestamp"));
+        final ColumnMetadata age = 
getColumn(UTF8Type.instance.decompose("age"));
 
         Operation.Builder builder = new Operation.Builder(OperationType.AND, 
controller, new SimpleExpression(age, Operator.NEQ, 
Int32Type.instance.decompose(5)));
         Operation op = builder.complete();
@@ -438,8 +435,8 @@ public class OperationTest extends SchemaLoader
     @Test
     public void testAnalyzeNotIndexedButDefinedColumn() throws Exception
     {
-        final ColumnDefinition firstName = 
getColumn(UTF8Type.instance.decompose("first_name"));
-        final ColumnDefinition height = 
getColumn(UTF8Type.instance.decompose("height"));
+        final ColumnMetadata firstName = 
getColumn(UTF8Type.instance.decompose("first_name"));
+        final ColumnMetadata height = 
getColumn(UTF8Type.instance.decompose("height"));
 
         // first_name = 'a' AND height != 10
         Map<Expression.Op, Expression> expressions;
@@ -490,7 +487,7 @@ public class OperationTest extends SchemaLoader
     @Test
     public void testSatisfiedByWithMultipleTerms()
     {
-        final ColumnDefinition comment = 
getColumn(UTF8Type.instance.decompose("comment"));
+        final ColumnMetadata comment = 
getColumn(UTF8Type.instance.decompose("comment"));
 
         Unfiltered row = 
buildRow(buildCell(comment,UTF8Type.instance.decompose("software engineer is 
working on a project"),System.currentTimeMillis()));
         Row staticRow = buildRow(Clustering.STATIC_CLUSTERING);
@@ -511,10 +508,10 @@ public class OperationTest extends SchemaLoader
     @Test
     public void testSatisfiedByWithClustering()
     {
-        ColumnDefinition location = getColumn(CLUSTERING_BACKEND, 
UTF8Type.instance.decompose("location"));
-        ColumnDefinition age = getColumn(CLUSTERING_BACKEND, 
UTF8Type.instance.decompose("age"));
-        ColumnDefinition height = getColumn(CLUSTERING_BACKEND, 
UTF8Type.instance.decompose("height"));
-        ColumnDefinition score = getColumn(CLUSTERING_BACKEND, 
UTF8Type.instance.decompose("score"));
+        ColumnMetadata location = getColumn(CLUSTERING_BACKEND, 
UTF8Type.instance.decompose("location"));
+        ColumnMetadata age = getColumn(CLUSTERING_BACKEND, 
UTF8Type.instance.decompose("age"));
+        ColumnMetadata height = getColumn(CLUSTERING_BACKEND, 
UTF8Type.instance.decompose("height"));
+        ColumnMetadata score = getColumn(CLUSTERING_BACKEND, 
UTF8Type.instance.decompose("score"));
 
         Unfiltered row = 
buildRow(Clustering.make(UTF8Type.instance.fromString("US"), 
Int32Type.instance.decompose(27)),
                                   buildCell(height, 
Int32Type.instance.decompose(182), System.currentTimeMillis()),
@@ -567,7 +564,7 @@ public class OperationTest extends SchemaLoader
         Assert.assertTrue(builder.complete().satisfiedBy(row, staticRow, 
false));
     }
 
-    private Map<Expression.Op, Expression> convert(Multimap<ColumnDefinition, 
Expression> expressions)
+    private Map<Expression.Op, Expression> convert(Multimap<ColumnMetadata, 
Expression> expressions)
     {
         Map<Expression.Op, Expression> converted = new HashMap<>();
         for (Expression expression : expressions.values())
@@ -583,8 +580,8 @@ public class OperationTest extends SchemaLoader
     @Test
     public void testSatisfiedByWithStatic()
     {
-        final ColumnDefinition sensorType = getColumn(STATIC_BACKEND, 
UTF8Type.instance.decompose("sensor_type"));
-        final ColumnDefinition value = getColumn(STATIC_BACKEND, 
UTF8Type.instance.decompose("value"));
+        final ColumnMetadata sensorType = getColumn(STATIC_BACKEND, 
UTF8Type.instance.decompose("sensor_type"));
+        final ColumnMetadata value = getColumn(STATIC_BACKEND, 
UTF8Type.instance.decompose("value"));
 
         Unfiltered row = 
buildRow(Clustering.make(UTF8Type.instance.fromString("date"), 
LongType.instance.decompose(20160401L)),
                           buildCell(value, 
DoubleType.instance.decompose(24.56), System.currentTimeMillis()));
@@ -638,7 +635,7 @@ public class OperationTest extends SchemaLoader
 
     private static class SimpleExpression extends RowFilter.Expression
     {
-        SimpleExpression(ColumnDefinition column, Operator operator, 
ByteBuffer value)
+        SimpleExpression(ColumnMetadata column, Operator operator, ByteBuffer 
value)
         {
             super(column, operator, value);
         }
@@ -650,7 +647,7 @@ public class OperationTest extends SchemaLoader
         }
 
         @Override
-        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey 
partitionKey, Row row)
+        public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey 
partitionKey, Row row)
         {
             throw new UnsupportedOperationException();
         }
@@ -684,23 +681,23 @@ public class OperationTest extends SchemaLoader
         return rowBuilder.build();
     }
 
-    private static Cell buildCell(ColumnDefinition column, ByteBuffer value, 
long timestamp)
+    private static Cell buildCell(ColumnMetadata column, ByteBuffer value, 
long timestamp)
     {
         return BufferCell.live(column, timestamp, value);
     }
 
-    private static Cell deletedCell(ColumnDefinition column, long timestamp, 
int nowInSeconds)
+    private static Cell deletedCell(ColumnMetadata column, long timestamp, int 
nowInSeconds)
     {
         return BufferCell.tombstone(column, timestamp, nowInSeconds);
     }
 
-    private static ColumnDefinition getColumn(ByteBuffer name)
+    private static ColumnMetadata getColumn(ByteBuffer name)
     {
         return getColumn(BACKEND, name);
     }
 
-    private static ColumnDefinition getColumn(ColumnFamilyStore cfs, 
ByteBuffer name)
+    private static ColumnMetadata getColumn(ColumnFamilyStore cfs, ByteBuffer 
name)
     {
-        return cfs.metadata.getColumnDefinition(name);
+        return cfs.metadata().getColumn(name);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java 
b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
index a2aff2f..1efccd3 100644
--- a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
@@ -32,13 +32,13 @@ public class CQLCompressionTest extends CQLTester
     public void lz4ParamsTest()
     {
         createTable("create table %s (id int primary key, uh text) with 
compression = {'class':'LZ4Compressor', 'lz4_high_compressor_level':3}");
-        
assertTrue(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressorType.equals(LZ4Compressor.LZ4_FAST_COMPRESSOR));
+        
assertTrue(((LZ4Compressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).compressorType.equals(LZ4Compressor.LZ4_FAST_COMPRESSOR));
         createTable("create table %s (id int primary key, uh text) with 
compression = {'class':'LZ4Compressor', 'lz4_compressor_type':'high', 
'lz4_high_compressor_level':13}");
-        
assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressorType,
 LZ4Compressor.LZ4_HIGH_COMPRESSOR);
-        
assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressionLevel,
 (Integer)13);
+        
assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).compressorType,
 LZ4Compressor.LZ4_HIGH_COMPRESSOR);
+        
assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).compressionLevel,
 (Integer)13);
         createTable("create table %s (id int primary key, uh text) with 
compression = {'class':'LZ4Compressor'}");
-        
assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressorType,
 LZ4Compressor.LZ4_FAST_COMPRESSOR);
-        
assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressionLevel,
 (Integer)9);
+        
assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).compressorType,
 LZ4Compressor.LZ4_FAST_COMPRESSOR);
+        
assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).compressionLevel,
 (Integer)9);
     }
 
     @Test(expected = ConfigurationException.class)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index 4985342..7e8c1fb 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,7 @@ public class BigTableWriterTest extends 
AbstractTransactionalTest
 
         private TestableBTW(Descriptor desc)
         {
-            this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, new 
SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), 
EncodingStats.NO_STATS)));
+            this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, new 
SerializationHeader(true, cfs.metadata(), 
cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)));
         }
 
         private TestableBTW(Descriptor desc, SSTableTxnWriter sw)
@@ -81,7 +81,7 @@ public class BigTableWriterTest extends 
AbstractTransactionalTest
 
             for (int i = 0; i < 100; i++)
             {
-                UpdateBuilder update = UpdateBuilder.create(cfs.metadata, i);
+                UpdateBuilder update = UpdateBuilder.create(cfs.metadata(), i);
                 for (int j = 0; j < 10; j++)
                     update.newRow(j).add("val", SSTableRewriterTest.random(0, 
1000));
                 writer.append(update.build().unfilteredIterator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index ac7f4ad..970d7ab 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -38,6 +38,8 @@ import org.apache.cassandra.cql3.functions.UDHelper;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
 import com.datastax.driver.core.DataType;
@@ -602,9 +604,9 @@ public class CQLSSTableWriterTest
                     addRangeForEndpoint(range, 
FBUtilities.getBroadcastAddress());
             }
 
-            public CFMetaData getTableMetadata(String cfName)
+            public TableMetadataRef getTableMetadata(String cfName)
             {
-                return Schema.instance.getCFMetaData(keyspace, cfName);
+                return Schema.instance.getTableMetadataRef(keyspace, cfName);
             }
         }, new OutputHandler.SystemOutput(false, false));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java 
b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index f287912..186f0e8 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -52,6 +52,8 @@ import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.schema.CachingParams;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static com.google.common.collect.ImmutableMap.of;
@@ -64,7 +66,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class IndexSummaryManagerTest
 {
@@ -102,8 +103,8 @@ public class IndexSummaryManagerTest
         String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no 
key caching
         Keyspace keyspace = Keyspace.open(ksname);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
-        originalMinIndexInterval = cfs.metadata.params.minIndexInterval;
-        originalMaxIndexInterval = cfs.metadata.params.maxIndexInterval;
+        originalMinIndexInterval = cfs.metadata().params.minIndexInterval;
+        originalMaxIndexInterval = cfs.metadata().params.maxIndexInterval;
         originalCapacity = 
IndexSummaryManager.instance.getMemoryPoolCapacityInMB();
     }
 
@@ -119,8 +120,10 @@ public class IndexSummaryManagerTest
         String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no 
key caching
         Keyspace keyspace = Keyspace.open(ksname);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
-        cfs.metadata.minIndexInterval(originalMinIndexInterval);
-        cfs.metadata.maxIndexInterval(originalMaxIndexInterval);
+
+        
MigrationManager.announceTableUpdate(cfs.metadata().unbuild().minIndexInterval(originalMinIndexInterval).build(),
 true);
+        
MigrationManager.announceTableUpdate(cfs.metadata().unbuild().maxIndexInterval(originalMaxIndexInterval).build(),
 true);
+
         
IndexSummaryManager.instance.setMemoryPoolCapacityInMB(originalCapacity);
     }
 
@@ -139,7 +142,7 @@ public class IndexSummaryManagerTest
 
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), originalOffHeapSize * sstables.size());
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), originalOffHeapSize * sstables.size());
         }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL, 
sstable.getIndexSummarySamplingLevel());
@@ -152,7 +155,7 @@ public class IndexSummaryManagerTest
         for (int i = 0; i < numPartition; i++)
         {
             Row row = Util.getOnlyRowUnfiltered(Util.cmd(cfs, 
String.format("%3d", i)).build());
-            Cell cell = 
row.getCell(cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("val")));
+            Cell cell = 
row.getCell(cfs.metadata().getColumn(ByteBufferUtil.bytes("val")));
             assertNotNull(cell);
             assertEquals(100, cell.value().array().length);
 
@@ -182,7 +185,7 @@ public class IndexSummaryManagerTest
             {
 
                 String key = String.format("%3d", p);
-                new RowUpdateBuilder(cfs.metadata, 0, key)
+                new RowUpdateBuilder(cfs.metadata(), 0, key)
                     .clustering("column")
                     .add("val", value)
                     .build()
@@ -221,34 +224,34 @@ public class IndexSummaryManagerTest
             sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
 
         for (SSTableReader sstable : sstables)
-            assertEquals(cfs.metadata.params.minIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001);
+            assertEquals(cfs.metadata().params.minIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001);
 
         // double the min_index_interval
-        cfs.metadata.minIndexInterval(originalMinIndexInterval * 2);
+        
MigrationManager.announceTableUpdate(cfs.metadata().unbuild().minIndexInterval(originalMinIndexInterval
 * 2).build(), true);
         IndexSummaryManager.instance.redistributeSummaries();
         for (SSTableReader sstable : cfs.getLiveSSTables())
         {
-            assertEquals(cfs.metadata.params.minIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001);
-            assertEquals(numRows / cfs.metadata.params.minIndexInterval, 
sstable.getIndexSummarySize());
+            assertEquals(cfs.metadata().params.minIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001);
+            assertEquals(numRows / cfs.metadata().params.minIndexInterval, 
sstable.getIndexSummarySize());
         }
 
         // return min_index_interval to its original value
-        cfs.metadata.minIndexInterval(originalMinIndexInterval);
+        
MigrationManager.announceTableUpdate(cfs.metadata().unbuild().minIndexInterval(originalMinIndexInterval).build(),
 true);
         IndexSummaryManager.instance.redistributeSummaries();
         for (SSTableReader sstable : cfs.getLiveSSTables())
         {
-            assertEquals(cfs.metadata.params.minIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001);
-            assertEquals(numRows / cfs.metadata.params.minIndexInterval, 
sstable.getIndexSummarySize());
+            assertEquals(cfs.metadata().params.minIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001);
+            assertEquals(numRows / cfs.metadata().params.minIndexInterval, 
sstable.getIndexSummarySize());
         }
 
         // halve the min_index_interval, but constrain the available space to 
exactly what we have now; as a result,
         // the summary shouldn't change
-        cfs.metadata.minIndexInterval(originalMinIndexInterval / 2);
+        
MigrationManager.announceTableUpdate(cfs.metadata().unbuild().minIndexInterval(originalMinIndexInterval
 / 2).build(), true);
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
         long summarySpace = sstable.getIndexSummaryOffHeapSize();
         try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), summarySpace);
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.id, 
txn), summarySpace);
         }
 
         sstable = cfs.getLiveSSTables().iterator().next();
@@ -260,7 +263,7 @@ public class IndexSummaryManagerTest
         int previousSize = sstable.getIndexSummarySize();
         try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace * 1.5));
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.id, 
txn), (long) Math.ceil(summarySpace * 1.5));
         }
         sstable = cfs.getLiveSSTables().iterator().next();
         assertEquals(previousSize * 1.5, (double) 
sstable.getIndexSummarySize(), 1);
@@ -268,10 +271,10 @@ public class IndexSummaryManagerTest
 
         // return min_index_interval to it's original value (double it), but 
only give the summary enough space
         // to have an effective index interval of twice the new min
-        cfs.metadata.minIndexInterval(originalMinIndexInterval);
+        
MigrationManager.announceTableUpdate(cfs.metadata().unbuild().minIndexInterval(originalMinIndexInterval).build(),
 true);
         try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace / 2.0));
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.id, 
txn), (long) Math.ceil(summarySpace / 2.0));
         }
         sstable = cfs.getLiveSSTables().iterator().next();
         assertEquals(originalMinIndexInterval * 2, 
sstable.getEffectiveIndexInterval(), 0.001);
@@ -280,14 +283,14 @@ public class IndexSummaryManagerTest
         // raise the min_index_interval above our current effective interval, 
but set the max_index_interval lower
         // than what we actually have space for (meaning the index summary 
would ideally be smaller, but this would
         // result in an effective interval above the new max)
-        cfs.metadata.minIndexInterval(originalMinIndexInterval * 4);
-        cfs.metadata.maxIndexInterval(originalMinIndexInterval * 4);
+        
MigrationManager.announceTableUpdate(cfs.metadata().unbuild().minIndexInterval(originalMinIndexInterval
 * 4).build(), true);
+        
MigrationManager.announceTableUpdate(cfs.metadata().unbuild().maxIndexInterval(originalMinIndexInterval
 * 4).build(), true);
         try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), 10);
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.id, 
txn), 10);
         }
         sstable = cfs.getLiveSSTables().iterator().next();
-        assertEquals(cfs.metadata.params.minIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001);
+        assertEquals(cfs.metadata().params.minIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001);
     }
 
     @Test
@@ -307,35 +310,35 @@ public class IndexSummaryManagerTest
 
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), 10);
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.id, 
txn), 10);
         }
         sstables = new ArrayList<>(cfs.getLiveSSTables());
         for (SSTableReader sstable : sstables)
-            assertEquals(cfs.metadata.params.maxIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.01);
+            assertEquals(cfs.metadata().params.maxIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.01);
 
         // halve the max_index_interval
-        cfs.metadata.maxIndexInterval(cfs.metadata.params.maxIndexInterval / 
2);
+        
MigrationManager.announceTableUpdate(cfs.metadata().unbuild().maxIndexInterval(cfs.metadata().params.maxIndexInterval
 / 2).build(), true);
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), 1);
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.id, 
txn), 1);
         }
         sstables = new ArrayList<>(cfs.getLiveSSTables());
         for (SSTableReader sstable : sstables)
         {
-            assertEquals(cfs.metadata.params.maxIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.01);
-            assertEquals(numRows / cfs.metadata.params.maxIndexInterval, 
sstable.getIndexSummarySize());
+            assertEquals(cfs.metadata().params.maxIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.01);
+            assertEquals(numRows / cfs.metadata().params.maxIndexInterval, 
sstable.getIndexSummarySize());
         }
 
         // return max_index_interval to its original value
-        cfs.metadata.maxIndexInterval(cfs.metadata.params.maxIndexInterval * 
2);
+        
MigrationManager.announceTableUpdate(cfs.metadata().unbuild().maxIndexInterval(cfs.metadata().params.maxIndexInterval
 * 2).build(), true);
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), 1);
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.id, 
txn), 1);
         }
         for (SSTableReader sstable : cfs.getLiveSSTables())
         {
-            assertEquals(cfs.metadata.params.maxIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.01);
-            assertEquals(numRows / cfs.metadata.params.maxIndexInterval, 
sstable.getIndexSummarySize());
+            assertEquals(cfs.metadata().params.maxIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.01);
+            assertEquals(numRows / cfs.metadata().params.maxIndexInterval, 
sstable.getIndexSummarySize());
         }
     }
 
@@ -350,7 +353,7 @@ public class IndexSummaryManagerTest
         int numRows = 256;
         createSSTables(ksname, cfname, numSSTables, numRows);
 
-        int minSamplingLevel = (BASE_SAMPLING_LEVEL * 
cfs.metadata.params.minIndexInterval) / cfs.metadata.params.maxIndexInterval;
+        int minSamplingLevel = (BASE_SAMPLING_LEVEL * 
cfs.metadata().params.minIndexInterval) / 
cfs.metadata().params.maxIndexInterval;
 
         List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
         for (SSTableReader sstable : sstables)
@@ -361,7 +364,7 @@ public class IndexSummaryManagerTest
         // there should be enough space to not downsample anything
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables));
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), (singleSummaryOffHeapSpace * numSSTables));
         }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL, 
sstable.getIndexSummarySamplingLevel());
@@ -372,7 +375,7 @@ public class IndexSummaryManagerTest
         assert sstables.size() == 4;
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
         }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstable.getIndexSummarySamplingLevel());
@@ -381,7 +384,7 @@ public class IndexSummaryManagerTest
         // everything should get cut to a quarter
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 4)));
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), (singleSummaryOffHeapSpace * (numSSTables / 4)));
         }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL / 4, 
sstable.getIndexSummarySamplingLevel());
@@ -390,7 +393,7 @@ public class IndexSummaryManagerTest
         // upsample back up to half
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 
4));
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 4));
         }
         assert sstables.size() == 4;
         for (SSTableReader sstable : sstables)
@@ -400,7 +403,7 @@ public class IndexSummaryManagerTest
         // upsample back up to the original index summary
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables));
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), (singleSummaryOffHeapSpace * numSSTables));
         }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL, 
sstable.getIndexSummarySamplingLevel());
@@ -412,7 +415,7 @@ public class IndexSummaryManagerTest
         sstables.get(1).overrideReadMeter(new RestorableMeter(50.0, 50.0));
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3));
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), (singleSummaryOffHeapSpace * 3));
         }
         Collections.sort(sstables, hotnessComparator);
         assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstables.get(0).getIndexSummarySamplingLevel());
@@ -428,7 +431,7 @@ public class IndexSummaryManagerTest
         sstables.get(1).overrideReadMeter(new RestorableMeter(higherRate, 
higherRate));
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3));
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), (singleSummaryOffHeapSpace * 3));
         }
         Collections.sort(sstables, hotnessComparator);
         assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstables.get(0).getIndexSummarySamplingLevel());
@@ -446,7 +449,7 @@ public class IndexSummaryManagerTest
 
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3) + 50);
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), (singleSummaryOffHeapSpace * 3) + 50);
         }
         Collections.sort(sstables, hotnessComparator);
 
@@ -470,7 +473,7 @@ public class IndexSummaryManagerTest
         sstables.get(3).overrideReadMeter(new RestorableMeter(128.0, 128.0));
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), (long) (singleSummaryOffHeapSpace + 
(singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL))));
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), (long) (singleSummaryOffHeapSpace + 
(singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL))));
         }
         Collections.sort(sstables, hotnessComparator);
         assertEquals(1, sstables.get(0).getIndexSummarySize());  // at the min 
sampling level
@@ -483,7 +486,7 @@ public class IndexSummaryManagerTest
         // Don't leave enough space for even the minimal index summaries
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.cfId, txn), 10);
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, 
of(cfs.metadata.id, txn), 10);
         }
         for (SSTableReader sstable : sstables)
             assertEquals(1, sstable.getIndexSummarySize());  // at the min 
sampling level
@@ -506,7 +509,7 @@ public class IndexSummaryManagerTest
         for (int row = 0; row < numRows; row++)
         {
             String key = String.format("%3d", row);
-            new RowUpdateBuilder(cfs.metadata, 0, key)
+            new RowUpdateBuilder(cfs.metadata(), 0, key)
             .clustering("column")
             .add("val", value)
             .build()
@@ -526,7 +529,7 @@ public class IndexSummaryManagerTest
             {
                 sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, 
samplingLevel);
                 assertEquals(samplingLevel, 
sstable.getIndexSummarySamplingLevel());
-                int expectedSize = (numRows * samplingLevel) / 
(sstable.metadata.params.minIndexInterval * BASE_SAMPLING_LEVEL);
+                int expectedSize = (numRows * samplingLevel) / 
(cfs.metadata().params.minIndexInterval * BASE_SAMPLING_LEVEL);
                 assertEquals(expectedSize, sstable.getIndexSummarySize(), 1);
                 txn.update(sstable, true);
                 txn.checkpoint();
@@ -572,7 +575,7 @@ public class IndexSummaryManagerTest
             for (int row = 0; row < numRows; row++)
             {
                 String key = String.format("%3d", row);
-                new RowUpdateBuilder(cfs.metadata, 0, key)
+                new RowUpdateBuilder(cfs.metadata(), 0, key)
                 .clustering("column")
                 .add("val", value)
                 .build()
@@ -581,20 +584,20 @@ public class IndexSummaryManagerTest
             cfs.forceBlockingFlush();
         }
 
-        assertTrue(manager.getAverageIndexInterval() >= 
cfs.metadata.params.minIndexInterval);
+        assertTrue(manager.getAverageIndexInterval() >= 
cfs.metadata().params.minIndexInterval);
         Map<String, Integer> intervals = manager.getIndexIntervals();
         for (Map.Entry<String, Integer> entry : intervals.entrySet())
             if (entry.getKey().contains(CF_STANDARDLOWiINTERVAL))
-                assertEquals(cfs.metadata.params.minIndexInterval, 
entry.getValue(), 0.001);
+                assertEquals(cfs.metadata().params.minIndexInterval, 
entry.getValue(), 0.001);
 
         manager.setMemoryPoolCapacityInMB(0);
         manager.redistributeSummaries();
-        assertTrue(manager.getAverageIndexInterval() > 
cfs.metadata.params.minIndexInterval);
+        assertTrue(manager.getAverageIndexInterval() > 
cfs.metadata().params.minIndexInterval);
         intervals = manager.getIndexIntervals();
         for (Map.Entry<String, Integer> entry : intervals.entrySet())
         {
             if (entry.getKey().contains(CF_STANDARDLOWiINTERVAL))
-                assertTrue(entry.getValue() >= 
cfs.metadata.params.minIndexInterval);
+                assertTrue(entry.getValue() >= 
cfs.metadata().params.minIndexInterval);
         }
     }
 
@@ -630,7 +633,7 @@ public class IndexSummaryManagerTest
                     try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
                     {
                         IndexSummaryManager.redistributeSummaries(new 
ObservableRedistribution(Collections.EMPTY_LIST,
-                                                                               
                of(cfs.metadata.cfId, txn),
+                                                                               
                of(cfs.metadata.id, txn),
                                                                                
                singleSummaryOffHeapSpace,
                                                                                
                barrier));
                     }
@@ -669,7 +672,7 @@ public class IndexSummaryManagerTest
     }
 
     private static List<SSTableReader> 
redistributeSummaries(List<SSTableReader> compacting,
-                                                             Map<UUID, 
LifecycleTransaction> transactions,
+                                                             Map<TableId, 
LifecycleTransaction> transactions,
                                                              long 
memoryPoolBytes)
     throws IOException
     {
@@ -683,7 +686,7 @@ public class IndexSummaryManagerTest
         CountDownLatch barrier;
 
         ObservableRedistribution(List<SSTableReader> compacting,
-                                 Map<UUID, LifecycleTransaction> transactions,
+                                 Map<TableId, LifecycleTransaction> 
transactions,
                                  long memoryPoolBytes,
                                  CountDownLatch barrier)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
index 1f2221e..bc82128 100644
--- 
a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
+++ 
b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
@@ -73,18 +73,16 @@ public class SSTableCorruptionDetectionTest extends 
SSTableWriterTestBase
     @BeforeClass
     public static void setUp()
     {
-        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
-                                           .addPartitionKey("pk", 
AsciiType.instance)
-                                           .addClusteringColumn("ck1", 
AsciiType.instance)
-                                           .addClusteringColumn("ck2", 
AsciiType.instance)
-                                           .addRegularColumn("reg1", 
BytesType.instance)
-                                           .addRegularColumn("reg2", 
BytesType.instance)
-                                           .build();
-
-        cfm.compression(CompressionParams.noCompression());
-        SchemaLoader.createKeyspace(keyspace,
-                                    KeyspaceParams.simple(1),
-                                    cfm);
+        TableMetadata.Builder cfm =
+            TableMetadata.builder(keyspace, table)
+                         .addPartitionKeyColumn("pk", AsciiType.instance)
+                         .addClusteringColumn("ck1", AsciiType.instance)
+                         .addClusteringColumn("ck2", AsciiType.instance)
+                         .addRegularColumn("reg1", BytesType.instance)
+                         .addRegularColumn("reg2", BytesType.instance)
+                         .compression(CompressionParams.noCompression());
+
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), cfm);
 
         cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
         cfs.disableAutoCompaction();
@@ -104,7 +102,7 @@ public class SSTableCorruptionDetectionTest extends 
SSTableWriterTestBase
         writer = getWriter(cfs, dir, txn);
         for (int i = 0; i < numberOfPks; i++)
         {
-            UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, 
String.format("pkvalue_%07d", i)).withTimestamp(1);
+            UpdateBuilder builder = UpdateBuilder.create(cfs.metadata(), 
String.format("pkvalue_%07d", i)).withTimestamp(1);
             byte[] reg1 = new byte[valueSize];
             random.nextBytes(reg1);
             byte[] reg2 = new byte[valueSize];
@@ -210,7 +208,7 @@ public class SSTableCorruptionDetectionTest extends 
SSTableWriterTestBase
             for (int i = 0; i < numberOfPks; i++)
             {
                 DecoratedKey dk = Util.dk(String.format("pkvalue_%07d", i));
-                try (UnfilteredRowIterator rowIter = sstable.iterator(dk, 
Slices.ALL, ColumnFilter.all(cfs.metadata), false))
+                try (UnfilteredRowIterator rowIter = sstable.iterator(dk, 
Slices.ALL, ColumnFilter.all(cfs.metadata()), false))
                 {
                     while (rowIter.hasNext())
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 72c7467..4f3739f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -31,8 +31,9 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.AsciiType;
@@ -106,9 +107,9 @@ public class SSTableLoaderTest
                 addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
         }
 
-        public CFMetaData getTableMetadata(String tableName)
+        public TableMetadataRef getTableMetadata(String tableName)
         {
-            return Schema.instance.getCFMetaData(keyspace, tableName);
+            return Schema.instance.getTableMetadataRef(keyspace, tableName);
         }
     }
 
@@ -117,7 +118,7 @@ public class SSTableLoaderTest
     {
         File dataDir = new File(tmpdir.getAbsolutePath() + File.separator + 
KEYSPACE1 + File.separator + CF_STANDARD1);
         assert dataDir.mkdirs();
-        CFMetaData cfmeta = Schema.instance.getCFMetaData(KEYSPACE1, 
CF_STANDARD1);
+        TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE1, 
CF_STANDARD1);
 
         String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, 
val1 ascii, PRIMARY KEY (key, name))";
         String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
@@ -143,7 +144,7 @@ public class SSTableLoaderTest
         assertEquals(1, partitions.size());
         assertEquals("key1", 
AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
         assertEquals(ByteBufferUtil.bytes("100"), 
partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
-                                                                   
.getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val")))
+                                                                   
.getCell(metadata.getColumn(ByteBufferUtil.bytes("val")))
                                                                    .value());
 
         // The stream future is signalled when the work is complete but before 
releasing references. Wait for release

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
index ffe7b06..b922ca8 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
@@ -60,11 +60,11 @@ public class SSTableMetadataTest
                                     SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD),
                                     SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD2),
                                     SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD3),
-                                    CFMetaData.Builder.create(KEYSPACE1, 
CF_STANDARDCOMPOSITE2)
-                                                      .addPartitionKey("key", 
AsciiType.instance)
-                                                      
.addClusteringColumn("name", AsciiType.instance)
-                                                      
.addClusteringColumn("int", IntegerType.instance)
-                                                      .addRegularColumn("val", 
AsciiType.instance).build(),
+                                    TableMetadata.builder(KEYSPACE1, 
CF_STANDARDCOMPOSITE2)
+                                                 .addPartitionKeyColumn("key", 
AsciiType.instance)
+                                                 .addClusteringColumn("name", 
AsciiType.instance)
+                                                 .addClusteringColumn("int", 
IntegerType.instance)
+                                                 .addRegularColumn("val", 
AsciiType.instance),
                                     SchemaLoader.counterCFMD(KEYSPACE1, 
CF_COUNTER1));
     }
 
@@ -78,7 +78,7 @@ public class SSTableMetadataTest
         {
             DecoratedKey key = Util.dk(Integer.toString(i));
             for (int j = 0; j < 10; j++)
-                new RowUpdateBuilder(store.metadata, timestamp, 10 + j, 
Integer.toString(i))
+                new RowUpdateBuilder(store.metadata(), timestamp, 10 + j, 
Integer.toString(i))
                     .clustering(Integer.toString(j))
                     .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                     .build()
@@ -86,7 +86,7 @@ public class SSTableMetadataTest
 
         }
 
-        new RowUpdateBuilder(store.metadata, timestamp, 10000, "longttl")
+        new RowUpdateBuilder(store.metadata(), timestamp, 10000, "longttl")
             .clustering("col")
             .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
             .build()
@@ -104,7 +104,7 @@ public class SSTableMetadataTest
 
         }
 
-        new RowUpdateBuilder(store.metadata, timestamp, 20000, "longttl2")
+        new RowUpdateBuilder(store.metadata(), timestamp, 20000, "longttl2")
         .clustering("col")
         .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
         .build()
@@ -153,14 +153,14 @@ public class SSTableMetadataTest
         long timestamp = System.currentTimeMillis();
         DecoratedKey key = Util.dk("deletetest");
         for (int i = 0; i<5; i++)
-            new RowUpdateBuilder(store.metadata, timestamp, 100, "deletetest")
+            new RowUpdateBuilder(store.metadata(), timestamp, 100, 
"deletetest")
                 .clustering("deletecolumn" + i)
                 .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                 .build()
                 .applyUnsafe();
 
 
-        new RowUpdateBuilder(store.metadata, timestamp, 1000, "deletetest")
+        new RowUpdateBuilder(store.metadata(), timestamp, 1000, "deletetest")
         .clustering("todelete")
         .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
         .build()
@@ -176,7 +176,7 @@ public class SSTableMetadataTest
             assertEquals(ttltimestamp + 1000, firstMaxDelTime, 10);
         }
 
-        RowUpdateBuilder.deleteRow(store.metadata, timestamp + 1, 
"deletetest", "todelete").applyUnsafe();
+        RowUpdateBuilder.deleteRow(store.metadata(), timestamp + 1, 
"deletetest", "todelete").applyUnsafe();
 
         store.forceBlockingFlush();
         assertEquals(2,store.getLiveSSTables().size());
@@ -208,7 +208,7 @@ public class SSTableMetadataTest
             String key = "row" + j;
             for (int i = 100; i<150; i++)
             {
-                new RowUpdateBuilder(store.metadata, 
System.currentTimeMillis(), key)
+                new RowUpdateBuilder(store.metadata(), 
System.currentTimeMillis(), key)
                     .clustering(j + "col" + i)
                     .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                     .build()
@@ -226,7 +226,7 @@ public class SSTableMetadataTest
 
         for (int i = 101; i<299; i++)
         {
-            new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), 
key)
+            new RowUpdateBuilder(store.metadata(), System.currentTimeMillis(), 
key)
             .clustering(9 + "col" + i)
             .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
             .build()
@@ -263,7 +263,7 @@ public class SSTableMetadataTest
 
         for (int i = 0; i < 10; i++)
         {
-            new RowUpdateBuilder(cfs.metadata, 0, "k")
+            new RowUpdateBuilder(cfs.metadata(), 0, "k")
                 .clustering("a" + (9 - i), getBytes(i))
                 .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                 .build()
@@ -274,7 +274,7 @@ public class SSTableMetadataTest
 
         for (int i = 0; i < 10; i++)
         {
-            new RowUpdateBuilder(cfs.metadata, 0, "k2")
+            new RowUpdateBuilder(cfs.metadata(), 0, "k2")
             .clustering("b" + (9 - i), getBytes(i))
             .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
             .build()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 982fc9c..64d0252 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -98,7 +98,7 @@ public class SSTableReaderTest
         CompactionManager.instance.disableAutoCompaction();
         for (int j = 0; j < 10; j++)
         {
-            new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
                 .clustering("0")
                 .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                 .build()
@@ -144,7 +144,7 @@ public class SSTableReaderTest
             CompactionManager.instance.disableAutoCompaction();
             for (int j = 0; j < 100; j += 2)
             {
-                new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+                new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
                 .clustering("0")
                 .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                 .build()
@@ -186,7 +186,7 @@ public class SSTableReaderTest
 
         for (int j = 0; j < 100; j += 2)
         {
-            new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
             .clustering("0")
             .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
             .build()
@@ -214,7 +214,7 @@ public class SSTableReaderTest
 
         for (int j = 0; j < 10; j++)
         {
-            new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
             .clustering("0")
             .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
             .build()
@@ -247,7 +247,7 @@ public class SSTableReaderTest
         for (int j = 0; j < 10; j++)
         {
 
-            new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
             .clustering("0")
             .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
             .build()
@@ -280,7 +280,7 @@ public class SSTableReaderTest
         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_INDEXED);
         partitioner = store.getPartitioner();
 
-        new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), "k1")
+        new RowUpdateBuilder(store.metadata(), System.currentTimeMillis(), 
"k1")
             .clustering("0")
             .add("birthdate", 1L)
             .build()
@@ -302,7 +302,7 @@ public class SSTableReaderTest
         CompactionManager.instance.disableAutoCompaction();
         for (int j = 0; j < 10; j++)
         {
-            new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
             .clustering("0")
             .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
             .build()
@@ -339,18 +339,18 @@ public class SSTableReaderTest
 
         DecoratedKey firstKey = null, lastKey = null;
         long timestamp = System.currentTimeMillis();
-        for (int i = 0; i < store.metadata.params.minIndexInterval; i++)
+        for (int i = 0; i < store.metadata().params.minIndexInterval; i++)
         {
             DecoratedKey key = Util.dk(String.valueOf(i));
             if (firstKey == null)
                 firstKey = key;
             if (lastKey == null)
                 lastKey = key;
-            if (store.metadata.getKeyValidator().compare(lastKey.getKey(), 
key.getKey()) < 0)
+            if (store.metadata().partitionKeyType.compare(lastKey.getKey(), 
key.getKey()) < 0)
                 lastKey = key;
 
 
-            new RowUpdateBuilder(store.metadata, timestamp, key.getKey())
+            new RowUpdateBuilder(store.metadata(), timestamp, key.getKey())
                 .clustering("col")
                 .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                 .build()
@@ -377,7 +377,7 @@ public class SSTableReaderTest
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore("Indexed1");
 
-        new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), "k1")
+        new RowUpdateBuilder(store.metadata(), System.currentTimeMillis(), 
"k1")
         .clustering("0")
         .add("birthdate", 1L)
         .build()
@@ -406,7 +406,7 @@ public class SSTableReaderTest
         ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
         partitioner = store.getPartitioner();
 
-        new RowUpdateBuilder(store.metadata, 0, "k1")
+        new RowUpdateBuilder(store.metadata(), 0, "k1")
             .clustering("xyz")
             .add("val", "abc")
             .build()
@@ -439,7 +439,7 @@ public class SSTableReaderTest
         for (int j = 0; j < 130; j++)
         {
 
-            new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
             .clustering("0")
             .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
             .build()
@@ -478,7 +478,7 @@ public class SSTableReaderTest
         final int NUM_PARTITIONS = 512;
         for (int j = 0; j < NUM_PARTITIONS; j++)
         {
-            new RowUpdateBuilder(store.metadata, j, String.format("%3d", j))
+            new RowUpdateBuilder(store.metadata(), j, String.format("%3d", j))
             .clustering("0")
             .add("val", String.format("%3d", j))
             .build()
@@ -557,7 +557,7 @@ public class SSTableReaderTest
         final int NUM_PARTITIONS = 512;
         for (int j = 0; j < NUM_PARTITIONS; j++)
         {
-            new RowUpdateBuilder(store.metadata, j, String.format("%3d", j))
+            new RowUpdateBuilder(store.metadata(), j, String.format("%3d", j))
             .clustering("0")
             .add("val", String.format("%3d", j))
             .build()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 9ea29e5..e3afaeb 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -75,7 +75,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
 
         for (int j = 0; j < 100; j ++)
         {
-            new RowUpdateBuilder(cfs.metadata, j, String.valueOf(j))
+            new RowUpdateBuilder(cfs.metadata(), j, String.valueOf(j))
                 .clustering("0")
                 .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                 .build()
@@ -692,7 +692,7 @@ public class SSTableRewriterTest extends 
SSTableWriterTestBase
             String key = Integer.toString(i);
 
             for (int j = 0; j < 10; j++)
-                new RowUpdateBuilder(cfs.metadata, 100, key)
+                new RowUpdateBuilder(cfs.metadata(), 100, key)
                     .clustering(Integer.toString(j))
                     .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                     .build()
@@ -936,12 +936,12 @@ public class SSTableRewriterTest extends 
SSTableWriterTestBase
             File dir = cfs.getDirectories().getDirectoryForNewSSTables();
             Descriptor desc = cfs.newSSTableDescriptor(dir);
 
-            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 
0, 0, new SerializationHeader(true, cfs.metadata, 
cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
+            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 
0, 0, new SerializationHeader(true, cfs.metadata(), 
cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
             {
                 int end = f == fileCount - 1 ? partitionCount : ((f + 1) * 
partitionCount) / fileCount;
                 for ( ; i < end ; i++)
                 {
-                    UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, 
ByteBufferUtil.bytes(i));
+                    UpdateBuilder builder = 
UpdateBuilder.create(cfs.metadata(), ByteBufferUtil.bytes(i));
                     for (int j = 0; j < cellCount ; j++)
                         builder.newRow(Integer.toString(i)).add("val", 
random(0, 1000));
 

Reply via email to