Add schema to snapshot manifest, add WITH TIMESTAMP to DROP statement

Patch by Alex Petrov; reviewed by Aleksey Yeschenko for CASSANDRA-7190


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6528fbf2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6528fbf2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6528fbf2

Branch: refs/heads/cassandra-3.8
Commit: 6528fbf250fdd2c75c8f9464e279628234a5fef4
Parents: e6dd152
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Authored: Wed Apr 20 14:57:52 2016 +0200
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Mon Aug 15 18:31:45 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/CFMetaData.java |   7 +-
 .../apache/cassandra/cql3/ColumnIdentifier.java |   2 +-
 src/java/org/apache/cassandra/cql3/Cql.g        |  11 +-
 .../cql3/statements/AlterTableStatement.java    |   8 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  23 +
 .../db/ColumnFamilyStoreCQLHelper.java          | 442 ++++++++++++
 .../org/apache/cassandra/db/Directories.java    |   6 +
 .../unit/org/apache/cassandra/SchemaLoader.java |   5 +
 .../cql3/validation/operations/AlterTest.java   |  71 ++
 .../db/ColumnFamilyStoreCQLHelperTest.java      | 679 +++++++++++++++++++
 .../schema/LegacySchemaMigratorTest.java        |   3 +-
 12 files changed, 1248 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7ba8370..a202755 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.9
+ * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE 
statements (CASSANDRA-7190)
  * Fix clean interval not sent to commit log for empty memtable flush 
(CASSANDRA-12436)
  * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
  * Backport CASSANDRA-12002 (CASSANDRA-12177)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java 
b/src/java/org/apache/cassandra/config/CFMetaData.java
index 5678ada..c0f8d2d 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -954,9 +954,12 @@ public final class CFMetaData
         return removed;
     }
 
-    public void recordColumnDrop(ColumnDefinition def)
+    /**
+     * Adds the column definition as a dropped column, recording the drop with 
the provided timestamp.
+     */
+    public void recordColumnDrop(ColumnDefinition def, long deleteTimestamp)
     {
-        droppedColumns.put(def.name.bytes, new 
DroppedColumn(def.name.toString(), def.type, FBUtilities.timestampMicros()));
+        droppedColumns.put(def.name.bytes, new 
DroppedColumn(def.name.toString(), def.type, deleteTimestamp));
     }
 
     public void renameColumn(ColumnIdentifier from, ColumnIdentifier to) 
throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java 
b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
index 93734e9..afb65e1 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
@@ -328,7 +328,7 @@ public class ColumnIdentifier extends Selectable implements 
IMeasurableMemory, C
         }
     }
 
-    static String maybeQuote(String text)
+    public static String maybeQuote(String text)
     {
         if (UNQUOTED_IDENTIFIER.matcher(text).matches())
             return text;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g 
b/src/java/org/apache/cassandra/cql3/Cql.g
index 453a03d..d50b979 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -818,18 +818,21 @@ alterTableStatement returns [AlterTableStatement expr]
         TableAttributes attrs = new TableAttributes();
         Map<ColumnIdentifier.Raw, ColumnIdentifier.Raw> renames = new 
HashMap<ColumnIdentifier.Raw, ColumnIdentifier.Raw>();
         boolean isStatic = false;
+        Long dropTimestamp = null;
     }
     : K_ALTER K_COLUMNFAMILY cf=columnFamilyName
           ( K_ALTER id=cident K_TYPE v=comparatorType { type = 
AlterTableStatement.Type.ALTER; }
           | K_ADD   id=cident v=comparatorType ({ isStatic=true; } K_STATIC)? 
{ type = AlterTableStatement.Type.ADD; }
-          | K_DROP  id=cident                         { type = 
AlterTableStatement.Type.DROP; }
-          | K_WITH  properties[attrs]                 { type = 
AlterTableStatement.Type.OPTS; }
-          | K_RENAME                                  { type = 
AlterTableStatement.Type.RENAME; }
+          | K_DROP  id=cident                               { type = 
AlterTableStatement.Type.DROP; }
+          | K_DROP  id=cident K_USING K_TIMESTAMP t=INTEGER { type = 
AlterTableStatement.Type.DROP;
+                                                              dropTimestamp = 
Long.parseLong(Constants.Literal.integer($t.text).getText()); }
+          | K_WITH  properties[attrs]                       { type = 
AlterTableStatement.Type.OPTS; }
+          | K_RENAME                                        { type = 
AlterTableStatement.Type.RENAME; }
                id1=cident K_TO toId1=cident { renames.put(id1, toId1); }
                ( K_AND idn=cident K_TO toIdn=cident { renames.put(idn, toIdn); 
} )*
           )
     {
-        $expr = new AlterTableStatement(cf, type, id, v, attrs, renames, 
isStatic);
+        $expr = new AlterTableStatement(cf, type, id, v, attrs, renames, 
isStatic, dropTimestamp);
     }
     ;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 381971f..d64e541 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.transport.Event;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static 
org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
 
@@ -57,6 +58,7 @@ public class AlterTableStatement extends 
SchemaAlteringStatement
     private final TableAttributes attrs;
     private final Map<ColumnIdentifier.Raw, ColumnIdentifier.Raw> renames;
     private final boolean isStatic; // Only for ALTER ADD
+    private final long deleteTimestamp;
 
     public AlterTableStatement(CFName name,
                                Type type,
@@ -64,7 +66,8 @@ public class AlterTableStatement extends 
SchemaAlteringStatement
                                CQL3Type.Raw validator,
                                TableAttributes attrs,
                                Map<ColumnIdentifier.Raw, ColumnIdentifier.Raw> 
renames,
-                               boolean isStatic)
+                               boolean isStatic,
+                               Long deleteTimestamp)
     {
         super(name);
         this.oType = type;
@@ -73,6 +76,7 @@ public class AlterTableStatement extends 
SchemaAlteringStatement
         this.attrs = attrs;
         this.renames = renames;
         this.isStatic = isStatic;
+        this.deleteTimestamp = deleteTimestamp == null ? 
FBUtilities.timestampMicros() : deleteTimestamp;
     }
 
     public void checkAccess(ClientState state) throws UnauthorizedException, 
InvalidRequestException
@@ -238,7 +242,7 @@ public class AlterTableStatement extends 
SchemaAlteringStatement
                         }
                         assert toDelete != null;
                         cfm.removeColumnDefinition(toDelete);
-                        cfm.recordColumnDrop(toDelete);
+                        cfm.recordColumnDrop(toDelete, deleteTimestamp);
                         break;
                 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 6c02909..fb3665b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1627,6 +1627,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 }
 
                 writeSnapshotManifest(filesJSONArr, snapshotName);
+                if (!Schema.SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName) && 
!Schema.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName))
+                    writeSnapshotSchema(snapshotName);
             }
         }
         if (ephemeral)
@@ -1656,6 +1658,27 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
     }
 
+    private void writeSnapshotSchema(final String snapshotName)
+    {
+        final File schemaFile = 
getDirectories().getSnapshotSchemaFile(snapshotName);
+
+        try
+        {
+            if (!schemaFile.getParentFile().exists())
+                schemaFile.getParentFile().mkdirs();
+
+            try (PrintStream out = new PrintStream(schemaFile))
+            {
+                for (String s: 
ColumnFamilyStoreCQLHelper.dumpReCreateStatements(metadata))
+                    out.println(s);
+            }
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, schemaFile);
+        }
+    }
+
     private void createEphemeralSnapshotMarkerFile(final String snapshot)
     {
         final File ephemeralSnapshotMarker = 
getDirectories().getNewEphemeralSnapshotMarkerFile(snapshot);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/src/java/org/apache/cassandra/db/ColumnFamilyStoreCQLHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreCQLHelper.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStoreCQLHelper.java
new file mode 100644
index 0000000..f9d6897
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreCQLHelper.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Softw≤are Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+import java.util.function.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.statements.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Helper methods to represent CFMetadata and related objects in CQL format
+ */
+public class ColumnFamilyStoreCQLHelper
+{
+    public static List<String> dumpReCreateStatements(CFMetaData metadata)
+    {
+        List<String> l = new ArrayList<>();
+        // Types come first, as table can't be created without them
+        l.addAll(ColumnFamilyStoreCQLHelper.getUserTypesAsCQL(metadata));
+        // Record re-create schema statements
+        l.add(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(metadata, true));
+        // Dropped columns (and re-additions)
+        l.addAll(ColumnFamilyStoreCQLHelper.getDroppedColumnsAsCQL(metadata));
+        // Indexes applied as last, since otherwise they may interfere with 
column drops / re-additions
+        l.addAll(ColumnFamilyStoreCQLHelper.getIndexesAsCQL(metadata));
+        return l;
+    }
+
+    private static List<ColumnDefinition> getClusteringColumns(CFMetaData 
metadata)
+    {
+        List<ColumnDefinition> cds = new 
ArrayList<>(metadata.clusteringColumns().size());
+
+        if (!metadata.isStaticCompactTable())
+            for (ColumnDefinition cd : metadata.clusteringColumns())
+                cds.add(cd);
+
+        return cds;
+    }
+
+    private static List<ColumnDefinition> getPartitionColumns(CFMetaData 
metadata)
+    {
+        List<ColumnDefinition> cds = new 
ArrayList<>(metadata.partitionColumns().size());
+
+        for (ColumnDefinition cd : metadata.partitionColumns().statics)
+            cds.add(cd);
+
+        if (metadata.isDense())
+        {
+            // remove an empty type
+            for (ColumnDefinition cd : 
metadata.partitionColumns().withoutStatics())
+                if (!cd.type.equals(EmptyType.instance))
+                    cds.add(cd);
+        }
+        // "regular" columns are not exposed for static compact tables
+        else if (!metadata.isStaticCompactTable())
+        {
+            for (ColumnDefinition cd : 
metadata.partitionColumns().withoutStatics())
+                cds.add(cd);
+        }
+
+        return cds;
+    }
+
+    /**
+     * Build a CQL String representation of Column Family Metadata
+     */
+    @VisibleForTesting
+    public static String getCFMetadataAsCQL(CFMetaData metadata, boolean 
includeDroppedColumns)
+    {
+        StringBuilder sb = new StringBuilder();
+        if (!isCqlCompatible(metadata))
+        {
+            sb.append(String.format("/*\nWarning: Table %s.%s omitted because 
it has constructs not compatible with CQL (was created via legacy API).\n",
+                                    metadata.ksName,
+                                    metadata.cfName));
+            sb.append("\nApproximate structure, for reference:");
+            sb.append("\n(this should not be used to reproduce this 
schema)\n\n");
+        }
+
+        sb.append("CREATE TABLE IF NOT EXISTS ");
+        
sb.append(quoteIdentifier(metadata.ksName)).append('.').append(quoteIdentifier(metadata.cfName)).append("
 (");
+
+        List<ColumnDefinition> partitionKeyColumns = 
metadata.partitionKeyColumns();
+        List<ColumnDefinition> clusteringColumns = 
getClusteringColumns(metadata);
+        List<ColumnDefinition> partitionColumns = 
getPartitionColumns(metadata);
+
+        Consumer<StringBuilder> cdCommaAppender = commaAppender("\n\t");
+        sb.append("\n\t");
+        for (ColumnDefinition cfd: partitionKeyColumns)
+        {
+            cdCommaAppender.accept(sb);
+            sb.append(toCQL(cfd));
+            if (partitionKeyColumns.size() == 1 && clusteringColumns.size() == 
0)
+                sb.append(" PRIMARY KEY");
+        }
+
+        for (ColumnDefinition cfd: clusteringColumns)
+        {
+            cdCommaAppender.accept(sb);
+            sb.append(toCQL(cfd));
+        }
+
+        for (ColumnDefinition cfd: partitionColumns)
+        {
+            cdCommaAppender.accept(sb);
+            sb.append(toCQL(cfd, metadata.isStaticCompactTable()));
+        }
+
+        if (includeDroppedColumns)
+        {
+            for (Map.Entry<ByteBuffer, CFMetaData.DroppedColumn> entry: 
metadata.getDroppedColumns().entrySet())
+            {
+                if (metadata.getColumnDefinition(entry.getKey()) != null)
+                    continue;
+
+                CFMetaData.DroppedColumn droppedColumn = entry.getValue();
+                cdCommaAppender.accept(sb);
+                sb.append(quoteIdentifier(droppedColumn.name));
+                sb.append(' ');
+                sb.append(droppedColumn.type.asCQL3Type().toString());
+            }
+        }
+
+        if (clusteringColumns.size() > 0 || partitionKeyColumns.size() > 1)
+        {
+            sb.append(",\n\tPRIMARY KEY (");
+            if (partitionKeyColumns.size() > 1)
+            {
+                sb.append("(");
+                Consumer<StringBuilder> pkCommaAppender = commaAppender(" ");
+                for (ColumnDefinition cfd : partitionKeyColumns)
+                {
+                    pkCommaAppender.accept(sb);
+                    sb.append(quoteIdentifier(cfd.name.toString()));
+                }
+                sb.append(")");
+            }
+            else
+            {
+                
sb.append(quoteIdentifier(partitionKeyColumns.get(0).name.toString()));
+            }
+
+            for (ColumnDefinition cfd : metadata.clusteringColumns())
+                sb.append(", ").append(quoteIdentifier(cfd.name.toString()));
+
+            sb.append(')');
+        }
+        sb.append(")\n\t");
+        sb.append("WITH ");
+
+        sb.append("ID = ").append(metadata.cfId).append("\n\tAND ");
+
+        if (metadata.isCompactTable())
+            sb.append("COMPACT STORAGE\n\tAND ");
+
+        if (clusteringColumns.size() > 0)
+        {
+            sb.append("CLUSTERING ORDER BY (");
+
+            Consumer<StringBuilder> cOrderCommaAppender = commaAppender(" ");
+            for (ColumnDefinition cd : clusteringColumns)
+            {
+                cOrderCommaAppender.accept(sb);
+                sb.append(quoteIdentifier(cd.name.toString())).append(' 
').append(cd.clusteringOrder().toString());
+            }
+            sb.append(")\n\tAND ");
+        }
+
+        sb.append(toCQL(metadata.params));
+        sb.append(";");
+
+        if (!isCqlCompatible(metadata))
+        {
+            sb.append("\n*/");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Build a CQL String representation of User Types used in the given 
Column Family.
+     *
+     * Type order is ensured as types are built incrementally: from the 
innermost (most nested)
+     * to the outermost.
+     */
+    @VisibleForTesting
+    public static List<String> getUserTypesAsCQL(CFMetaData metadata)
+    {
+        List<UserType> types = new ArrayList<>();
+        Set<UserType> typeSet = new HashSet<>();
+        for (ColumnDefinition cd: 
Iterables.concat(metadata.partitionKeyColumns(), metadata.clusteringColumns(), 
metadata.partitionColumns()))
+        {
+            AbstractType type = cd.type;
+            if (type instanceof UserType)
+                resolveUserType((UserType) type, typeSet, types);
+        }
+
+        List<String> typeStrings = new ArrayList<>();
+        for (UserType type: types)
+            typeStrings.add(toCQL(type));
+        return typeStrings;
+    }
+
+    /**
+     * Build a CQL String representation of Dropped Columns in the given 
Column Family.
+     *
+     * If the column was dropped once, but is now re-created `ADD` will be 
appended accordingly.
+     */
+    @VisibleForTesting
+    public static List<String> getDroppedColumnsAsCQL(CFMetaData metadata)
+    {
+        List<String> droppedColumns = new ArrayList<>();
+
+        for (Map.Entry<ByteBuffer, CFMetaData.DroppedColumn> entry: 
metadata.getDroppedColumns().entrySet())
+        {
+            CFMetaData.DroppedColumn column = entry.getValue();
+            droppedColumns.add(toCQLDrop(metadata.ksName, metadata.cfName, 
column));
+            if (metadata.getColumnDefinition(entry.getKey()) != null)
+                droppedColumns.add(toCQLAdd(metadata.ksName, metadata.cfName, 
metadata.getColumnDefinition(entry.getKey())));
+        }
+
+        return droppedColumns;
+    }
+
+    /**
+     * Build a CQL String representation of Indexes on columns in the given 
Column Family
+     */
+    @VisibleForTesting
+    public static List<String> getIndexesAsCQL(CFMetaData metadata)
+    {
+        List<String> indexes = new ArrayList<>();
+        for (IndexMetadata indexMetadata: metadata.getIndexes())
+            indexes.add(toCQL(metadata.ksName, metadata.cfName, 
indexMetadata));
+        return indexes;
+    }
+
+    private static String toCQL(String keyspace, String cf, IndexMetadata 
indexMetadata)
+    {
+        if (indexMetadata.isCustom())
+        {
+            Map<String, String> options = new HashMap<>();
+            indexMetadata.options.forEach((k, v) -> {
+                if (!k.equals(IndexTarget.TARGET_OPTION_NAME) && 
!k.equals(IndexTarget.CUSTOM_INDEX_OPTION_NAME))
+                    options.put(k, v);
+            });
+
+            return String.format("CREATE CUSTOM INDEX %s ON %s.%s (%s) USING 
'%s'%s;",
+                                 quoteIdentifier(indexMetadata.name),
+                                 quoteIdentifier(keyspace),
+                                 quoteIdentifier(cf),
+                                 
indexMetadata.options.get(IndexTarget.TARGET_OPTION_NAME),
+                                 
indexMetadata.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME),
+                                 options.isEmpty() ? "" : " WITH OPTIONS " + 
toCQL(options));
+        }
+        else
+        {
+            return String.format("CREATE INDEX %s ON %s.%s (%s);",
+                                 quoteIdentifier(indexMetadata.name),
+                                 quoteIdentifier(keyspace),
+                                 quoteIdentifier(cf),
+                                 
indexMetadata.options.get(IndexTarget.TARGET_OPTION_NAME));
+        }
+    }
+    private static String toCQL(UserType userType)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.format("CREATE TYPE %s.%s(",
+                                quoteIdentifier(userType.keyspace),
+                                quoteIdentifier(userType.getNameAsString())));
+
+        Consumer<StringBuilder> commaAppender = commaAppender(" ");
+        for (int i = 0; i < userType.size(); i++)
+        {
+            commaAppender.accept(sb);
+            sb.append(String.format("%s %s",
+                                    userType.fieldNameAsString(i),
+                                    userType.fieldType(i).asCQL3Type()));
+        }
+        sb.append(");");
+        return sb.toString();
+    }
+
+    private static String toCQL(TableParams tableParams)
+    {
+        StringBuilder builder = new StringBuilder();
+
+        builder.append("bloom_filter_fp_chance = 
").append(tableParams.bloomFilterFpChance);
+        builder.append("\n\tAND dclocal_read_repair_chance = 
").append(tableParams.dcLocalReadRepairChance);
+        builder.append("\n\tAND crc_check_chance = 
").append(tableParams.crcCheckChance);
+        builder.append("\n\tAND default_time_to_live = 
").append(tableParams.defaultTimeToLive);
+        builder.append("\n\tAND gc_grace_seconds = 
").append(tableParams.gcGraceSeconds);
+        builder.append("\n\tAND min_index_interval = 
").append(tableParams.minIndexInterval);
+        builder.append("\n\tAND max_index_interval = 
").append(tableParams.maxIndexInterval);
+        builder.append("\n\tAND memtable_flush_period_in_ms = 
").append(tableParams.memtableFlushPeriodInMs);
+        builder.append("\n\tAND read_repair_chance = 
").append(tableParams.readRepairChance);
+        builder.append("\n\tAND speculative_retry = 
'").append(tableParams.speculativeRetry).append("'");
+        builder.append("\n\tAND comment = 
").append(singleQuote(tableParams.comment));
+        builder.append("\n\tAND caching = 
").append(toCQL(tableParams.caching.asMap()));
+        builder.append("\n\tAND compaction = 
").append(toCQL(tableParams.compaction.asMap()));
+        builder.append("\n\tAND compression = 
").append(toCQL(tableParams.compression.asMap()));
+
+        builder.append("\n\tAND extensions = { ");
+        for (Map.Entry<String, ByteBuffer> entry : 
tableParams.extensions.entrySet())
+        {
+            builder.append(singleQuote(entry.getKey()));
+            builder.append(": ");
+            builder.append("0x" + ByteBufferUtil.bytesToHex(entry.getValue()));
+        }
+        builder.append(" }");
+        return builder.toString();
+    }
+
+    private static String toCQL(Map<?, ?> map)
+    {
+        StringBuilder builder = new StringBuilder("{ ");
+
+        boolean isFirst = true;
+        for (Map.Entry entry: map.entrySet())
+        {
+            if (isFirst)
+                isFirst = false;
+            else
+                builder.append(", ");
+            builder.append(singleQuote(entry.getKey().toString()));
+            builder.append(": ");
+            builder.append(singleQuote(entry.getValue().toString()));
+        }
+
+        builder.append(" }");
+        return builder.toString();
+    }
+
+    private static String toCQL(ColumnDefinition cd)
+    {
+        return toCQL(cd, false);
+    }
+
+    private static String toCQL(ColumnDefinition cd, boolean 
isStaticCompactTable)
+    {
+        return String.format("%s %s%s",
+                             quoteIdentifier(cd.name.toString()),
+                             cd.type.asCQL3Type().toString(),
+                             cd.isStatic() && !isStaticCompactTable ? " 
static" : "");
+    }
+
+    private static String toCQLAdd(String keyspace, String cf, 
ColumnDefinition cd)
+    {
+        return String.format("ALTER TABLE %s.%s ADD %s %s%s;",
+                             quoteIdentifier(keyspace),
+                             quoteIdentifier(cf),
+                             quoteIdentifier(cd.name.toString()),
+                             cd.type.asCQL3Type().toString(),
+                             cd.isStatic() ? " static" : "");
+    }
+
+    private static String toCQLDrop(String keyspace, String cf, 
CFMetaData.DroppedColumn droppedColumn)
+    {
+        return String.format("ALTER TABLE %s.%s DROP %s USING TIMESTAMP %s;",
+                             quoteIdentifier(keyspace),
+                             quoteIdentifier(cf),
+                             quoteIdentifier(droppedColumn.name),
+                             droppedColumn.droppedTime);
+    }
+
+    private static void resolveUserType(UserType type, Set<UserType> typeSet, 
List<UserType> types)
+    {
+        for (AbstractType subType: type.fieldTypes())
+            if (!typeSet.contains(subType) && subType instanceof UserType)
+                resolveUserType((UserType) subType, typeSet, types);
+
+        if (!typeSet.contains(type))
+        {
+            UserType t = type;
+            typeSet.add(t);
+            types.add(t);
+        }
+    }
+
+    private static String singleQuote(String s)
+    {
+        return String.format("'%s'", s.replaceAll("'", "''"));
+    }
+
+    private static Consumer<StringBuilder> commaAppender(String afterComma)
+    {
+        AtomicBoolean isFirst = new AtomicBoolean(true);
+        return new Consumer<StringBuilder>()
+        {
+            public void accept(StringBuilder stringBuilder)
+            {
+                if (!isFirst.getAndSet(false))
+                    stringBuilder.append(',').append(afterComma);
+            }
+        };
+    }
+
+    private static String quoteIdentifier(String id)
+    {
+        return ColumnIdentifier.maybeQuote(id);
+    }
+
+    /**
+     * Whether or not the given metadata is compatible / representable with 
CQL Language
+     */
+    public static boolean isCqlCompatible(CFMetaData metaData)
+    {
+        if (metaData.isSuper())
+            return false;
+
+        if (metaData.isCompactTable()
+            && metaData.partitionColumns().withoutStatics().size() > 1
+            && metaData.clusteringColumns().size() >= 1)
+            return false;
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java 
b/src/java/org/apache/cassandra/db/Directories.java
index 877f984..30fe56d 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -477,6 +477,12 @@ public class Directories
         return new File(snapshotDir, "manifest.json");
     }
 
+    public File getSnapshotSchemaFile(String snapshotName)
+    {
+        File snapshotDir = getSnapshotDirectory(getDirectoryForNewSSTables(), 
snapshotName);
+        return new File(snapshotDir, "schema.cql");
+    }
+
     public File getNewEphemeralSnapshotMarkerFile(String snapshotName)
     {
         File snapshotDir = new File(getWriteableLocationAsFile(1L), 
join(SNAPSHOT_SUBDIR, snapshotName));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java 
b/test/unit/org/apache/cassandra/SchemaLoader.java
index 5d720c4..dd4884c 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -264,6 +264,11 @@ public class SchemaLoader
         MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(name, 
params, Tables.of(tables)), true);
     }
 
+    public static void createKeyspace(String name, KeyspaceParams params, 
Tables tables, Types types)
+    {
+        MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(name, 
params, tables, Views.none(), types, Functions.none()), true);
+    }
+
     public static ColumnDefinition integerColumn(String ksName, String cfName)
     {
         return new ColumnDefinition(ksName,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
index 509aeac..bcd6587 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
@@ -99,6 +99,77 @@ public class AlterTest extends CQLTester
     }
 
     @Test
+    public void testDropWithTimestamp() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, c1 int, v1 int, todrop int, 
PRIMARY KEY (id, c1));");
+        for (int i = 0; i < 5; i++)
+            execute("INSERT INTO %s (id, c1, v1, todrop) VALUES (?, ?, ?, ?) 
USING TIMESTAMP ?", 1, i, i, i, 10000L * i);
+
+        // flush is necessary since otherwise the values of `todrop` will get 
discarded during
+        // alter statement
+        flush();
+        execute("ALTER TABLE %s DROP todrop USING TIMESTAMP 20000;");
+        execute("ALTER TABLE %s ADD todrop int;");
+        execute("INSERT INTO %s (id, c1, v1, todrop) VALUES (?, ?, ?, ?) USING 
TIMESTAMP ?", 1, 100, 100, 100, 30000L);
+        assertRows(execute("SELECT id, c1, v1, todrop FROM %s"),
+                   row(1, 0, 0, null),
+                   row(1, 1, 1, null),
+                   row(1, 2, 2, null),
+                   row(1, 3, 3, 3),
+                   row(1, 4, 4, 4),
+                   row(1, 100, 100, 100));
+    }
+
+    @Test
+    public void testDropStaticWithTimestamp() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, c1 int, v1 int, todrop int 
static, PRIMARY KEY (id, c1));");
+        for (int i = 0; i < 5; i++)
+            execute("INSERT INTO %s (id, c1, v1, todrop) VALUES (?, ?, ?, ?) 
USING TIMESTAMP ?", 1, i, i, i, 10000L * i);
+
+        // flush is necessary since otherwise the values of `todrop` will get 
discarded during
+        // alter statement
+        flush();
+        execute("ALTER TABLE %s DROP todrop USING TIMESTAMP 20000;");
+        execute("ALTER TABLE %s ADD todrop int static;");
+        execute("INSERT INTO %s (id, c1, v1, todrop) VALUES (?, ?, ?, ?) USING 
TIMESTAMP ?", 1, 100, 100, 100, 30000L);
+        // static column value with largest timestmap will be available again
+        assertRows(execute("SELECT id, c1, v1, todrop FROM %s"),
+                   row(1, 0, 0, 4),
+                   row(1, 1, 1, 4),
+                   row(1, 2, 2, 4),
+                   row(1, 3, 3, 4),
+                   row(1, 4, 4, 4),
+                   row(1, 100, 100, 4));
+    }
+
+    @Test
+    public void testDropMultipleWithTimestamp() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, c1 int, v1 int, todrop1 int, 
todrop2 int, PRIMARY KEY (id, c1));");
+        for (int i = 0; i < 5; i++)
+            execute("INSERT INTO %s (id, c1, v1, todrop1, todrop2) VALUES (?, 
?, ?, ?, ?) USING TIMESTAMP ?", 1, i, i, i, i, 10000L * i);
+
+        // flush is necessary since otherwise the values of `todrop1` and 
`todrop2` will get discarded during
+        // alter statement
+        flush();
+        execute("ALTER TABLE %s DROP todrop1 USING TIMESTAMP 20000;");
+        execute("ALTER TABLE %s DROP todrop2 USING TIMESTAMP 20000;");
+        execute("ALTER TABLE %s ADD todrop1 int;");
+        execute("ALTER TABLE %s ADD todrop2 int;");
+
+        execute("INSERT INTO %s (id, c1, v1, todrop1, todrop2) VALUES (?, ?, 
?, ?, ?) USING TIMESTAMP ?", 1, 100, 100, 100, 100, 40000L);
+        assertRows(execute("SELECT id, c1, v1, todrop1, todrop2 FROM %s"),
+                   row(1, 0, 0, null, null),
+                   row(1, 1, 1, null, null),
+                   row(1, 2, 2, null, null),
+                   row(1, 3, 3, 3, 3),
+                   row(1, 4, 4, 4, 4),
+                   row(1, 100, 100, 100, 100));
+    }
+
+
+    @Test
     public void testChangeStrategyWithUnquotedAgrument() throws Throwable
     {
         createTable("CREATE TABLE %s (id text PRIMARY KEY);");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/test/unit/org/apache/cassandra/db/ColumnFamilyStoreCQLHelperTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreCQLHelperTest.java 
b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreCQLHelperTest.java
new file mode 100644
index 0000000..c2e5cb7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreCQLHelperTest.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.FileReader;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.*;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.*;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.statements.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.utils.*;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreCQLHelperTest extends CQLTester
+{
+    @Before
+    public void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    @Test
+    public void testUserTypesCQL()
+    {
+        String keyspace = "cql_test_keyspace_user_types";
+        String table = "test_table_user_types";
+
+        UserType typeA = new UserType(keyspace, ByteBufferUtil.bytes("a"),
+                                      Arrays.asList(ByteBufferUtil.bytes("a1"),
+                                                    ByteBufferUtil.bytes("a2"),
+                                                    
ByteBufferUtil.bytes("a3")),
+                                      Arrays.asList(IntegerType.instance,
+                                                    IntegerType.instance,
+                                                    IntegerType.instance));
+
+        UserType typeB = new UserType(keyspace, ByteBufferUtil.bytes("b"),
+                                      Arrays.asList(ByteBufferUtil.bytes("b1"),
+                                                    ByteBufferUtil.bytes("b2"),
+                                                    
ByteBufferUtil.bytes("b3")),
+                                      Arrays.asList(typeA,
+                                                    typeA,
+                                                    typeA));
+
+        UserType typeC = new UserType(keyspace, ByteBufferUtil.bytes("c"),
+                                      Arrays.asList(ByteBufferUtil.bytes("c1"),
+                                                    ByteBufferUtil.bytes("c2"),
+                                                    
ByteBufferUtil.bytes("c3")),
+                                      Arrays.asList(typeB,
+                                                    typeB,
+                                                    typeB));
+
+        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
+                                           .addPartitionKey("pk1", 
IntegerType.instance)
+                                           .addClusteringColumn("ck1", 
IntegerType.instance)
+                                           .addRegularColumn("reg1", typeC)
+                                           .addRegularColumn("reg2", 
ListType.getInstance(IntegerType.instance, false))
+                                           .addRegularColumn("reg3", 
MapType.getInstance(AsciiType.instance, IntegerType.instance, true))
+                                           .build();
+
+        SchemaLoader.createKeyspace(keyspace,
+                                    KeyspaceParams.simple(1),
+                                    Tables.of(cfm),
+                                    Types.of(typeA, typeB, typeC));
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertEquals(ImmutableList.of("CREATE TYPE 
cql_test_keyspace_user_types.a(a1 varint, a2 varint, a3 varint);",
+                                      "CREATE TYPE 
cql_test_keyspace_user_types.b(b1 frozen<a>, b2 frozen<a>, b3 frozen<a>);",
+                                      "CREATE TYPE 
cql_test_keyspace_user_types.c(c1 frozen<b>, c2 frozen<b>, c3 frozen<b>);"),
+                     
ColumnFamilyStoreCQLHelper.getUserTypesAsCQL(cfs.metadata));
+    }
+
+    @Test
+    public void testDroppedColumnsCQL()
+    {
+        String keyspace = "cql_test_keyspace_dropped_columns";
+        String table = "test_table_dropped_columns";
+
+        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
+                                           .addPartitionKey("pk1", 
IntegerType.instance)
+                                           .addClusteringColumn("ck1", 
IntegerType.instance)
+                                           .addRegularColumn("reg1", 
IntegerType.instance)
+                                           .addRegularColumn("reg2", 
IntegerType.instance)
+                                           .addRegularColumn("reg3", 
IntegerType.instance)
+                                           .build();
+
+
+        ColumnDefinition reg1 = 
cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1"));
+        ColumnDefinition reg2 = 
cfm.getColumnDefinition(ByteBufferUtil.bytes("reg2"));
+        ColumnDefinition reg3 = 
cfm.getColumnDefinition(ByteBufferUtil.bytes("reg3"));
+
+        cfm.removeColumnDefinition(reg1);
+        cfm.removeColumnDefinition(reg2);
+        cfm.removeColumnDefinition(reg3);
+
+        cfm.recordColumnDrop(reg1, 10000);
+        cfm.recordColumnDrop(reg2, 20000);
+        cfm.recordColumnDrop(reg3, 30000);
+
+        SchemaLoader.createKeyspace(keyspace,
+                                    KeyspaceParams.simple(1),
+                                    cfm);
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertEquals(ImmutableList.of("ALTER TABLE 
cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg1 USING 
TIMESTAMP 10000;",
+                                      "ALTER TABLE 
cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg3 USING 
TIMESTAMP 30000;",
+                                      "ALTER TABLE 
cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg2 USING 
TIMESTAMP 20000;"),
+                     
ColumnFamilyStoreCQLHelper.getDroppedColumnsAsCQL(cfs.metadata));
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS 
cql_test_keyspace_dropped_columns.test_table_dropped_columns (\n" +
+        "\tpk1 varint,\n" +
+        "\tck1 varint,\n" +
+        "\treg1 varint,\n" +
+        "\treg3 varint,\n" +
+        "\treg2 varint,\n" +
+        "\tPRIMARY KEY (pk1, ck1))"));
+    }
+
+    @Test
+    public void testReaddedColumns()
+    {
+        String keyspace = "cql_test_keyspace_readded_columns";
+        String table = "test_table_readded_columns";
+
+        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
+                                           .addPartitionKey("pk1", 
IntegerType.instance)
+                                           .addClusteringColumn("ck1", 
IntegerType.instance)
+                                           .addRegularColumn("reg1", 
IntegerType.instance)
+                                           .addStaticColumn("reg2", 
IntegerType.instance)
+                                           .addRegularColumn("reg3", 
IntegerType.instance)
+                                           .build();
+
+        ColumnDefinition reg1 = 
cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1"));
+        ColumnDefinition reg2 = 
cfm.getColumnDefinition(ByteBufferUtil.bytes("reg2"));
+
+        cfm.removeColumnDefinition(reg1);
+        cfm.removeColumnDefinition(reg2);
+
+        cfm.recordColumnDrop(reg1, 10000);
+        cfm.recordColumnDrop(reg2, 20000);
+
+        cfm.addColumnDefinition(reg1);
+        cfm.addColumnDefinition(reg2);
+
+        SchemaLoader.createKeyspace(keyspace,
+                                    KeyspaceParams.simple(1),
+                                    cfm);
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        // when re-adding, column is present in CREATE, then in DROP and then 
in ADD again, to record DROP with a proper timestamp
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS 
cql_test_keyspace_readded_columns.test_table_readded_columns (\n" +
+        "\tpk1 varint,\n" +
+        "\tck1 varint,\n" +
+        "\treg2 varint static,\n" +
+        "\treg1 varint,\n" +
+        "\treg3 varint,\n" +
+        "\tPRIMARY KEY (pk1, ck1))"));
+
+        assertEquals(ImmutableList.of("ALTER TABLE 
cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg1 USING 
TIMESTAMP 10000;",
+                                      "ALTER TABLE 
cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg1 varint;",
+                                      "ALTER TABLE 
cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg2 USING 
TIMESTAMP 20000;",
+                                      "ALTER TABLE 
cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg2 varint 
static;"),
+                     
ColumnFamilyStoreCQLHelper.getDroppedColumnsAsCQL(cfs.metadata));
+    }
+
+    @Test
+    public void testCfmColumnsCQL()
+    {
+        String keyspace = "cql_test_keyspace_create_table";
+        String table = "test_table_create_table";
+
+        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
+                                           .addPartitionKey("pk1", 
IntegerType.instance)
+                                           .addPartitionKey("pk2", 
AsciiType.instance)
+                                           .addClusteringColumn("ck1", 
ReversedType.getInstance(IntegerType.instance))
+                                           .addClusteringColumn("ck2", 
IntegerType.instance)
+                                           .addStaticColumn("st1", 
AsciiType.instance)
+                                           .addRegularColumn("reg1", 
AsciiType.instance)
+                                           .addRegularColumn("reg2", 
ListType.getInstance(IntegerType.instance, false))
+                                           .addRegularColumn("reg3", 
MapType.getInstance(AsciiType.instance, IntegerType.instance, true))
+                                           .build();
+
+        SchemaLoader.createKeyspace(keyspace,
+                                    KeyspaceParams.simple(1),
+                                    cfm);
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS 
cql_test_keyspace_create_table.test_table_create_table (\n" +
+        "\tpk1 varint,\n" +
+        "\tpk2 ascii,\n" +
+        "\tck1 varint,\n" +
+        "\tck2 varint,\n" +
+        "\tst1 ascii static,\n" +
+        "\treg1 ascii,\n" +
+        "\treg2 frozen<list<varint>>,\n" +
+        "\treg3 map<ascii, varint>,\n" +
+        "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
+        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tAND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)"));
+    }
+
+    @Test
+    public void testCfmCompactStorageCQL()
+    {
+        String keyspace = "cql_test_keyspace_compact";
+        String table = "test_table_compact";
+
+        CFMetaData cfm = CFMetaData.Builder.createDense(keyspace, table, true, 
false)
+                                           .addPartitionKey("pk1", 
IntegerType.instance)
+                                           .addPartitionKey("pk2", 
AsciiType.instance)
+                                           .addClusteringColumn("ck1", 
ReversedType.getInstance(IntegerType.instance))
+                                           .addClusteringColumn("ck2", 
IntegerType.instance)
+                                           .addRegularColumn("reg", 
IntegerType.instance)
+                                           .build();
+
+        SchemaLoader.createKeyspace(keyspace,
+                                    KeyspaceParams.simple(1),
+                                    cfm);
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS 
cql_test_keyspace_compact.test_table_compact (\n" +
+        "\tpk1 varint,\n" +
+        "\tpk2 ascii,\n" +
+        "\tck1 varint,\n" +
+        "\tck2 varint,\n" +
+        "\treg varint,\n" +
+        "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
+        "\tWITH ID = " + cfm.cfId + "\n" +
+        "\tAND COMPACT STORAGE\n" +
+        "\tAND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)"));
+    }
+
+    @Test
+    public void testCfmCounterCQL()
+    {
+        String keyspace = "cql_test_keyspace_counter";
+        String table = "test_table_counter";
+
+        CFMetaData cfm = CFMetaData.Builder.createDense(keyspace, table, true, 
true)
+                                           .addPartitionKey("pk1", 
IntegerType.instance)
+                                           .addPartitionKey("pk2", 
AsciiType.instance)
+                                           .addClusteringColumn("ck1", 
ReversedType.getInstance(IntegerType.instance))
+                                           .addClusteringColumn("ck2", 
IntegerType.instance)
+                                           .addRegularColumn("cnt", 
CounterColumnType.instance)
+                                           .build();
+
+        SchemaLoader.createKeyspace(keyspace,
+                                    KeyspaceParams.simple(1),
+                                    cfm);
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS 
cql_test_keyspace_counter.test_table_counter (\n" +
+        "\tpk1 varint,\n" +
+        "\tpk2 ascii,\n" +
+        "\tck1 varint,\n" +
+        "\tck2 varint,\n" +
+        "\tcnt counter,\n" +
+        "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
+        "\tWITH ID = " + cfm.cfId + "\n" +
+        "\tAND COMPACT STORAGE\n" +
+        "\tAND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)"));
+    }
+
+    @Test
+    public void testCfmOptionsCQL()
+    {
+        String keyspace = "cql_test_keyspace_options";
+        String table = "test_table_options";
+
+        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
+                                           .addPartitionKey("pk1", 
IntegerType.instance)
+                                           .addClusteringColumn("cl1", 
IntegerType.instance)
+                                           .addRegularColumn("reg1", 
AsciiType.instance)
+                                           .build();
+
+        
cfm.recordColumnDrop(cfm.getColumnDefinition(ByteBuffer.wrap("reg1".getBytes())),
 FBUtilities.timestampMicros());
+        cfm.bloomFilterFpChance(1.0);
+        cfm.comment("comment");
+        
cfm.compaction(CompactionParams.lcs(Collections.singletonMap("sstable_size_in_mb",
 "1")));
+        cfm.compression(CompressionParams.lz4(1 << 16));
+        cfm.dcLocalReadRepairChance(0.2);
+        cfm.crcCheckChance(0.3);
+        cfm.defaultTimeToLive(4);
+        cfm.gcGraceSeconds(5);
+        cfm.minIndexInterval(6);
+        cfm.maxIndexInterval(7);
+        cfm.memtableFlushPeriod(8);
+        cfm.readRepairChance(0.9);
+        cfm.speculativeRetry(SpeculativeRetryParam.always());
+        cfm.extensions(ImmutableMap.of("ext1",
+                                       ByteBuffer.wrap("val1".getBytes())));
+
+        SchemaLoader.createKeyspace(keyspace,
+                                    KeyspaceParams.simple(1),
+                                    cfm);
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).endsWith(
+        "AND bloom_filter_fp_chance = 1.0\n" +
+        "\tAND dclocal_read_repair_chance = 0.2\n" +
+        "\tAND crc_check_chance = 0.3\n" +
+        "\tAND default_time_to_live = 4\n" +
+        "\tAND gc_grace_seconds = 5\n" +
+        "\tAND min_index_interval = 6\n" +
+        "\tAND max_index_interval = 7\n" +
+        "\tAND memtable_flush_period_in_ms = 8\n" +
+        "\tAND read_repair_chance = 0.9\n" +
+        "\tAND speculative_retry = 'ALWAYS'\n" +
+        "\tAND comment = 'comment'\n" +
+        "\tAND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }\n" +
+        "\tAND compaction = { 'class': 
'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 
'sstable_size_in_mb': '1' }\n" +
+        "\tAND compression = { 'chunk_length_in_kb': '64', 'class': 
'org.apache.cassandra.io.compress.LZ4Compressor' }\n" +
+        "\tAND extensions = { 'ext1': 0x76616c31 };"
+        ));
+    }
+
+    @Test
+    public void testCfmIndexJson()
+    {
+        String keyspace = "cql_test_keyspace_3";
+        String table = "test_table_3";
+
+        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
+                                           .addPartitionKey("pk1", 
IntegerType.instance)
+                                           .addClusteringColumn("cl1", 
IntegerType.instance)
+                                           .addRegularColumn("reg1", 
AsciiType.instance)
+                                           .build();
+
+        cfm.indexes(cfm.getIndexes()
+                       .with(IndexMetadata.fromIndexTargets(cfm,
+                                                            
Collections.singletonList(new 
IndexTarget(cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1")).name,
+                                                                               
                       IndexTarget.Type.VALUES)),
+                                                            "indexName",
+                                                            
IndexMetadata.Kind.COMPOSITES,
+                                                            
Collections.emptyMap()))
+                       .with(IndexMetadata.fromIndexTargets(cfm,
+                                                            
Collections.singletonList(new 
IndexTarget(cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1")).name,
+                                                                               
                       IndexTarget.Type.KEYS)),
+                                                            "indexName2",
+                                                            
IndexMetadata.Kind.COMPOSITES,
+                                                            
Collections.emptyMap()))
+                       .with(IndexMetadata.fromIndexTargets(cfm,
+                                                            
Collections.singletonList(new 
IndexTarget(cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1")).name,
+                                                                               
                       IndexTarget.Type.KEYS_AND_VALUES)),
+                                                            "indexName3",
+                                                            
IndexMetadata.Kind.COMPOSITES,
+                                                            
Collections.emptyMap())));
+
+
+        SchemaLoader.createKeyspace(keyspace,
+                                    KeyspaceParams.simple(1),
+                                    cfm);
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertEquals(ImmutableList.of("CREATE INDEX \"indexName\" ON 
cql_test_keyspace_3.test_table_3 (reg1);",
+                                      "CREATE INDEX \"indexName2\" ON 
cql_test_keyspace_3.test_table_3 (reg1);",
+                                      "CREATE INDEX \"indexName3\" ON 
cql_test_keyspace_3.test_table_3 (reg1);"),
+                     ColumnFamilyStoreCQLHelper.getIndexesAsCQL(cfs.metadata));
+    }
+
+    private final static String SNAPSHOT = "testsnapshot";
+
+    @Test
+    public void testSnapshot() throws Throwable
+    {
+        String typeA = createType("CREATE TYPE %s (a1 varint, a2 varint, a3 
varint);");
+        String typeB = createType("CREATE TYPE %s (b1 frozen<" + typeA + ">, 
b2 frozen<" + typeA + ">, b3 frozen<" + typeA + ">);");
+        String typeC = createType("CREATE TYPE %s (c1 frozen<" + typeB + ">, 
c2 frozen<" + typeB + ">, c3 frozen<" + typeB + ">);");
+
+        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
+                                       "pk1 varint," +
+                                       "pk2 ascii," +
+                                       "ck1 varint," +
+                                       "ck2 varint," +
+                                       "reg1 frozen<" + typeC + ">," +
+                                       "reg2 int," +
+                                       "reg3 int," +
+                                       "PRIMARY KEY ((pk1, pk2), ck1, ck2)) 
WITH " +
+                                       "CLUSTERING ORDER BY (ck1 ASC, ck2 
DESC);");
+
+        alterTable("ALTER TABLE %s DROP reg3 USING TIMESTAMP 10000;");
+        alterTable("ALTER TABLE %s ADD reg3 int;");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk1, pk2, ck1, ck2, reg1, reg2) VALUES 
(?, ?, ?, ?, ?, ?)", i, i + 1, i + 2, i + 3, null, i + 5);
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
+        cfs.snapshot(SNAPSHOT);
+
+        String schema = 
Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), 
Charset.defaultCharset());
+        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s(a1 varint, 
a2 varint, a3 varint);", keyspace(), typeA)));
+        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s(a1 varint, 
a2 varint, a3 varint);", keyspace(), typeA)));
+        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s(b1 
frozen<%s>, b2 frozen<%s>, b3 frozen<%s>);", keyspace(), typeB, typeA, typeA, 
typeA)));
+        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s(c1 
frozen<%s>, c2 frozen<%s>, c3 frozen<%s>);", keyspace(), typeC, typeB, typeB, 
typeB)));
+
+        schema = schema.substring(schema.indexOf("CREATE TABLE")); // trim to 
ensure order
+
+        assertTrue(schema.startsWith("CREATE TABLE IF NOT EXISTS " + 
keyspace() + "." + tableName + " (\n" +
+                                     "\tpk1 varint,\n" +
+                                     "\tpk2 ascii,\n" +
+                                     "\tck1 varint,\n" +
+                                     "\tck2 varint,\n" +
+                                     "\treg1 frozen<" + typeC + ">,\n" +
+                                     "\treg2 int,\n" +
+                                     "\treg3 int,\n" +
+                                     "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" 
+
+                                     "\tWITH ID = " + cfs.metadata.cfId + "\n" 
+
+                                     "\tAND CLUSTERING ORDER BY (ck1 ASC, ck2 
DESC)"));
+
+        schema = schema.substring(schema.indexOf("ALTER"));
+        assertTrue(schema.startsWith(String.format("ALTER TABLE %s.%s DROP 
reg3 USING TIMESTAMP 10000;", keyspace(), tableName)));
+        assertTrue(schema.contains(String.format("ALTER TABLE %s.%s ADD reg3 
int;", keyspace(), tableName)));
+
+        JSONObject manifest = (JSONObject) new JSONParser().parse(new 
FileReader(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT)));
+        JSONArray files = (JSONArray) manifest.get("files");
+        Assert.assertEquals(1, files.size());
+    }
+
+    @Test
+    public void testSystemKsSnapshot() throws Throwable
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open("system").getColumnFamilyStore("peers");
+        cfs.snapshot(SNAPSHOT);
+
+        
Assert.assertTrue(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT).exists());
+        
Assert.assertFalse(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT).exists());
+    }
+
+    @Test
+    public void testDroppedType() throws Throwable
+    {
+        String typeA = createType("CREATE TYPE %s (a1 varint, a2 varint, a3 
varint);");
+        String typeB = createType("CREATE TYPE %s (b1 frozen<" + typeA + ">, 
b2 frozen<" + typeA + ">, b3 frozen<" + typeA + ">);");
+
+        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
+                                       "pk1 varint," +
+                                       "ck1 varint," +
+                                       "reg1 frozen<" + typeB + ">," +
+                                       "reg2 varint," +
+                                       "PRIMARY KEY (pk1, ck1));");
+
+        alterTable("ALTER TABLE %s DROP reg1 USING TIMESTAMP 10000;");
+
+        Runnable validate = () -> {
+            try
+            {
+                ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
+                cfs.snapshot(SNAPSHOT);
+                String schema = 
Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), 
Charset.defaultCharset());
+
+                // When both column and it's type are dropped, the type in 
column definition gets substituted with a tuple
+                assertTrue(schema.startsWith("CREATE TABLE IF NOT EXISTS " + 
keyspace() + "." + tableName + " (\n" +
+                                             "\tpk1 varint,\n" +
+                                             "\tck1 varint,\n" +
+                                             "\treg2 varint,\n" +
+                                             "\treg1 
frozen<tuple<frozen<tuple<varint, varint, varint>>, frozen<tuple<varint, 
varint, varint>>, frozen<tuple<varint, varint, varint>>>>,\n" +
+                                             "\tPRIMARY KEY (pk1, ck1))"));
+                assertTrue(schema.contains("ALTER TABLE " + keyspace() + "." + 
tableName + " DROP reg1 USING TIMESTAMP 10000;"));
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        // Validate before and after the type drop
+        validate.run();
+        schemaChange("DROP TYPE " + keyspace() + "." + typeB);
+        schemaChange("DROP TYPE " + keyspace() + "." + typeA);
+        validate.run();
+    }
+
+    @Test
+    public void testDenseTable() throws Throwable
+    {
+        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
+                                       "pk1 varint PRIMARY KEY," +
+                                       "reg1 int)" +
+                                       " WITH COMPACT STORAGE");
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
+        "\tpk1 varint PRIMARY KEY,\n" +
+        "\treg1 int)\n" +
+        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tAND COMPACT STORAGE"));
+    }
+
+    @Test
+    public void testStaticCompactTable() throws Throwable
+    {
+        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
+                                       "pk1 varint PRIMARY KEY," +
+                                       "reg1 int," +
+                                       "reg2 int)" +
+                                       " WITH COMPACT STORAGE");
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
+        "\tpk1 varint PRIMARY KEY,\n" +
+        "\treg1 int,\n" +
+        "\treg2 int)\n" +
+        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tAND COMPACT STORAGE"));
+    }
+
+    @Test
+    public void testStaticCompactWithCounters() throws Throwable
+    {
+        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
+                                       "pk1 varint PRIMARY KEY," +
+                                       "reg1 counter," +
+                                       "reg2 counter)" +
+                                       " WITH COMPACT STORAGE");
+
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
+        "\tpk1 varint PRIMARY KEY,\n" +
+        "\treg1 counter,\n" +
+        "\treg2 counter)\n" +
+        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tAND COMPACT STORAGE"));
+    }
+
+    @Test
+    public void testDenseCompactTableWithoutRegulars() throws Throwable
+    {
+        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
+                                       "pk1 varint," +
+                                       "ck1 int," +
+                                       "PRIMARY KEY (pk1, ck1))" +
+                                       " WITH COMPACT STORAGE");
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
+        "\tpk1 varint,\n" +
+        "\tck1 int,\n" +
+        "\tPRIMARY KEY (pk1, ck1))\n" +
+        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tAND COMPACT STORAGE"));
+    }
+
+    @Test
+    public void testCompactDynamic() throws Throwable
+    {
+        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
+                                       "pk1 varint," +
+                                       "ck1 int," +
+                                       "reg int," +
+                                       "PRIMARY KEY (pk1, ck1))" +
+                                       " WITH COMPACT STORAGE");
+
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
+        "\tpk1 varint,\n" +
+        "\tck1 int,\n" +
+        "\treg int,\n" +
+        "\tPRIMARY KEY (pk1, ck1))\n" +
+        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tAND COMPACT STORAGE"));
+    }
+
+    @Test
+    public void testDynamicComposite() throws Throwable
+    {
+        Map<Byte, AbstractType<?>> aliases = new HashMap<>();
+        aliases.put((byte)'a', BytesType.instance);
+        aliases.put((byte)'b', BytesType.instance);
+        aliases.put((byte)'c', BytesType.instance);
+
+        String DYNAMIC_COMPOSITE = "dynamic_composite";
+        AbstractType<?> dynamicComposite = 
DynamicCompositeType.getInstance(aliases);
+
+        SchemaLoader.createKeyspace(DYNAMIC_COMPOSITE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.denseCFMD(DYNAMIC_COMPOSITE, 
DYNAMIC_COMPOSITE, dynamicComposite));
+
+        ColumnFamilyStore cfs = 
Keyspace.open(DYNAMIC_COMPOSITE).getColumnFamilyStore(DYNAMIC_COMPOSITE);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "CREATE TABLE IF NOT EXISTS " + DYNAMIC_COMPOSITE + "." + 
DYNAMIC_COMPOSITE + " (\n" +
+        "\tkey ascii,\n" +
+        "\tcols 
'org.apache.cassandra.db.marshal.DynamicCompositeType(a=>org.apache.cassandra.db.marshal.BytesType,b=>org.apache.cassandra.db.marshal.BytesType,c=>org.apache.cassandra.db.marshal.BytesType)',\n"
 +
+        "\tval ascii,\n" +
+        "\tPRIMARY KEY (key, cols))\n" +
+        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tAND COMPACT STORAGE"));
+    }
+
+    @Test
+    public void superColumnFamilyTest() throws Throwable
+    {
+        final String KEYSPACE = "thrift_compact_table_with_supercolumns_test";
+        final String TABLE = "test_table_1";
+
+        CFMetaData cfm = CFMetaData.Builder.createSuper(KEYSPACE, TABLE, false)
+                                           .addPartitionKey("pk", 
BytesType.instance)
+                                           .addClusteringColumn("c1", 
AsciiType.instance)
+                                           .addClusteringColumn("c2", 
AsciiType.instance)
+                                           .addRegularColumn("", 
MapType.getInstance(Int32Type.instance, AsciiType.instance, true))
+                                           .build();
+
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    cfm);
+
+        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
+
+        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, 
true).startsWith(
+        "/*\n" +
+        "Warning: Table " + KEYSPACE + "." + TABLE + " omitted because it has 
constructs not compatible with CQL (was created via legacy API).\n\n" +
+        "Approximate structure, for reference:\n" +
+        "(this should not be used to reproduce this schema)\n\n" +
+        "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE + " (\n" +
+        "\tpk blob,\n" +
+        "\tc1 ascii,\n" +
+        "\tc2 ascii,\n" +
+        "\t\"\" map<int, ascii>,\n" +
+        "\tPRIMARY KEY (pk, c1, c2))\n" +
+        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tAND COMPACT STORAGE"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6528fbf2/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java 
b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
index feb2778..3676ca3 100644
--- a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
+++ b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.thrift.ThriftConversion;
+import org.apache.cassandra.utils.*;
 
 import static java.lang.String.format;
 import static junit.framework.Assert.assertEquals;
@@ -284,7 +285,7 @@ public class LegacySchemaMigratorTest
         for (String name : collectionColumnNames)
         {
             ColumnDefinition column = table.getColumnDefinition(bytes(name));
-            table.recordColumnDrop(column);
+            table.recordColumnDrop(column, FBUtilities.timestampMicros());
             table.removeColumnDefinition(column);
         }
 

Reply via email to