This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c7432e98a381c16cb63692e727c35e2909d7200a
Merge: 0e990d7 b654000
Author: Ekaterina Dimitrova <[email protected]>
AuthorDate: Wed Mar 10 20:54:07 2021 -0500

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 build.xml                                          |   2 +-
 .../statements/schema/AlterTableStatement.java     |  93 ++++++-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   4 +
 .../db/compaction/CompactionStrategyManager.java   |   5 +-
 .../org/apache/cassandra/db/lifecycle/Tracker.java |  61 +++--
 .../org/apache/cassandra/gms/ApplicationState.java |  17 ++
 .../cassandra/gms/GossipDigestAckVerbHandler.java  |   2 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    |  42 ++-
 .../org/apache/cassandra/gms/VersionedValue.java   |  12 +-
 .../apache/cassandra/io/sstable/Descriptor.java    |   7 +
 .../cassandra/io/sstable/format/Version.java       |   1 -
 .../io/sstable/format/VersionAndType.java          |  93 +++++++
 .../InitialSSTableAddedNotification.java}          |  27 +-
 src/java/org/apache/cassandra/schema/Schema.java   |   3 +-
 .../cassandra/schema/SchemaTransformation.java     |   4 +-
 .../cassandra/service/SSTablesGlobalTracker.java   | 284 +++++++++++++++++++++
 .../SSTablesVersionsInUseChangeNotification.java   |  50 ++++
 .../apache/cassandra/service/StartupChecks.java    |   5 +-
 .../apache/cassandra/service/StorageService.java   |  19 ++
 .../upgrade/CompactStorage3to4UpgradeTest.java     |   4 +
 .../validation/operations/CompactTableTest.java    |   1 -
 .../apache/cassandra/db/lifecycle/TrackerTest.java |  31 ++-
 .../io/sstable/format/VersionAndTypeTest.java      |  51 ++++
 .../service/SSTablesGlobalTrackerTest.java         | 158 ++++++++++++
 25 files changed, 899 insertions(+), 78 deletions(-)

diff --cc CHANGES.txt
index ffec3cb,1e74bf6..592c9c3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,50 -1,10 +1,51 @@@
 -3.11.11
 +4.0-beta5
 + * Send FAILED_SESSION_MSG on shutdown and on in-progress repairs during 
startup (CASSANDRA-16425)
 + * Reinstate removed ApplicationState padding (CASSANDRA-16484)
 + * Expose data dirs to ColumnFamilyStoreMBean (CASSANDRA-16335)
 + * Add possibility to copy SSTables in SSTableImporter instead of moving them 
(CASSANDRA-16407)
 + * Fix DESCRIBE statement for CUSTOM indices with options (CASSANDRA-16482)
 + * Fix cassandra-stress JMX connection (CASSANDRA-16473)
 + * Avoid processing redundant application states on endpoint changes 
(CASSANDRA-16381)
 + * Prevent parent repair sessions leak (CASSANDRA-16446)
 + * Fix timestamp issue in SinglePartitionSliceCommandTest 
testPartitionDeletionRowDeletionTie (CASSANDRA-16443)
 + * Promote protocol V5 out of beta (CASSANDRA-14973)
 + * Fix incorrect encoding for strings that can be UTF8 (CASSANDRA-16429)
 + * Fix node unable to join when RF > N in multi-DC with added warning 
(CASSANDRA-16296)
 + * Add an option to nodetool tablestats to check sstable location correctness 
(CASSANDRA-16344) 
 + * Unable to ALTER KEYSPACE while decommissioned/assassinated nodes are in 
gossip (CASSANDRA-16422)
 + * Metrics backward compatibility restored after CASSANDRA-15066 
(CASSANDRA-16083)
 + * Reduce new reserved keywords introduced since 3.0 (CASSANDRA-16439)
 + * Improve system tables handling in case of disk failures (CASSANDRA-14793)
 + * Add access and datacenters to unreserved keywords (CASSANDRA-16398)
 + * Fix nodetool ring, status output when DNS resolution or port printing are 
in use (CASSANDRA-16283)
 + * Upgrade Jacoco to 0.8.6 (for Java 11 support) (CASSANDRA-16365)
 + * Move excessive repair debug loggings to trace level (CASSANDRA-16406)
 + * Restore validation of each message's protocol version (CASSANDRA-16374)
 + * Upgrade netty and chronicle-queue dependencies to get Auditing and native 
library loading working on arm64 architectures (CASSANDRA-16384,CASSANDRA-16392)
 + * Release StreamingTombstoneHistogramBuilder spool when switching writers 
(CASSANDRA-14834)
 + * Correct memtable on-heap size calculations to match actual use 
(CASSANDRA-16318)
 + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
 + * Too defensive check when picking sstables for preview repair 
(CASSANDRA-16284)
 + * Ensure pre-negotiation native protocol responses have correct stream id 
(CASSANDRA-16376)
 + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
 + * SSLFactory should initialize SSLContext before setting protocols 
(CASSANDRA-16362)
 + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, 
in the cassandra-all pom (CASSANDRA-16303)
 + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
 +Merged from 3.11:
   * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
 + * Fix digest computation for queries with fetched but non queried columns 
(CASSANDRA-15962)
 + * Reduce amount of allocations during batch statement execution 
(CASSANDRA-16201)
 + * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
  Merged from 3.0:
+  * Refuse DROP COMPACT STORAGE if some 2.x sstables are in use 
(CASSANDRA-15897)
   * Fix ColumnFilter::toString not returning a valid CQL fragment 
(CASSANDRA-16483)
   * Fix ColumnFilter behaviour to prevent digest mismatches during upgrades 
(CASSANDRA-16415)
 + * Update debian packaging for python3 (CASSANDRA-16396)
   * Avoid pushing schema mutations when setting up distributed system 
keyspaces locally (CASSANDRA-16387)
 + * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261)
 + * Improve empty hint file handling during startup (CASSANDRA-16162)
 + * Fix skipping on pre-3.0 created compact storage sstables due to missing 
primary key liveness (CASSANDRA-16226)
 + * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
  Merged from 2.2:
   * Fix centos packaging for arm64, >=4.0 rpm's now require python3 
(CASSANDRA-16477)
   * Make TokenMetadata's ring version increments atomic (CASSANDRA-16286)
diff --cc build.xml
index 36db9e5,5726fed..ecb1ad8
--- a/build.xml
+++ b/build.xml
@@@ -586,14 -412,21 +586,15 @@@
            <dependency groupId="com.fasterxml.jackson.core" 
artifactId="jackson-annotations" version="2.9.10"/>
            <dependency groupId="com.googlecode.json-simple" 
artifactId="json-simple" version="1.1"/>
            <dependency groupId="com.boundary" artifactId="high-scale-lib" 
version="1.0.6"/>
 -          <dependency groupId="com.github.jbellis" artifactId="jamm" 
version="0.3.0"/>
 -
 -          <dependency groupId="com.thinkaurelius.thrift" 
artifactId="thrift-server" version="0.3.7">
 -            <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/>
 -            <exclusion groupId="junit" artifactId="junit"/>
 -          </dependency>
 -          <dependency groupId="org.yaml" artifactId="snakeyaml" 
version="1.11"/>
 -          <dependency groupId="org.apache.thrift" artifactId="libthrift" 
version="0.9.2">
 -               <exclusion groupId="commons-logging" 
artifactId="commons-logging"/>
 -          </dependency>
 +          <dependency groupId="com.github.jbellis" artifactId="jamm" 
version="${jamm.version}"/>
 +          <dependency groupId="org.yaml" artifactId="snakeyaml" 
version="1.26"/>
            <dependency groupId="junit" artifactId="junit" version="4.12" />
            <dependency groupId="org.mockito" artifactId="mockito-core" 
version="3.2.4" />
 +          <dependency groupId="org.quicktheories" artifactId="quicktheories" 
version="0.26" />
 +          <dependency groupId="com.google.code.java-allocation-instrumenter" 
artifactId="java-allocation-instrumenter" 
version="${allocation-instrumenter.version}" />
            <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.7" />
            <dependency groupId="org.reflections" artifactId="reflections" 
version="0.9.12" />
+           <dependency groupId="org.quicktheories" artifactId="quicktheories" 
version="0.25" />
            <dependency groupId="org.apache.rat" artifactId="apache-rat" 
version="0.10">
               <exclusion groupId="commons-lang" artifactId="commons-lang"/>
            </dependency>
@@@ -667,22 -484,11 +668,21 @@@
            <dependency groupId="de.jflex" artifactId="jflex" version="1.6.0" />
            <dependency groupId="com.github.rholder" 
artifactId="snowball-stemmer" version="1.3.0.581.1" />
            <dependency groupId="com.googlecode.concurrent-trees" 
artifactId="concurrent-trees" version="2.4.0" />
 -          <dependency groupId="com.github.ben-manes.caffeine" 
artifactId="caffeine" version="2.2.6" />
 -          <dependency groupId="org.jctools" artifactId="jctools-core" 
version="1.2.1"/>
 -          <dependency groupId="org.ow2.asm" artifactId="asm" version="5.0.4" 
/>
 +          <dependency groupId="com.github.ben-manes.caffeine" 
artifactId="caffeine" version="2.3.5" />
 +          <dependency groupId="org.jctools" artifactId="jctools-core" 
version="3.1.0"/>
 +          <dependency groupId="org.ow2.asm" artifactId="asm" 
version="${asm.version}" />
 +          <dependency groupId="org.ow2.asm" artifactId="asm-tree" 
version="${asm.version}" />
 +          <dependency groupId="org.ow2.asm" artifactId="asm-commons" 
version="${asm.version}" />
 +          <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-cli" 
version="0.14"/>
 +          <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-core" 
version="0.14"/>
 +          <dependency groupId="org.gridkit.jvmtool" 
artifactId="sjk-stacktrace" version="0.14"/>
 +          <dependency groupId="org.gridkit.jvmtool" artifactId="mxdump" 
version="0.14"/>
 +          <dependency groupId="org.gridkit.lab" artifactId="jvm-attach-api" 
version="1.5"/>
 +          <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-json" 
version="0.14"/>
 +          <dependency groupId="com.beust" artifactId="jcommander" 
version="1.30"/>
            <!-- when updating assertj, make sure to also update the 
corresponding junit-bom dependency -->
            <dependency groupId="org.assertj" artifactId="assertj-core" 
version="3.15.0"/>
 +          <dependency groupId="org.awaitility" artifactId="awaitility" 
version="4.0.3" />
- 
          </dependencyManagement>
          <developer id="adelapena" name="Andres de la Peña"/>
          <developer id="alakshman" name="Avinash Lakshman"/>
diff --cc 
src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
index b6cab44,0000000..0820073
mode 100644,000000..100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
@@@ -1,506 -1,0 +1,593 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.statements.schema;
 +
++import java.net.UnknownHostException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
++import java.util.concurrent.TimeUnit;
 +
++import com.google.common.base.Splitter;
 +import com.google.common.collect.ImmutableSet;
 +
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
 +import org.apache.cassandra.audit.AuditLogContext;
 +import org.apache.cassandra.audit.AuditLogEntryType;
 +import org.apache.cassandra.auth.Permission;
 +
 +import org.apache.cassandra.cql3.CQL3Type;
 +import org.apache.cassandra.cql3.CQLStatement;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.QualifiedName;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +
++import org.apache.cassandra.exceptions.InvalidRequestException;
++import org.apache.cassandra.gms.ApplicationState;
++import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.KeyspaceMetadata;
 +import org.apache.cassandra.schema.Keyspaces;
 +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.TableParams;
 +import org.apache.cassandra.schema.ViewMetadata;
 +import org.apache.cassandra.schema.Views;
 +import org.apache.cassandra.service.ClientState;
++import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 +import org.apache.cassandra.transport.Event.SchemaChange;
 +import org.apache.cassandra.transport.Event.SchemaChange.Change;
 +import org.apache.cassandra.transport.Event.SchemaChange.Target;
++import org.apache.cassandra.utils.NoSpamLogger;
 +
++import static java.lang.String.format;
 +import static java.lang.String.join;
 +
 +import static com.google.common.collect.Iterables.isEmpty;
 +import static com.google.common.collect.Iterables.transform;
 +
 +public abstract class AlterTableStatement extends AlterSchemaStatement
 +{
 +    protected final String tableName;
 +
 +    public AlterTableStatement(String keyspaceName, String tableName)
 +    {
 +        super(keyspaceName);
 +        this.tableName = tableName;
 +    }
 +
-     public Keyspaces apply(Keyspaces schema)
++    public Keyspaces apply(Keyspaces schema) throws UnknownHostException
 +    {
 +        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
 +
 +        TableMetadata table = null == keyspace
 +                            ? null
 +                            : keyspace.getTableOrViewNullable(tableName);
 +
 +        if (null == table)
 +            throw ire("Table '%s.%s' doesn't exist", keyspaceName, tableName);
 +
 +        if (table.isView())
 +            throw ire("Cannot use ALTER TABLE on a materialized view; use 
ALTER MATERIALIZED VIEW instead");
 +
 +        return schema.withAddedOrUpdated(apply(keyspace, table));
 +    }
 +
 +    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
 +    {
 +        return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, 
tableName);
 +    }
 +
 +    public void authorize(ClientState client)
 +    {
 +        client.ensureTablePermission(keyspaceName, tableName, 
Permission.ALTER);
 +    }
 +
 +    @Override
 +    public AuditLogContext getAuditLogContext()
 +    {
 +        return new AuditLogContext(AuditLogEntryType.ALTER_TABLE, 
keyspaceName, tableName);
 +    }
 +
 +    public String toString()
 +    {
-         return String.format("%s (%s, %s)", getClass().getSimpleName(), 
keyspaceName, tableName);
++        return format("%s (%s, %s)", getClass().getSimpleName(), 
keyspaceName, tableName);
 +    }
 +
-     abstract KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata 
table);
++    abstract KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata 
table) throws UnknownHostException;
 +
 +    /**
 +     * ALTER TABLE <table> ALTER <column> TYPE <newtype>;
 +     *
 +     * No longer supported.
 +     */
 +    public static class AlterColumn extends AlterTableStatement
 +    {
 +        AlterColumn(String keyspaceName, String tableName)
 +        {
 +            super(keyspaceName, tableName);
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, 
TableMetadata table)
 +        {
 +            throw ire("Altering column types is no longer supported");
 +        }
 +    }
 +
 +    /**
 +     * ALTER TABLE <table> ADD <column> <newtype>
 +     * ALTER TABLE <table> ADD (<column> <newtype>, <column1> <newtype1>, ... 
<columnn> <newtypen>)
 +     */
 +    private static class AddColumns extends AlterTableStatement
 +    {
 +        private static class Column
 +        {
 +            private final ColumnIdentifier name;
 +            private final CQL3Type.Raw type;
 +            private final boolean isStatic;
 +
 +            Column(ColumnIdentifier name, CQL3Type.Raw type, boolean isStatic)
 +            {
 +                this.name = name;
 +                this.type = type;
 +                this.isStatic = isStatic;
 +            }
 +        }
 +
 +        private final Collection<Column> newColumns;
 +
 +        private AddColumns(String keyspaceName, String tableName, 
Collection<Column> newColumns)
 +        {
 +            super(keyspaceName, tableName);
 +            this.newColumns = newColumns;
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, 
TableMetadata table)
 +        {
 +            TableMetadata.Builder tableBuilder = table.unbuild();
 +            Views.Builder viewsBuilder = keyspace.views.unbuild();
 +            newColumns.forEach(c -> addColumn(keyspace, table, c, 
tableBuilder, viewsBuilder));
 +
 +            return 
keyspace.withSwapped(keyspace.tables.withSwapped(tableBuilder.build()))
 +                           .withSwapped(viewsBuilder.build());
 +        }
 +
 +        private void addColumn(KeyspaceMetadata keyspace,
 +                               TableMetadata table,
 +                               Column column,
 +                               TableMetadata.Builder tableBuilder,
 +                               Views.Builder viewsBuilder)
 +        {
 +            ColumnIdentifier name = column.name;
 +            AbstractType<?> type = column.type.prepare(keyspaceName, 
keyspace.types).getType();
 +            boolean isStatic = column.isStatic;
 +
 +            if (null != tableBuilder.getColumn(name))
 +                throw ire("Column with name '%s' already exists", name);
 +
 +            if (isStatic && table.clusteringColumns().isEmpty())
 +                throw ire("Static columns are only useful (and thus allowed) 
if the table has at least one clustering column");
 +
 +            ColumnMetadata droppedColumn = table.getDroppedColumn(name.bytes);
 +            if (null != droppedColumn)
 +            {
 +                // After #8099, not safe to re-add columns of incompatible 
types - until *maybe* deser logic with dropped
 +                // columns is pushed deeper down the line. The latter would 
still be problematic in cases of schema races.
 +                if (!type.isValueCompatibleWith(droppedColumn.type))
 +                {
 +                    throw ire("Cannot re-add previously dropped column '%s' 
of type %s, incompatible with previous type %s",
 +                              name,
 +                              type.asCQL3Type(),
 +                              droppedColumn.type.asCQL3Type());
 +                }
 +
 +                if (droppedColumn.isStatic() != isStatic)
 +                {
 +                    throw ire("Cannot re-add previously dropped column '%s' 
of kind %s, incompatible with previous kind %s",
 +                              name,
 +                              isStatic ? ColumnMetadata.Kind.STATIC : 
ColumnMetadata.Kind.REGULAR,
 +                              droppedColumn.kind);
 +                }
 +
 +                // Cannot re-add a dropped counter column. See #7831.
 +                if (table.isCounter())
 +                    throw ire("Cannot re-add previously dropped counter 
column %s", name);
 +            }
 +
 +            if (isStatic)
 +                tableBuilder.addStaticColumn(name, type);
 +            else
 +                tableBuilder.addRegularColumn(name, type);
 +
 +            if (!isStatic)
 +            {
 +                for (ViewMetadata view : keyspace.views.forTable(table.id))
 +                {
 +                    if (view.includeAllColumns)
 +                    {
 +                        ColumnMetadata viewColumn = 
ColumnMetadata.regularColumn(view.metadata, name.bytes, type);
 +                        
viewsBuilder.put(viewsBuilder.get(view.name()).withAddedRegularColumn(viewColumn));
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * ALTER TABLE <table> DROP <column>
 +     * ALTER TABLE <table> DROP ( <column>, <column1>, ... <columnn>)
 +     */
 +    // TODO: swap UDT refs with expanded tuples on drop
 +    private static class DropColumns extends AlterTableStatement
 +    {
 +        private final Set<ColumnIdentifier> removedColumns;
 +        private final Long timestamp;
 +
 +        private DropColumns(String keyspaceName, String tableName, 
Set<ColumnIdentifier> removedColumns, Long timestamp)
 +        {
 +            super(keyspaceName, tableName);
 +            this.removedColumns = removedColumns;
 +            this.timestamp = timestamp;
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, 
TableMetadata table)
 +        {
 +            TableMetadata.Builder builder = table.unbuild();
 +            removedColumns.forEach(c -> dropColumn(keyspace, table, c, 
builder));
 +            return 
keyspace.withSwapped(keyspace.tables.withSwapped(builder.build()));
 +        }
 +
 +        private void dropColumn(KeyspaceMetadata keyspace, TableMetadata 
table, ColumnIdentifier column, TableMetadata.Builder builder)
 +        {
 +            ColumnMetadata currentColumn = table.getColumn(column);
 +            if (null == currentColumn)
 +                throw ire("Column %s was not found in table '%s'", column, 
table);
 +
 +            if (currentColumn.isPrimaryKeyColumn())
 +                throw ire("Cannot drop PRIMARY KEY column %s", column);
 +
 +            /*
 +             * Cannot allow dropping top-level columns of user defined types 
that aren't frozen because we cannot convert
 +             * the type into an equivalent tuple: we only support frozen 
tuples currently. And as such we cannot persist
 +             * the correct type in system_schema.dropped_columns.
 +             */
 +            if (currentColumn.type.isUDT() && 
currentColumn.type.isMultiCell())
 +                throw ire("Cannot drop non-frozen column %s of user type %s", 
column, currentColumn.type.asCQL3Type());
 +
 +            // TODO: some day try and find a way to not rely on 
Keyspace/IndexManager/Index to find dependent indexes
 +            Set<IndexMetadata> dependentIndexes = 
Keyspace.openAndGetStore(table).indexManager.getDependentIndexes(currentColumn);
 +            if (!dependentIndexes.isEmpty())
 +            {
 +                throw ire("Cannot drop column %s because it has dependent 
secondary indexes (%s)",
 +                          currentColumn,
 +                          join(", ", transform(dependentIndexes, i -> 
i.name)));
 +            }
 +
 +            if (!isEmpty(keyspace.views.forTable(table.id)))
 +                throw ire("Cannot drop column %s on base table %s with 
materialized views", currentColumn, table.name);
 +
 +            builder.removeRegularOrStaticColumn(column);
 +            builder.recordColumnDrop(currentColumn, getTimestamp());
 +        }
 +
 +        /**
 +         * @return timestamp from query, otherwise return current time in 
micros
 +         */
 +        private long getTimestamp()
 +        {
 +            return timestamp == null ? ClientState.getTimestamp() : timestamp;
 +        }
 +    }
 +
 +    /**
 +     * ALTER TABLE <table> RENAME <column> TO <column>;
 +     */
 +    private static class RenameColumns extends AlterTableStatement
 +    {
 +        private final Map<ColumnIdentifier, ColumnIdentifier> renamedColumns;
 +
 +        private RenameColumns(String keyspaceName, String tableName, 
Map<ColumnIdentifier, ColumnIdentifier> renamedColumns)
 +        {
 +            super(keyspaceName, tableName);
 +            this.renamedColumns = renamedColumns;
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, 
TableMetadata table)
 +        {
 +            TableMetadata.Builder tableBuilder = table.unbuild();
 +            Views.Builder viewsBuilder = keyspace.views.unbuild();
 +            renamedColumns.forEach((o, n) -> renameColumn(keyspace, table, o, 
n, tableBuilder, viewsBuilder));
 +
 +            return 
keyspace.withSwapped(keyspace.tables.withSwapped(tableBuilder.build()))
 +                           .withSwapped(viewsBuilder.build());
 +        }
 +
 +        private void renameColumn(KeyspaceMetadata keyspace,
 +                                  TableMetadata table,
 +                                  ColumnIdentifier oldName,
 +                                  ColumnIdentifier newName,
 +                                  TableMetadata.Builder tableBuilder,
 +                                  Views.Builder viewsBuilder)
 +        {
 +            ColumnMetadata column = table.getExistingColumn(oldName);
 +            if (null == column)
 +                throw ire("Column %s was not found in table %s", oldName, 
table);
 +
 +            if (!column.isPrimaryKeyColumn())
 +                throw ire("Cannot rename non PRIMARY KEY column %s", oldName);
 +
 +            if (null != table.getColumn(newName))
 +            {
 +                throw ire("Cannot rename column %s to %s in table '%s'; 
another column with that name already exists",
 +                          oldName,
 +                          newName,
 +                          table);
 +            }
 +
 +            // TODO: some day try and find a way to not rely on 
Keyspace/IndexManager/Index to find dependent indexes
 +            Set<IndexMetadata> dependentIndexes = 
Keyspace.openAndGetStore(table).indexManager.getDependentIndexes(column);
 +            if (!dependentIndexes.isEmpty())
 +            {
 +                throw ire("Can't rename column %s because it has dependent 
secondary indexes (%s)",
 +                          oldName,
 +                          join(", ", transform(dependentIndexes, i -> 
i.name)));
 +            }
 +
 +            for (ViewMetadata view : keyspace.views.forTable(table.id))
 +            {
 +                if (view.includes(oldName))
 +                {
 +                    
viewsBuilder.put(viewsBuilder.get(view.name()).withRenamedPrimaryKeyColumn(oldName,
 newName));
 +                }
 +            }
 +
 +            tableBuilder.renamePrimaryKeyColumn(oldName, newName);
 +        }
 +    }
 +
 +    /**
 +     * ALTER TABLE <table> WITH <property> = <value>
 +     */
 +    private static class AlterOptions extends AlterTableStatement
 +    {
 +        private final TableAttributes attrs;
 +
 +        private AlterOptions(String keyspaceName, String tableName, 
TableAttributes attrs)
 +        {
 +            super(keyspaceName, tableName);
 +            this.attrs = attrs;
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, 
TableMetadata table)
 +        {
 +            attrs.validate();
 +
 +            TableParams params = attrs.asAlteredTableParams(table.params);
 +
 +            if (table.isCounter() && params.defaultTimeToLive > 0)
 +                throw ire("Cannot set default_time_to_live on a table with 
counters");
 +
 +            if (!isEmpty(keyspace.views.forTable(table.id)) && 
params.gcGraceSeconds == 0)
 +            {
 +                throw ire("Cannot alter gc_grace_seconds of the base table of 
a " +
 +                          "materialized view to 0, since this value is used 
to TTL " +
 +                          "undelivered updates. Setting gc_grace_seconds too 
low might " +
 +                          "cause undelivered updates to expire " +
 +                          "before being replayed.");
 +            }
 +
 +            if (keyspace.createReplicationStrategy().hasTransientReplicas()
 +                && params.readRepair != ReadRepairStrategy.NONE)
 +            {
 +                throw ire("read_repair must be set to 'NONE' for transiently 
replicated keyspaces");
 +            }
 +
 +            return 
keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(params)));
 +        }
 +    }
 +
 +
 +    /**
 +     * ALTER TABLE <table> DROP COMPACT STORAGE
 +     */
 +    private static class DropCompactStorage extends AlterTableStatement
 +    {
++        private static final Logger logger = 
LoggerFactory.getLogger(AlterTableStatement.class);
++        private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 5L, TimeUnit.MINUTES);
 +        private DropCompactStorage(String keyspaceName, String tableName)
 +        {
 +            super(keyspaceName, tableName);
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, 
TableMetadata table)
 +        {
 +            if (!table.isCompactTable())
 +                throw AlterTableStatement.ire("Cannot DROP COMPACT STORAGE on 
table without COMPACT STORAGE");
 +
++            validateCanDropCompactStorage();
++
 +            return 
keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(ImmutableSet.of(TableMetadata.Flag.COMPOUND))));
 +        }
++
++        /**
++         * Throws if DROP COMPACT STORAGE cannot be used (yet) because the 
cluster is not sufficiently upgraded. To be able
++         * to use DROP COMPACT STORAGE, we need to ensure that no pre-3.0 
sstables exist in the cluster, as we won't be
++         * able to read them anymore once COMPACT STORAGE is dropped (see 
CASSANDRA-15897). In practice, this method checks
++         * 3 things:
++         *   1) that all nodes are on 3.0+. We need this because 2.x nodes 
don't advertise their sstable versions.
++         *   2) for 3.0+, we use the new (CASSANDRA-15897) sstables versions 
set gossiped by all nodes to ensure all
++         *      sstables have been upgraded cluster-wide.
++         *   3) if the cluster still has some 3.0 nodes that predate 
CASSANDRA-15897, we will not have the sstable versions
++         *      for them. In that case, we also refuse DROP COMPACT (even 
though it may well be safe at this point) and ask
++         *      the user to upgrade all nodes.
++         */
++        private void validateCanDropCompactStorage()
++        {
++            Set<InetAddressAndPort> before4 = new HashSet<>();
++            Set<InetAddressAndPort> preC15897nodes = new HashSet<>();
++            Set<InetAddressAndPort> with2xSStables = new HashSet<>();
++            Splitter onComma = 
Splitter.on(',').omitEmptyStrings().trimResults();
++            for (InetAddressAndPort node : 
StorageService.instance.getTokenMetadata().getAllEndpoints())
++            {
++                if (MessagingService.instance().versions.knows(node) &&
++                    MessagingService.instance().versions.getRaw(node) < 
MessagingService.VERSION_40)
++                {
++                    before4.add(node);
++                    continue;
++                }
++
++                String sstableVersionsString = 
Gossiper.instance.getApplicationState(node, ApplicationState.SSTABLE_VERSIONS);
++                if (sstableVersionsString == null)
++                {
++                    preC15897nodes.add(node);
++                    continue;
++                }
++
++                try
++                {
++                    boolean has2xSStables = 
onComma.splitToList(sstableVersionsString)
++                                                   .stream()
++                                                   .anyMatch(v -> 
v.compareTo("big-ma")<=0);
++                    if (has2xSStables)
++                        with2xSStables.add(node);
++                }
++                catch (IllegalArgumentException e)
++                {
++                    // Means VersionAndType::fromString didn't parse a version 
correctly. Which shouldn't happen, we shouldn't
++                    // have garbage in Gossip. But crashing the request is 
not ideal, so we log the error but ignore the
++                    // node otherwise.
++                    noSpamLogger.error("Unexpected error parsing sstable 
versions from gossip for {} (gossiped value " +
++                                       "is '{}'). This is a bug and should be 
reported. Cannot ensure that {} has no " +
++                                       "non-upgraded 2.x sstables anymore. If 
after this DROP COMPACT STORAGE some old " +
++                                       "sstables cannot be read anymore, 
please use `upgradesstables` with the " +
++                                       "`--force-compact-storage-on` 
option.", node, sstableVersionsString, node);
++                }
++            }
++
++            if (!before4.isEmpty())
++                throw new InvalidRequestException(format("Cannot DROP COMPACT 
STORAGE as some nodes in the cluster (%s) " +
++                                                         "are not on 4.0+ 
yet. Please upgrade those nodes and run " +
++                                                         "`upgradesstables` 
before retrying.", before4));
++            if (!preC15897nodes.isEmpty())
++                throw new InvalidRequestException(format("Cannot guarantee 
that DROP COMPACT STORAGE is safe as some nodes " +
++                                                         "in the cluster (%s) 
do not have https://issues.apache.org/jira/browse/CASSANDRA-15897. " +
++                                                         "Please upgrade 
those nodes and retry.", preC15897nodes));
++            if (!with2xSStables.isEmpty())
++                throw new InvalidRequestException(format("Cannot DROP COMPACT 
STORAGE as some nodes in the cluster (%s) " +
++                                                         "has some 
non-upgraded 2.x sstables. Please run `upgradesstables` " +
++                                                         "on those nodes 
before retrying", with2xSStables));
++        }
 +    }
 +
 +    public static final class Raw extends CQLStatement.Raw
 +    {
 +        private enum Kind
 +        {
 +            ALTER_COLUMN, ADD_COLUMNS, DROP_COLUMNS, RENAME_COLUMNS, 
ALTER_OPTIONS, DROP_COMPACT_STORAGE
 +        }
 +
 +        private final QualifiedName name;
 +
 +        private Kind kind;
 +
 +        // ADD
 +        private final List<AddColumns.Column> addedColumns = new 
ArrayList<>();
 +
 +        // DROP
 +        private final Set<ColumnIdentifier> droppedColumns = new HashSet<>();
 +        private Long timestamp = null; // will use execution timestamp if not 
provided by query
 +
 +        // RENAME
 +        private final Map<ColumnIdentifier, ColumnIdentifier> renamedColumns 
= new HashMap<>();
 +
 +        // OPTIONS
 +        public final TableAttributes attrs = new TableAttributes();
 +
 +        public Raw(QualifiedName name)
 +        {
 +            this.name = name;
 +        }
 +
 +        public AlterTableStatement prepare(ClientState state)
 +        {
 +            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : 
state.getKeyspace();
 +            String tableName = name.getName();
 +
 +            switch (kind)
 +            {
 +                case          ALTER_COLUMN: return new 
AlterColumn(keyspaceName, tableName);
 +                case           ADD_COLUMNS: return new 
AddColumns(keyspaceName, tableName, addedColumns);
 +                case          DROP_COLUMNS: return new 
DropColumns(keyspaceName, tableName, droppedColumns, timestamp);
 +                case        RENAME_COLUMNS: return new 
RenameColumns(keyspaceName, tableName, renamedColumns);
 +                case         ALTER_OPTIONS: return new 
AlterOptions(keyspaceName, tableName, attrs);
 +                case  DROP_COMPACT_STORAGE: return new 
DropCompactStorage(keyspaceName, tableName);
 +            }
 +
 +            throw new AssertionError();
 +        }
 +
 +        public void alter(ColumnIdentifier name, CQL3Type.Raw type)
 +        {
 +            kind = Kind.ALTER_COLUMN;
 +        }
 +
 +        public void add(ColumnIdentifier name, CQL3Type.Raw type, boolean 
isStatic)
 +        {
 +            kind = Kind.ADD_COLUMNS;
 +            addedColumns.add(new AddColumns.Column(name, type, isStatic));
 +        }
 +
 +        public void drop(ColumnIdentifier name)
 +        {
 +            kind = Kind.DROP_COLUMNS;
 +            droppedColumns.add(name);
 +        }
 +
 +        public void dropCompactStorage()
 +        {
 +            kind = Kind.DROP_COMPACT_STORAGE;
 +        }
 +
 +        public void timestamp(long timestamp)
 +        {
 +            this.timestamp = timestamp;
 +        }
 +
 +        public void rename(ColumnIdentifier from, ColumnIdentifier to)
 +        {
 +            kind = Kind.RENAME_COLUMNS;
 +            renamedColumns.put(from, to);
 +        }
 +
 +        public void attrs()
 +        {
 +            this.kind = Kind.ALTER_OPTIONS;
 +        }
 +    }
 +}
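
As an aside on the validateCanDropCompactStorage() check above (illustration
only, not part of this commit): the gossiped sstable version strings are
compared lexicographically, so any value sorting at or before "big-ma" is
flagged as a potentially non-upgraded sstable. A minimal, self-contained
sketch of that comparison; the class name and the sample version strings are
hypothetical:

    import java.util.Arrays;
    import java.util.List;

    final class SSTableVersionCheckSketch
    {
        public static void main(String[] args)
        {
            // Hypothetical gossiped values; only their lexicographic order relative
            // to "big-ma" matters, exactly as in the anyMatch(...) call above.
            List<String> gossiped = Arrays.asList("big-jb", "big-la", "big-nb");
            for (String v : gossiped)
                System.out.println(v + " flagged: " + (v.compareTo("big-ma") <= 0));
            // Prints: big-jb flagged: true, big-la flagged: true, big-nb flagged: false
        }
    }
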
diff --cc 
src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index f3c9a66,7169245..deece30
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -116,21 -88,17 +116,21 @@@ public class CompactionStrategyManager 
      /**
       * Variables guarded by read and write lock above
       */
 -    //TODO check possibility of getting rid of these locks by encapsulating 
these in an immutable atomic object
 -    private final List<AbstractCompactionStrategy> repaired = new 
ArrayList<>();
 -    private final List<AbstractCompactionStrategy> unrepaired = new 
ArrayList<>();
 +    private final PendingRepairHolder transientRepairs;
 +    private final PendingRepairHolder pendingRepairs;
 +    private final CompactionStrategyHolder repaired;
 +    private final CompactionStrategyHolder unrepaired;
 +
 +    private final ImmutableList<AbstractStrategyHolder> holders;
 +
      private volatile CompactionParams params;
      private DiskBoundaries currentBoundaries;
--    private volatile boolean enabled = true;
++    private volatile boolean enabled;
      private volatile boolean isActive = true;
  
 -    /**
 +    /*
          We keep a copy of the schema compaction parameters here to be able to 
decide if we
 -        should update the compaction strategy in {@link 
this#maybeReload(CFMetaData)} due to an ALTER.
 +        should update the compaction strategy in maybeReload() due to an 
ALTER.
  
          If a user changes the local compaction strategy and then later ALTERs 
a compaction parameter,
          we will use the new compaction parameters.
@@@ -744,10 -580,10 +744,11 @@@
          readLock.lock();
          try
          {
 +
              if (notification instanceof SSTableAddedNotification)
              {
-                 handleFlushNotification(((SSTableAddedNotification) 
notification).added);
+                 SSTableAddedNotification flushedNotification = 
(SSTableAddedNotification) notification;
+                 handleFlushNotification(flushedNotification.added);
              }
              else if (notification instanceof SSTableListChangedNotification)
              {
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 9418724,5d5714a..3d72a11
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -107,7 -106,7 +107,7 @@@ public class Tracke
  
      Pair<View, View> apply(Function<View, View> function)
      {
--        return apply(Predicates.<View>alwaysTrue(), function);
++        return apply(Predicates.alwaysTrue(), function);
      }
  
      Throwable apply(Function<View, View> function, Throwable accumulate)
@@@ -236,7 -243,7 +244,7 @@@
  
      public Throwable dropSSTables(Throwable accumulate)
      {
--        return dropSSTables(Predicates.<SSTableReader>alwaysTrue(), 
OperationType.UNKNOWN, accumulate);
++        return dropSSTables(Predicates.alwaysTrue(), OperationType.UNKNOWN, 
accumulate);
      }
  
      /**
@@@ -267,7 -274,7 +275,7 @@@
                      accumulate = updateSizeTracking(removed, emptySet(), 
accumulate);
                      accumulate = release(selfRefs(removed), accumulate);
                      // notifySSTablesChanged -> LeveledManifest.promote 
doesn't like a no-op "promotion"
--                    accumulate = notifySSTablesChanged(removed, 
Collections.<SSTableReader>emptySet(), txnLogs.type(), accumulate);
++                    accumulate = notifySSTablesChanged(removed, 
Collections.emptySet(), txnLogs.type(), accumulate);
                  }
              }
              catch (Throwable t)
@@@ -291,13 -298,13 +299,7 @@@
       */
      public void removeUnreadableSSTables(final File directory)
      {
--        maybeFail(dropSSTables(new Predicate<SSTableReader>()
--        {
--            public boolean apply(SSTableReader reader)
--            {
--                return reader.descriptor.directory.equals(directory);
--            }
--        }, OperationType.UNKNOWN, null));
++        maybeFail(dropSSTables(reader -> 
reader.descriptor.directory.equals(directory), OperationType.UNKNOWN, null));
      }
  
  
@@@ -371,7 -378,7 +373,7 @@@
          notifyDiscarded(memtable);
  
          // TODO: if we're invalidated, should we notifyadded AND removed, or 
just skip both?
-         fail = notifyAdded(sstables, memtable, fail);
 -        fail = notifyAdded(sstables, false, fail);
++        fail = notifyAdded(sstables, false, memtable, fail);
  
          if (!isDummy() && !cfstore.isValid())
              dropSSTables();
@@@ -429,9 -436,14 +431,14 @@@
          return accumulate;
      }
  
-     Throwable notifyAdded(Iterable<SSTableReader> added, Memtable memtable, 
Throwable accumulate)
 -    Throwable notifyAdded(Iterable<SSTableReader> added, boolean 
isInitialSSTables, Throwable accumulate)
++    Throwable notifyAdded(Iterable<SSTableReader> added, boolean 
isInitialSSTables, Memtable memtable, Throwable accumulate)
      {
-         INotification notification = new SSTableAddedNotification(added, 
memtable);
+         INotification notification;
+         if (!isInitialSSTables)
 -            notification = new SSTableAddedNotification(added);
++            notification = new SSTableAddedNotification(added, memtable);
+         else
+             notification = new InitialSSTableAddedNotification(added);
+ 
          for (INotificationConsumer subscriber : subscribers)
          {
              try
@@@ -446,9 -458,9 +453,9 @@@
          return accumulate;
      }
  
-     public void notifyAdded(Iterable<SSTableReader> added)
+     void notifyAdded(Iterable<SSTableReader> added, boolean isInitialSSTables)
      {
-         maybeFail(notifyAdded(added, null, null));
 -        maybeFail(notifyAdded(added, isInitialSSTables, null));
++        maybeFail(notifyAdded(added, isInitialSSTables, null, null));
      }
  
      public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> 
repairStatusesChanged)
@@@ -529,8 -533,8 +536,6 @@@
      @VisibleForTesting
      public void removeUnsafe(Set<SSTableReader> toRemove)
      {
--        Pair<View, View> result = apply(view -> {
--            return updateLiveSet(toRemove, emptySet()).apply(view);
--        });
++        Pair<View, View> result = apply(view -> updateLiveSet(toRemove, 
emptySet()).apply(view));
      }
  }
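
For readers of the Tracker change above (illustration only, not part of this
commit): notifyAdded() now emits an InitialSSTableAddedNotification for
sstables discovered at startup, and an SSTableAddedNotification, which now
also carries the originating memtable, for sstables added at runtime. A
minimal subscriber sketch, assuming the usual INotificationConsumer signature
and that both notification classes live in
org.apache.cassandra.notifications; the subscriber class name is hypothetical:

    import org.apache.cassandra.notifications.INotification;
    import org.apache.cassandra.notifications.INotificationConsumer;
    import org.apache.cassandra.notifications.InitialSSTableAddedNotification;
    import org.apache.cassandra.notifications.SSTableAddedNotification;

    public class SSTableAdditionLogger implements INotificationConsumer
    {
        public void handleNotification(INotification notification, Object sender)
        {
            // Checked first: startup-time additions, where no memtable is involved.
            if (notification instanceof InitialSSTableAddedNotification)
            {
                // e.g. sstables found on disk while opening the ColumnFamilyStore
            }
            // Runtime additions, e.g. from a memtable flush or streaming.
            else if (notification instanceof SSTableAddedNotification)
            {
                // the notification now also exposes the source memtable, when present
            }
        }
    }
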
diff --cc src/java/org/apache/cassandra/gms/ApplicationState.java
index d31f50c,70e6d9c..4e20d62
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@@ -17,10 -17,17 +17,18 @@@
   */
  package org.apache.cassandra.gms;
  
+ /**
+  * The various "states" exchanged through Gossip.
+  *
+  * <p><b>Important Note:</b> Gossip uses the ordinal of this enum in the 
messages it exchanges, so values in that enum
+  * should <i>not</i> be re-ordered or removed. The end of this enum should 
also always include some "padding" so that
+  * if newer versions add new states, old nodes that don't know about those 
new states don't "break" deserializing those
+  * states.
+  */
  public enum ApplicationState
  {
 -    STATUS,
 +    // never remove a state here, ordering matters.
 +    @Deprecated STATUS, //Deprecated and unused in 4.0, stop publishing in 
5.0, reclaim in 6.0
      LOAD,
      SCHEMA,
      DC,
@@@ -35,15 -42,22 +43,24 @@@
      HOST_ID,
      TOKENS,
      RPC_READY,
 -    // We added SSTABLE_VERSIONS in CASSANDRA-15897 in 3.0, and at the time, 
3 more ApplicationState had been added
 -    // to newer versions, so we skipped the first 3 of our original padding 
to ensure SSTABLE_VERSIONS can preserve
 -    // its ordinal accross versions.
 -    X1,
 -    X2,
 -    X3,
 +    // pad to allow adding new states to existing cluster
 +    INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two 
ports
 +    NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS
 +    STATUS_WITH_PORT, //Replacement for STATUS
+     /**
+      * The set of sstable versions on this node. This will usually be only 
the "current" sstable format (the one with
+      * which new sstables are written), but may contain more on newly 
upgraded nodes before `upgradesstables` has been
+      * run.
+      *
+      * <p>The value (a set of sstable {@link 
org.apache.cassandra.io.sstable.format.VersionAndType}) is serialized as
+      * a comma-separated list.
+      **/
+     SSTABLE_VERSIONS,
 -    // pad to allow adding new states to existing cluster
 +    // DO NOT EDIT OR REMOVE PADDING STATES BELOW - only add new states 
above.  See CASSANDRA-16484
 +    X1,
 +    X2,
 +    X3,
 +    X4,
      X5,
      X6,
      X7,
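
To make the SSTABLE_VERSIONS documentation above concrete (illustration only,
not part of this commit): the gossiped value is a comma-separated list of
sstable version strings, such as the "big-ma" value compared against in
AlterTableStatement above. A minimal sketch of producing and splitting such a
value with Guava, mirroring the Splitter configuration used by
DropCompactStorage; the class and method names are hypothetical and are not
the actual VersionAndType API:

    import java.util.List;
    import java.util.Set;

    import com.google.common.base.Joiner;
    import com.google.common.base.Splitter;

    final class SSTableVersionsGossipValue
    {
        // Join version strings (e.g. "big-ma") into the comma-separated form
        // described by the SSTABLE_VERSIONS javadoc above.
        static String serialize(Set<String> versionStrings)
        {
            return Joiner.on(',').join(versionStrings);
        }

        // Split a gossiped value back into individual version strings, using the
        // same Splitter setup as AlterTableStatement.DropCompactStorage above.
        static List<String> deserialize(String gossipedValue)
        {
            return Splitter.on(',').omitEmptyStrings().trimResults().splitToList(gossipedValue);
        }
    }
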
diff --cc src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 1e8604b,d6d9dfb..0242d83
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@@ -81,10 -79,10 +81,10 @@@ public class GossipDigestAckVerbHandle
          }
  
          /* Get the state required to send to this gossipee - construct 
GossipDigestAck2Message */
-         Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new 
HashMap<InetAddressAndPort, EndpointState>();
 -        Map<InetAddress, EndpointState> deltaEpStateMap = new 
HashMap<InetAddress, EndpointState>();
++        Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new 
HashMap<>();
          for (GossipDigest gDigest : gDigestList)
          {
 -            InetAddress addr = gDigest.getEndpoint();
 +            InetAddressAndPort addr = gDigest.getEndpoint();
              EndpointState localEpStatePtr = 
Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
              if (localEpStatePtr != null)
                  deltaEpStateMap.put(addr, localEpStatePtr);
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 7720379,69f7fee..a092c77
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -105,9 -98,7 +105,9 @@@ public class Gossiper implements IFailu
          SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING);
          
SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE);
      }
-     private static List<String> ADMINISTRATIVELY_INACTIVE_STATES = 
Arrays.asList(VersionedValue.HIBERNATE,
-                                                                               
   VersionedValue.REMOVED_TOKEN,
-                                                                               
   VersionedValue.STATUS_LEFT);
 -
++    private static final List<String> ADMINISTRATIVELY_INACTIVE_STATES = 
Arrays.asList(VersionedValue.HIBERNATE,
++                                                                              
         VersionedValue.REMOVED_TOKEN,
++                                                                              
         VersionedValue.STATUS_LEFT);
      private volatile ScheduledFuture<?> scheduledGossipTask;
      private static final ReentrantLock taskLock = new ReentrantLock();
      public final static int intervalInMillis = 1000;
@@@ -124,18 -114,24 +124,18 @@@
  
      // Maximum difference between generation value and local time we are 
willing to accept about a peer
      static final int MAX_GENERATION_DIFFERENCE = 86400 * 365;
--    private long fatClientTimeout;
++    private final long fatClientTimeout;
      private final Random random = new Random();
 -    private final Comparator<InetAddress> inetcomparator = new 
Comparator<InetAddress>()
 -    {
 -        public int compare(InetAddress addr1, InetAddress addr2)
 -        {
 -            return addr1.getHostAddress().compareTo(addr2.getHostAddress());
 -        }
 -    };
  
      /* subscribers for interest in EndpointState change */
--    private final List<IEndpointStateChangeSubscriber> subscribers = new 
CopyOnWriteArrayList<IEndpointStateChangeSubscriber>();
++    private final List<IEndpointStateChangeSubscriber> subscribers = new 
CopyOnWriteArrayList<>();
  
      /* live member set */
 -    private final Set<InetAddress> liveEndpoints = new 
ConcurrentSkipListSet<InetAddress>(inetcomparator);
 +    @VisibleForTesting
 +    final Set<InetAddressAndPort> liveEndpoints = new 
ConcurrentSkipListSet<>();
  
      /* unreachable member set */
 -    private final Map<InetAddress, Long> unreachableEndpoints = new 
ConcurrentHashMap<InetAddress, Long>();
 +    private final Map<InetAddressAndPort, Long> unreachableEndpoints = new 
ConcurrentHashMap<>();
  
      /* initial seeds for joining the cluster */
      @VisibleForTesting
@@@ -255,10 -200,10 +255,10 @@@
                  taskLock.lock();
  
                  /* Update the local heartbeat counter. */
 -                
endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
 +                
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat();
                  if (logger.isTraceEnabled())
 -                    logger.trace("My heartbeat is now {}", 
endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
 -                final List<GossipDigest> gDigests = new 
ArrayList<GossipDigest>();
 +                    logger.trace("My heartbeat is now {}", 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion());
-                 final List<GossipDigest> gDigests = new 
ArrayList<GossipDigest>();
++                final List<GossipDigest> gDigests = new ArrayList<>();
                  Gossiper.instance.makeRandomGossipDigest(gDigests);
  
                  if (gDigests.size() > 0)
@@@ -662,7 -578,7 +662,7 @@@
              for (GossipDigest gDigest : gDigests)
              {
                  sb.append(gDigest);
--                sb.append(" ");
++                sb.append(' ');
              }
              logger.trace("Gossip Digests are : {}", sb);
          }
@@@ -738,10 -652,10 +738,10 @@@
       */
      public void assassinateEndpoint(String address) throws 
UnknownHostException
      {
 -        InetAddress endpoint = InetAddress.getByName(address);
 +        InetAddressAndPort endpoint = InetAddressAndPort.getByName(address);
          runInGossipStageBlocking(() -> {
              EndpointState epState = endpointStateMap.get(endpoint);
--            Collection<Token> tokens = null;
++            Collection<Token> tokens;
              logger.warn("Assassinating {} via gossip", endpoint);
  
              if (epState == null)
@@@ -1076,7 -953,25 +1076,25 @@@
          return 
UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
      }
  
+     /**
+      * The value for the provided application state for the provided endpoint 
as currently known by this Gossip instance.
+      *
+      * @param endpoint the endpoint from which to get the endpoint state.
+      * @param state the endpoint state to get.
+      * @return the value of the application state {@code state} for {@code 
endpoint}, or {@code null} if either
+      * {@code endpoint} is not known by Gossip or has no value for {@code 
state}.
+      */
 -    public String getApplicationState(InetAddress endpoint, ApplicationState 
state)
++    public String getApplicationState(InetAddressAndPort endpoint, 
ApplicationState state)
+     {
+         EndpointState epState = endpointStateMap.get(endpoint);
+         if (epState == null)
+             return null;
+ 
+         VersionedValue value = epState.getApplicationState(state);
+         return value == null ? null : value.value;
+     }
+ 
 -    EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int 
version)
 +    EndpointState getStateForVersionBiggerThan(InetAddressAndPort 
forEndpoint, int version)
      {
          EndpointState epState = endpointStateMap.get(forEndpoint);
          EndpointState reqdEndpointState = null;
@@@ -1653,7 -1456,7 +1671,7 @@@
  
      public void start(int generationNumber)
      {
--        start(generationNumber, new EnumMap<ApplicationState, 
VersionedValue>(ApplicationState.class));
++        start(generationNumber, new EnumMap<>(ApplicationState.class));
      }
  
      /**
@@@ -1717,7 -1519,7 +1735,7 @@@
          seedsInShadowRound.clear();
          endpointShadowStateMap.clear();
          // send a completely empty syn
--        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
++        List<GossipDigest> gDigests = new ArrayList<>();
          GossipDigestSyn digestSynMessage = new 
GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                  DatabaseDescriptor.getPartitionerName(),
                  gDigests);
@@@ -1782,67 -1586,6 +1800,67 @@@
          }
      }
  
 +    /**
 +     * JMX interface for triggering an update of the seed node list.
 +     */
 +    public List<String> reloadSeeds()
 +    {
 +        logger.trace("Triggering reload of seed node list");
 +
 +        // Get the new set in the same way that buildSeedsList does
 +        Set<InetAddressAndPort> tmp = new HashSet<>();
 +        try
 +        {
 +            for (InetAddressAndPort seed : DatabaseDescriptor.getSeeds())
 +            {
 +                if (seed.equals(FBUtilities.getBroadcastAddressAndPort()))
 +                    continue;
 +                tmp.add(seed);
 +            }
 +        }
 +        // If using the SimpleSeedProvider, invalid yaml added to the config 
since startup could
 +        // cause this to throw. Additionally, third party seed providers may 
throw exceptions.
 +        // Handle the error and return a null to indicate that there was a 
problem.
 +        catch (Throwable e)
 +        {
 +            JVMStabilityInspector.inspectThrowable(e);
 +            logger.warn("Error while getting seed node list: {}", 
e.getLocalizedMessage());
 +            return null;
 +        }
 +
 +        if (tmp.size() == 0)
 +        {
 +            logger.trace("New seed node list is empty. Not updating seed 
list.");
 +            return getSeeds();
 +        }
 +
 +        if (tmp.equals(seeds))
 +        {
 +            logger.trace("New seed node list matches the existing list.");
 +            return getSeeds();
 +        }
 +
 +        // Add the new entries
 +        seeds.addAll(tmp);
 +        // Remove the old entries
 +        seeds.retainAll(tmp);
 +        logger.trace("New seed node list after reload {}", seeds);
 +        return getSeeds();
 +    }
 +
 +    /**
 +     * JMX endpoint for getting the list of seeds from the node
 +     */
 +    public List<String> getSeeds()
 +    {
-         List<String> seedList = new ArrayList<String>();
++        List<String> seedList = new ArrayList<>();
 +        for (InetAddressAndPort seed : seeds)
 +        {
 +            seedList.add(seed.toString());
 +        }
 +        return seedList;
 +    }
 +
      // initialize local HB state if needed, i.e., if gossiper has never been 
started before.
      public void maybeInitializeLocalState(int generationNbr)
      {
@@@ -2072,29 -1793,8 +2090,29 @@@
          return state != null ? state.getReleaseVersion() : null;
      }
  
 +    public Map<String, List<String>> getReleaseVersionsWithPort()
 +    {
-         Map<String, List<String>> results = new HashMap<String, 
List<String>>();
++        Map<String, List<String>> results = new HashMap<>();
 +        Iterable<InetAddressAndPort> allHosts = 
Iterables.concat(Gossiper.instance.getLiveMembers(), 
Gossiper.instance.getUnreachableMembers());
 +
 +        for (InetAddressAndPort host : allHosts)
 +        {
 +            CassandraVersion version = getReleaseVersion(host);
 +            String stringVersion = version == null ? "" : version.toString();
 +            List<String> hosts = results.get(stringVersion);
 +            if (hosts == null)
 +            {
 +                hosts = new ArrayList<>();
 +                results.put(stringVersion, hosts);
 +            }
 +            hosts.add(host.getHostAddressAndPort());
 +        }
 +
 +        return results;
 +    }
 +
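
For illustration only (not part of the patch): the get-then-put grouping in getReleaseVersionsWithPort() is the classic multimap idiom; Map.computeIfAbsent expresses the same thing more compactly. A standalone sketch with illustrative host and version strings:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class VersionGroupingSketch
    {
        // Group host addresses by release version string; a null version is reported
        // under the empty string, matching getReleaseVersionsWithPort() above.
        static Map<String, List<String>> group(Map<String, String> versionByHost)
        {
            Map<String, List<String>> results = new HashMap<>();
            for (Map.Entry<String, String> e : versionByHost.entrySet())
            {
                String version = e.getValue() == null ? "" : e.getValue();
                results.computeIfAbsent(version, v -> new ArrayList<>()).add(e.getKey());
            }
            return results;
        }

        public static void main(String[] args)
        {
            Map<String, String> versionByHost = new HashMap<>();
            versionByHost.put("127.0.0.1:7000", "4.0.0");
            versionByHost.put("127.0.0.2:7000", "3.11.10");
            versionByHost.put("127.0.0.3:7000", null); // release version not known yet
            System.out.println(group(versionByHost));
        }
    }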
      @Nullable
 -    public UUID getSchemaVersion(InetAddress ep)
 +    public UUID getSchemaVersion(InetAddressAndPort ep)
      {
          EndpointState state = getEndpointStateForEndpoint(ep);
          return state != null ? state.getSchemaVersion() : null;
diff --cc src/java/org/apache/cassandra/gms/VersionedValue.java
index 7c54559,ff3c48b..880cb98
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@@ -31,9 -33,10 +33,10 @@@ import org.apache.cassandra.db.TypeSize
  import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.io.IVersionedSerializer;
 -import org.apache.cassandra.io.sstable.format.Version;
+ import org.apache.cassandra.io.sstable.format.VersionAndType;
  import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.commons.lang3.StringUtils;
@@@ -109,7 -111,7 +112,7 @@@ public class VersionedValue implements 
      @Override
      public String toString()
      {
--        return "Value(" + value + "," + version + ")";
++        return "Value(" + value + ',' + version + ')';
      }
  
      public byte[] toBytes()
diff --cc src/java/org/apache/cassandra/io/sstable/Descriptor.java
index dc67451,7950e7b..b781ebf
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@@ -77,10 -74,16 +77,16 @@@ public class Descripto
       */
      public Descriptor(File directory, String ksname, String cfname, int 
generation, SSTableFormat.Type formatType)
      {
 -        this(formatType.info.getLatestVersion(), directory, ksname, cfname, 
generation, formatType, 
Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
 +        this(formatType.info.getLatestVersion(), directory, ksname, cfname, 
generation, formatType);
      }
  
+     @VisibleForTesting
+     public Descriptor(String version, File directory, String ksname, String 
cfname, int generation, SSTableFormat.Type formatType)
+     {
 -        this(formatType.info.getVersion(version), directory, ksname, cfname, 
generation, formatType, 
Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
++        this(formatType.info.getVersion(version), directory, ksname, cfname, 
generation, formatType);
+     }
+ 
 -    public Descriptor(Version version, File directory, String ksname, String 
cfname, int generation, SSTableFormat.Type formatType, Component 
digestComponent)
 +    public Descriptor(Version version, File directory, String ksname, String 
cfname, int generation, SSTableFormat.Type formatType)
      {
          assert version != null && directory != null && ksname != null && 
cfname != null && 
formatType.info.getLatestVersion().getClass().equals(version.getClass());
          this.version = version;
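
For illustration only (not part of the patch): the new @VisibleForTesting constructor lets a test pin a Descriptor to an explicit on-disk version string instead of the latest one. A sketch, assuming "ma" is a valid big-format version id; the path, keyspace, table and generation are illustrative values only:

    import java.io.File;

    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.format.SSTableFormat;

    public class DescriptorVersionSketch
    {
        public static void main(String[] args)
        {
            // Pin the descriptor to an explicit version string rather than the latest version.
            Descriptor desc = new Descriptor("ma",
                                             new File("/tmp/data/ks/tbl"),
                                             "ks",
                                             "tbl",
                                             1,
                                             SSTableFormat.Type.BIG);
            System.out.println(desc.version); // the pinned version, not the latest one
        }
    }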
diff --cc src/java/org/apache/cassandra/io/sstable/format/Version.java
index 0e9e303,2b9dcbd..501ae85
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@@ -97,7 -105,7 +97,6 @@@ public abstract class Versio
          return version;
      }
  
--
      @Override
      public boolean equals(Object o)
      {
diff --cc src/java/org/apache/cassandra/schema/Schema.java
index c04c631,0000000..c5c1f36
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@@ -1,890 -1,0 +1,891 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
++import java.net.UnknownHostException;
 +import java.util.*;
 +import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.MapDifference;
 +import com.google.common.collect.Sets;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.functions.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.UserType;
 +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.locator.LocalStrategy;
 +import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
 +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.Pair;
 +import org.cliffc.high_scale_lib.NonBlockingHashMap;
 +
 +import static java.lang.String.format;
 +
 +import static com.google.common.collect.Iterables.size;
 +
 +public final class Schema implements SchemaProvider
 +{
 +    public static final Schema instance = new Schema();
 +
 +    private volatile Keyspaces keyspaces = Keyspaces.none();
 +
 +    // UUID -> mutable metadata ref map. We have to update these in place 
every time a table changes.
 +    private final Map<TableId, TableMetadataRef> metadataRefs = new 
NonBlockingHashMap<>();
 +
 +    // (keyspace name, index name) -> mutable metadata ref map. We have to 
update these in place every time an index changes.
 +    private final Map<Pair<String, String>, TableMetadataRef> 
indexMetadataRefs = new NonBlockingHashMap<>();
 +
 +    // Keyspace objects, one per keyspace. Only one instance should ever 
exist for any given keyspace.
 +    private final Map<String, Keyspace> keyspaceInstances = new 
NonBlockingHashMap<>();
 +
 +    private volatile UUID version;
 +
 +    private final List<SchemaChangeListener> changeListeners = new 
CopyOnWriteArrayList<>();
 +
 +    /**
 +     * Initialize empty schema object and load the hardcoded system tables
 +     */
 +    private Schema()
 +    {
 +        if (DatabaseDescriptor.isDaemonInitialized() || 
DatabaseDescriptor.isToolInitialized())
 +        {
 +            load(SchemaKeyspace.metadata());
 +            load(SystemKeyspace.metadata());
 +        }
 +    }
 +
 +    /**
 +     * Load keyspace definitions, but do not initialize the keyspace instances.
 +     * Schema version may be updated as a result.
 +     */
 +    public void loadFromDisk()
 +    {
 +        loadFromDisk(true);
 +    }
 +
 +    /**
 +     * Load schema definitions from disk.
 +     *
 +     * @param updateVersion true if schema version needs to be updated
 +     */
 +    public void loadFromDisk(boolean updateVersion)
 +    {
 +        SchemaDiagnostics.schemataLoading(this);
 +        SchemaKeyspace.fetchNonSystemKeyspaces().forEach(this::load);
 +        if (updateVersion)
 +            updateVersion();
 +        SchemaDiagnostics.schemataLoaded(this);
 +    }
 +
 +    /**
 +     * Update (or insert) new keyspace definition
 +     *
 +     * @param ksm The metadata about the keyspace
 +     */
 +    synchronized public void load(KeyspaceMetadata ksm)
 +    {
 +        KeyspaceMetadata previous = keyspaces.getNullable(ksm.name);
 +
 +        if (previous == null)
 +            loadNew(ksm);
 +        else
 +            reload(previous, ksm);
 +
 +        keyspaces = keyspaces.withAddedOrUpdated(ksm);
 +    }
 +
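
For illustration only (not part of the patch): a minimal sketch of registering a keyspace definition through load(), assuming KeyspaceMetadata.create, KeyspaceParams.simple and Tables.none behave as elsewhere in the tree; the keyspace name is illustrative:

    import org.apache.cassandra.schema.KeyspaceMetadata;
    import org.apache.cassandra.schema.KeyspaceParams;
    import org.apache.cassandra.schema.Schema;
    import org.apache.cassandra.schema.Tables;

    public class SchemaLoadSketch
    {
        public static void main(String[] args)
        {
            // Build an empty keyspace definition and insert (or update) it in the local schema.
            KeyspaceMetadata ksm = KeyspaceMetadata.create("sketch_ks", KeyspaceParams.simple(1), Tables.none());
            Schema.instance.load(ksm);
            System.out.println(Schema.instance.getKeyspaceMetadata("sketch_ks"));
        }
    }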
 +    private void loadNew(KeyspaceMetadata ksm)
 +    {
 +        ksm.tablesAndViews()
 +           .forEach(metadata -> metadataRefs.put(metadata.id, new 
TableMetadataRef(metadata)));
 +
 +        ksm.tables
 +           .indexTables()
 +           .forEach((name, metadata) -> 
indexMetadataRefs.put(Pair.create(ksm.name, name), new 
TableMetadataRef(metadata)));
 +
 +        SchemaDiagnostics.metadataInitialized(this, ksm);
 +    }
 +
 +    private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated)
 +    {
 +        Keyspace keyspace = getKeyspaceInstance(updated.name);
 +        if (null != keyspace)
 +            keyspace.setMetadata(updated);
 +
 +        Tables.TablesDiff tablesDiff = Tables.diff(previous.tables, 
updated.tables);
 +        Views.ViewsDiff viewsDiff = Views.diff(previous.views, updated.views);
 +
 +        MapDifference<String, TableMetadata> indexesDiff = 
previous.tables.indexesDiff(updated.tables);
 +
 +        // clean up after removed entries
 +        tablesDiff.dropped.forEach(table -> metadataRefs.remove(table.id));
 +        viewsDiff.dropped.forEach(view -> 
metadataRefs.remove(view.metadata.id));
 +
 +        indexesDiff.entriesOnlyOnLeft()
 +                   .values()
 +                   .forEach(indexTable -> 
indexMetadataRefs.remove(Pair.create(indexTable.keyspace, 
indexTable.indexName().get())));
 +
 +        // load up new entries
 +        tablesDiff.created.forEach(table -> metadataRefs.put(table.id, new 
TableMetadataRef(table)));
 +        viewsDiff.created.forEach(view -> metadataRefs.put(view.metadata.id, 
new TableMetadataRef(view.metadata)));
 +
 +        indexesDiff.entriesOnlyOnRight()
 +                   .values()
 +                   .forEach(indexTable -> 
indexMetadataRefs.put(Pair.create(indexTable.keyspace, 
indexTable.indexName().get()), new TableMetadataRef(indexTable)));
 +
 +        // refresh refs to updated ones
 +        tablesDiff.altered.forEach(diff -> 
metadataRefs.get(diff.after.id).set(diff.after));
 +        viewsDiff.altered.forEach(diff -> 
metadataRefs.get(diff.after.metadata.id).set(diff.after.metadata));
 +
 +        indexesDiff.entriesDiffering()
 +                   .values()
 +                   .stream()
 +                   .map(MapDifference.ValueDifference::rightValue)
 +                   .forEach(indexTable -> 
indexMetadataRefs.get(Pair.create(indexTable.keyspace, 
indexTable.indexName().get())).set(indexTable));
 +
 +        SchemaDiagnostics.metadataReloaded(this, previous, updated, 
tablesDiff, viewsDiff, indexesDiff);
 +    }
 +
 +    public void registerListener(SchemaChangeListener listener)
 +    {
 +        changeListeners.add(listener);
 +    }
 +
 +    @SuppressWarnings("unused")
 +    public void unregisterListener(SchemaChangeListener listener)
 +    {
 +        changeListeners.remove(listener);
 +    }
 +
 +    /**
 +     * Get keyspace instance by name
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return Keyspace object or null if keyspace was not found
 +     */
 +    @Override
 +    public Keyspace getKeyspaceInstance(String keyspaceName)
 +    {
 +        return keyspaceInstances.get(keyspaceName);
 +    }
 +
 +    public ColumnFamilyStore getColumnFamilyStoreInstance(TableId id)
 +    {
 +        TableMetadata metadata = getTableMetadata(id);
 +        if (metadata == null)
 +            return null;
 +
 +        Keyspace instance = getKeyspaceInstance(metadata.keyspace);
 +        if (instance == null)
 +            return null;
 +
 +        return instance.hasColumnFamilyStore(metadata.id)
 +             ? instance.getColumnFamilyStore(metadata.id)
 +             : null;
 +    }
 +
 +    /**
 +     * Store given Keyspace instance to the schema
 +     *
 +     * @param keyspace The Keyspace instance to store
 +     *
 +     * @throws IllegalArgumentException if Keyspace is already stored
 +     */
 +    @Override
 +    public void storeKeyspaceInstance(Keyspace keyspace)
 +    {
 +        if (keyspaceInstances.putIfAbsent(keyspace.getName(), keyspace) != 
null)
 +            throw new IllegalArgumentException(String.format("Keyspace %s was 
already initialized.", keyspace.getName()));
 +    }
 +
 +    /**
 +     * Remove keyspace from schema
 +     *
 +     * @param keyspaceName The name of the keyspace to remove
 +     *
 +     * @return removed keyspace instance or null if it wasn't found
 +     */
 +    public Keyspace removeKeyspaceInstance(String keyspaceName)
 +    {
 +        return keyspaceInstances.remove(keyspaceName);
 +    }
 +
 +    public Keyspaces snapshot()
 +    {
 +        return keyspaces;
 +    }
 +
 +    /**
 +     * Remove keyspace definition from system
 +     *
 +     * @param ksm The keyspace definition to remove
 +     */
 +    synchronized void unload(KeyspaceMetadata ksm)
 +    {
 +        keyspaces = keyspaces.without(ksm.name);
 +
 +        ksm.tablesAndViews()
 +           .forEach(t -> metadataRefs.remove(t.id));
 +
 +        ksm.tables
 +           .indexTables()
 +           .keySet()
 +           .forEach(name -> indexMetadataRefs.remove(Pair.create(ksm.name, 
name)));
 +
 +        SchemaDiagnostics.metadataRemoved(this, ksm);
 +    }
 +
 +    public int getNumberOfTables()
 +    {
 +        return keyspaces.stream().mapToInt(k -> 
size(k.tablesAndViews())).sum();
 +    }
 +
 +    public ViewMetadata getView(String keyspaceName, String viewName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
 +        return (ksm == null) ? null : ksm.views.getNullable(viewName);
 +    }
 +
 +    /**
 +     * Get metadata about keyspace by its name
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return The keyspace metadata or null if it wasn't found
 +     */
 +    @Override
 +    public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
 +        return null != keyspace ? keyspace : 
VirtualKeyspaceRegistry.instance.getKeyspaceMetadataNullable(keyspaceName);
 +    }
 +
 +    private Set<String> getNonSystemKeyspacesSet()
 +    {
 +        return Sets.difference(keyspaces.names(), 
SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES);
 +    }
 +
 +    /**
 +     * @return collection of the non-system keyspaces (note that this counts as system only the
 +     * non-replicated keyspaces, so keyspaces like system_traces which are replicated are actually
 +     * returned; see getUserKeyspaces() below if you don't want those)
 +     */
 +    public ImmutableList<String> getNonSystemKeyspaces()
 +    {
 +        return ImmutableList.copyOf(getNonSystemKeyspacesSet());
 +    }
 +
 +    /**
 +     * @return a collection of keyspaces that do not use LocalStrategy for 
replication
 +     */
 +    public List<String> getNonLocalStrategyKeyspaces()
 +    {
 +        return keyspaces.stream()
 +                        .filter(keyspace -> keyspace.params.replication.klass 
!= LocalStrategy.class)
 +                        .map(keyspace -> keyspace.name)
 +                        .collect(Collectors.toList());
 +    }
 +
 +    /**
 +     * @return collection of the user defined keyspaces
 +     */
 +    public List<String> getUserKeyspaces()
 +    {
 +        return 
ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), 
SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES));
 +    }
 +
 +    /**
 +     * Get metadata about the ColumnFamilies contained in a given keyspace
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return metadata about the ColumnFamilies that belong to the given keyspace
 +     */
 +    public Iterable<TableMetadata> getTablesAndViews(String keyspaceName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
 +        assert ksm != null;
 +        return ksm.tablesAndViews();
 +    }
 +
 +    /**
 +     * @return collection of all the keyspace names registered in the system (system and non-system)
 +     */
 +    public Set<String> getKeyspaces()
 +    {
 +        return keyspaces.names();
 +    }
 +
 +    /* TableMetadata/Ref query/control methods */
 +
 +    /**
 +     * Given a keyspace name and table/view name, get the table metadata
 +     * reference. If the keyspace name or table/view name is not present
 +     * this method returns null.
 +     *
 +     * @return TableMetadataRef object or null if it wasn't found
 +     */
 +    @Override
 +    public TableMetadataRef getTableMetadataRef(String keyspace, String table)
 +    {
 +        TableMetadata tm = getTableMetadata(keyspace, table);
 +        return tm == null
 +             ? null
 +             : metadataRefs.get(tm.id);
 +    }
 +
 +    public TableMetadataRef getIndexTableMetadataRef(String keyspace, String 
index)
 +    {
 +        return indexMetadataRefs.get(Pair.create(keyspace, index));
 +    }
 +
 +    Map<Pair<String, String>, TableMetadataRef> getIndexTableMetadataRefs()
 +    {
 +        return indexMetadataRefs;
 +    }
 +
 +    /**
 +     * Get Table metadata by its identifier
 +     *
 +     * @param id table or view identifier
 +     *
 +     * @return metadata about Table or View
 +     */
 +    @Override
 +    public TableMetadataRef getTableMetadataRef(TableId id)
 +    {
 +        return metadataRefs.get(id);
 +    }
 +
 +    @Override
 +    public TableMetadataRef getTableMetadataRef(Descriptor descriptor)
 +    {
 +        return getTableMetadataRef(descriptor.ksname, descriptor.cfname);
 +    }
 +
 +    Map<TableId, TableMetadataRef> getTableMetadataRefs()
 +    {
 +        return metadataRefs;
 +    }
 +
 +    /**
 +     * Given a keyspace name and table name, get the table
 +     * metadata. If the keyspace name or table name is not valid,
 +     * this function returns null.
 +     *
 +     * @param keyspace The keyspace name
 +     * @param table The table name
 +     *
 +     * @return TableMetadata object or null if it wasn't found
 +     */
 +    public TableMetadata getTableMetadata(String keyspace, String table)
 +    {
 +        assert keyspace != null;
 +        assert table != null;
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(keyspace);
 +        return ksm == null
 +             ? null
 +             : ksm.getTableOrViewNullable(table);
 +    }
 +
 +    @Override
 +    public TableMetadata getTableMetadata(TableId id)
 +    {
 +        TableMetadata table = keyspaces.getTableOrViewNullable(id);
 +        return null != table ? table : 
VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id);
 +    }
 +
 +    public TableMetadata validateTable(String keyspaceName, String tableName)
 +    {
 +        if (tableName.isEmpty())
 +            throw new InvalidRequestException("non-empty table is required");
 +
 +        KeyspaceMetadata keyspace = getKeyspaceMetadata(keyspaceName);
 +        if (keyspace == null)
 +            throw new KeyspaceNotDefinedException(format("keyspace %s does 
not exist", keyspaceName));
 +
 +        TableMetadata metadata = keyspace.getTableOrViewNullable(tableName);
 +        if (metadata == null)
 +            throw new InvalidRequestException(format("table %s does not 
exist", tableName));
 +
 +        return metadata;
 +    }
 +
 +    public TableMetadata getTableMetadata(Descriptor descriptor)
 +    {
 +        return getTableMetadata(descriptor.ksname, descriptor.cfname);
 +    }
 +
 +    /* Function helpers */
 +
 +    /**
 +     * Get all function overloads with the specified name
 +     *
 +     * @param name fully qualified function name
 +     * @return an empty list if the keyspace or the function name is not found;
 +     *         a non-empty collection of {@link Function} otherwise
 +     */
 +    public Collection<Function> getFunctions(FunctionName name)
 +    {
 +        if (!name.hasKeyspace())
 +            throw new IllegalArgumentException(String.format("Function name 
must be fully qualified: got %s", name));
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
 +        return ksm == null
 +             ? Collections.emptyList()
 +             : ksm.functions.get(name);
 +    }
 +
 +    /**
 +     * Find the function with the specified name
 +     *
 +     * @param name fully qualified function name
 +     * @param argTypes function argument types
 +     * @return an empty {@link Optional} if the keyspace or the function name is not found;
 +     *         a non-empty optional of {@link Function} otherwise
 +     */
 +    public Optional<Function> findFunction(FunctionName name, 
List<AbstractType<?>> argTypes)
 +    {
 +        if (!name.hasKeyspace())
 +            throw new IllegalArgumentException(String.format("Function name must be fully qualified: got %s", name));
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
 +        return ksm == null
 +             ? Optional.empty()
 +             : ksm.functions.find(name, argTypes);
 +    }
 +
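
For illustration only (not part of the patch): a hedged usage sketch of the function lookups above; the keyspace and function names are illustrative, and an unqualified FunctionName would make both methods throw:

    import java.util.Collections;
    import java.util.Optional;

    import org.apache.cassandra.cql3.functions.Function;
    import org.apache.cassandra.cql3.functions.FunctionName;
    import org.apache.cassandra.schema.Schema;

    public class FunctionLookupSketch
    {
        public static void main(String[] args)
        {
            FunctionName name = new FunctionName("sketch_ks", "my_udf");

            // All overloads registered under the name (empty if the keyspace or name is unknown).
            System.out.println(Schema.instance.getFunctions(name));

            // The specific no-arg overload, if any.
            Optional<Function> fn = Schema.instance.findFunction(name, Collections.emptyList());
            System.out.println(fn.isPresent() ? "found " + name : "no such function");
        }
    }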
 +    /* Version control */
 +
 +    /**
 +     * @return current schema version
 +     */
 +    public UUID getVersion()
 +    {
 +        return version;
 +    }
 +
 +    /**
 +     * Checks whether the given schema version is the same as the current 
local schema.
 +     */
 +    public boolean isSameVersion(UUID schemaVersion)
 +    {
 +        return schemaVersion != null && schemaVersion.equals(version);
 +    }
 +
 +    /**
 +     * Checks whether the current schema is empty.
 +     */
 +    public boolean isEmpty()
 +    {
 +        return SchemaConstants.emptyVersion.equals(version);
 +    }
 +
 +    /**
 +     * Read schema from the system keyspace and calculate the MD5 digest of every row; the resulting
 +     * digest is converted into a UUID which acts as a content-based version of the schema.
 +     */
 +    public void updateVersion()
 +    {
 +        version = SchemaKeyspace.calculateSchemaDigest();
 +        SystemKeyspace.updateSchemaVersion(version);
 +        SchemaDiagnostics.versionUpdated(this);
 +    }
 +
 +    /*
 +     * Like updateVersion, but also announces via gossip
 +     */
 +    public void updateVersionAndAnnounce()
 +    {
 +        updateVersion();
 +        passiveAnnounceVersion();
 +    }
 +
 +    /**
 +     * Announce my version passively over gossip.
 +     * Used to notify nodes as they arrive in the cluster.
 +     */
 +    private void passiveAnnounceVersion()
 +    {
 +        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.schema(version));
 +        SchemaDiagnostics.versionAnnounced(this);
 +    }
 +
 +    /**
 +     * Clear all KS/CF metadata and reset version.
 +     */
 +    public synchronized void clear()
 +    {
 +        getNonSystemKeyspaces().forEach(k -> unload(getKeyspaceMetadata(k)));
 +        updateVersionAndAnnounce();
 +        SchemaDiagnostics.schemataCleared(this);
 +    }
 +
 +    /*
 +     * Reload schema from local disk. Useful if a user made changes to the schema tables by hand,
 +     * or suspects that the in-memory representation got out of sync with what's on disk.
 +     */
 +    public synchronized void reloadSchemaAndAnnounceVersion()
 +    {
 +        Keyspaces before = keyspaces.filter(k -> 
!SchemaConstants.isLocalSystemKeyspace(k.name));
 +        Keyspaces after = SchemaKeyspace.fetchNonSystemKeyspaces();
 +        merge(Keyspaces.diff(before, after));
 +        updateVersionAndAnnounce();
 +    }
 +
 +    /**
 +     * Merge remote schema, in the form of mutations, with the local schema and mutate ks/cf metadata objects
 +     * (which also involves fs operations on add/drop ks/cf)
 +     *
 +     * @param mutations the schema changes to apply
 +     *
 +     * @throws ConfigurationException If one of the metadata attributes has an invalid value
 +     */
 +    public synchronized void mergeAndAnnounceVersion(Collection<Mutation> 
mutations)
 +    {
 +        merge(mutations);
 +        updateVersionAndAnnounce();
 +    }
 +
-     public synchronized TransformationResult transform(SchemaTransformation 
transformation, boolean locally, long now)
++    public synchronized TransformationResult transform(SchemaTransformation 
transformation, boolean locally, long now) throws UnknownHostException
 +    {
 +        KeyspacesDiff diff;
 +        try
 +        {
 +            Keyspaces before = keyspaces;
 +            Keyspaces after = transformation.apply(before);
 +            diff = Keyspaces.diff(before, after);
 +        }
 +        catch (RuntimeException e)
 +        {
 +            return new TransformationResult(e);
 +        }
 +
 +        if (diff.isEmpty())
 +            return new TransformationResult(diff, Collections.emptyList());
 +
 +        Collection<Mutation> mutations = 
SchemaKeyspace.convertSchemaDiffToMutations(diff, now);
 +        SchemaKeyspace.applyChanges(mutations);
 +
 +        merge(diff);
 +        updateVersion();
 +        if (!locally)
 +            passiveAnnounceVersion();
 +
 +        return new TransformationResult(diff, mutations);
 +    }
 +
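
For illustration only (not part of the patch): transform() drives a SchemaTransformation through diff, apply and announce, and reports the outcome in a TransformationResult. A sketch with a no-op transformation; the 'locally' flag and the timestamp are illustrative choices:

    import org.apache.cassandra.schema.Schema;
    import org.apache.cassandra.schema.SchemaTransformation;
    import org.apache.cassandra.utils.FBUtilities;

    public class TransformSketch
    {
        public static void main(String[] args) throws Exception
        {
            // Returns the schema unchanged; a real transformation would return
            // an altered Keyspaces snapshot.
            SchemaTransformation noop = schema -> schema;

            Schema.TransformationResult result =
                Schema.instance.transform(noop, true /* locally */, FBUtilities.timestampMicros());

            System.out.println(result.success ? "diff: " + result.diff : "failed: " + result.exception);
        }
    }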
 +    public static final class TransformationResult
 +    {
 +        public final boolean success;
 +        public final RuntimeException exception;
 +        public final KeyspacesDiff diff;
 +        public final Collection<Mutation> mutations;
 +
 +        private TransformationResult(boolean success, RuntimeException 
exception, KeyspacesDiff diff, Collection<Mutation> mutations)
 +        {
 +            this.success = success;
 +            this.exception = exception;
 +            this.diff = diff;
 +            this.mutations = mutations;
 +        }
 +
 +        TransformationResult(RuntimeException exception)
 +        {
 +            this(false, exception, null, null);
 +        }
 +
 +        TransformationResult(KeyspacesDiff diff, Collection<Mutation> 
mutations)
 +        {
 +            this(true, null, diff, mutations);
 +        }
 +    }
 +
 +    synchronized void merge(Collection<Mutation> mutations)
 +    {
 +        // only compare the keyspaces affected by this set of schema mutations
 +        Set<String> affectedKeyspaces = 
SchemaKeyspace.affectedKeyspaces(mutations);
 +
 +        // fetch the current state of schema for the affected keyspaces only
 +        Keyspaces before = keyspaces.filter(k -> 
affectedKeyspaces.contains(k.name));
 +
 +        // apply the schema mutations
 +        SchemaKeyspace.applyChanges(mutations);
 +
 +        // fetch the new versions of the altered keyspaces
 +        Keyspaces after = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);
 +
 +        merge(Keyspaces.diff(before, after));
 +    }
 +
 +    private void merge(KeyspacesDiff diff)
 +    {
 +        diff.dropped.forEach(this::dropKeyspace);
 +        diff.created.forEach(this::createKeyspace);
 +        diff.altered.forEach(this::alterKeyspace);
 +    }
 +
 +    private void alterKeyspace(KeyspaceDiff delta)
 +    {
 +        SchemaDiagnostics.keyspaceAltering(this, delta);
 +
 +        // drop tables and views
 +        delta.views.dropped.forEach(this::dropView);
 +        delta.tables.dropped.forEach(this::dropTable);
 +
 +        load(delta.after);
 +
 +        // add tables and views
 +        delta.tables.created.forEach(this::createTable);
 +        delta.views.created.forEach(this::createView);
 +
 +        // update tables and views
 +        delta.tables.altered.forEach(diff -> alterTable(diff.after));
 +        delta.views.altered.forEach(diff -> alterView(diff.after));
 +
 +        // deal with all added and altered views
 +        Keyspace.open(delta.after.name).viewManager.reload(true);
 +
 +        // notify on everything dropped
 +        delta.udas.dropped.forEach(uda -> notifyDropAggregate((UDAggregate) 
uda));
 +        delta.udfs.dropped.forEach(udf -> notifyDropFunction((UDFunction) 
udf));
 +        delta.views.dropped.forEach(this::notifyDropView);
 +        delta.tables.dropped.forEach(this::notifyDropTable);
 +        delta.types.dropped.forEach(this::notifyDropType);
 +
 +        // notify on everything created
 +        delta.types.created.forEach(this::notifyCreateType);
 +        delta.tables.created.forEach(this::notifyCreateTable);
 +        delta.views.created.forEach(this::notifyCreateView);
 +        delta.udfs.created.forEach(udf -> notifyCreateFunction((UDFunction) 
udf));
 +        delta.udas.created.forEach(uda -> notifyCreateAggregate((UDAggregate) 
uda));
 +
 +        // notify on everything altered
 +        if (!delta.before.params.equals(delta.after.params))
 +            notifyAlterKeyspace(delta.before, delta.after);
 +        delta.types.altered.forEach(diff -> notifyAlterType(diff.before, 
diff.after));
 +        delta.tables.altered.forEach(diff -> notifyAlterTable(diff.before, 
diff.after));
 +        delta.views.altered.forEach(diff -> notifyAlterView(diff.before, 
diff.after));
 +        delta.udfs.altered.forEach(diff -> notifyAlterFunction(diff.before, 
diff.after));
 +        delta.udas.altered.forEach(diff -> notifyAlterAggregate(diff.before, 
diff.after));
 +        SchemaDiagnostics.keyspaceAltered(this, delta);
 +    }
 +
 +    private void createKeyspace(KeyspaceMetadata keyspace)
 +    {
 +        SchemaDiagnostics.keyspaceCreating(this, keyspace);
 +        load(keyspace);
 +        Keyspace.open(keyspace.name);
 +
 +        notifyCreateKeyspace(keyspace);
 +        keyspace.types.forEach(this::notifyCreateType);
 +        keyspace.tables.forEach(this::notifyCreateTable);
 +        keyspace.views.forEach(this::notifyCreateView);
 +        keyspace.functions.udfs().forEach(this::notifyCreateFunction);
 +        keyspace.functions.udas().forEach(this::notifyCreateAggregate);
 +        SchemaDiagnostics.keyspaceCreated(this, keyspace);
 +    }
 +
 +    private void dropKeyspace(KeyspaceMetadata keyspace)
 +    {
 +        SchemaDiagnostics.keyspaceDroping(this, keyspace);
 +        keyspace.views.forEach(this::dropView);
 +        keyspace.tables.forEach(this::dropTable);
 +
 +        // remove the keyspace from the static instances.
 +        Keyspace.clear(keyspace.name);
 +        unload(keyspace);
 +        Keyspace.writeOrder.awaitNewBarrier();
 +
 +        keyspace.functions.udas().forEach(this::notifyDropAggregate);
 +        keyspace.functions.udfs().forEach(this::notifyDropFunction);
 +        keyspace.views.forEach(this::notifyDropView);
 +        keyspace.tables.forEach(this::notifyDropTable);
 +        keyspace.types.forEach(this::notifyDropType);
 +        notifyDropKeyspace(keyspace);
 +        SchemaDiagnostics.keyspaceDroped(this, keyspace);
 +    }
 +
 +    private void dropView(ViewMetadata metadata)
 +    {
 +        
Keyspace.open(metadata.keyspace()).viewManager.dropView(metadata.name());
 +        dropTable(metadata.metadata);
 +    }
 +
 +    private void dropTable(TableMetadata metadata)
 +    {
 +        SchemaDiagnostics.tableDropping(this, metadata);
 +        ColumnFamilyStore cfs = 
Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name);
 +        assert cfs != null;
 +        // make sure all the indexes are dropped, or else.
 +        cfs.indexManager.markAllIndexesRemoved();
 +        
CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata),
 (sstable) -> true, true);
 +        if (DatabaseDescriptor.isAutoSnapshot())
 +            
cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, 
ColumnFamilyStore.SNAPSHOT_DROP_PREFIX));
 +        
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
 +        Keyspace.open(metadata.keyspace).dropCf(metadata.id);
 +        SchemaDiagnostics.tableDropped(this, metadata);
 +    }
 +
 +    private void createTable(TableMetadata table)
 +    {
 +        SchemaDiagnostics.tableCreating(this, table);
 +        Keyspace.open(table.keyspace).initCf(metadataRefs.get(table.id), 
true);
 +        SchemaDiagnostics.tableCreated(this, table);
 +    }
 +
 +    private void createView(ViewMetadata view)
 +    {
 +        
Keyspace.open(view.keyspace()).initCf(metadataRefs.get(view.metadata.id), true);
 +    }
 +
 +    private void alterTable(TableMetadata updated)
 +    {
 +        SchemaDiagnostics.tableAltering(this, updated);
 +        
Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload();
 +        SchemaDiagnostics.tableAltered(this, updated);
 +    }
 +
 +    private void alterView(ViewMetadata updated)
 +    {
 +        
Keyspace.open(updated.keyspace()).getColumnFamilyStore(updated.name()).reload();
 +    }
 +
 +    private void notifyCreateKeyspace(KeyspaceMetadata ksm)
 +    {
 +        changeListeners.forEach(l -> l.onCreateKeyspace(ksm.name));
 +    }
 +
 +    private void notifyCreateTable(TableMetadata metadata)
 +    {
 +        changeListeners.forEach(l -> l.onCreateTable(metadata.keyspace, 
metadata.name));
 +    }
 +
 +    private void notifyCreateView(ViewMetadata view)
 +    {
 +        changeListeners.forEach(l -> l.onCreateView(view.keyspace(), 
view.name()));
 +    }
 +
 +    private void notifyCreateType(UserType ut)
 +    {
 +        changeListeners.forEach(l -> l.onCreateType(ut.keyspace, 
ut.getNameAsString()));
 +    }
 +
 +    private void notifyCreateFunction(UDFunction udf)
 +    {
 +        changeListeners.forEach(l -> l.onCreateFunction(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
 +    }
 +
 +    private void notifyCreateAggregate(UDAggregate udf)
 +    {
 +        changeListeners.forEach(l -> l.onCreateAggregate(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
 +    }
 +
 +    private void notifyAlterKeyspace(KeyspaceMetadata before, 
KeyspaceMetadata after)
 +    {
 +        changeListeners.forEach(l -> l.onAlterKeyspace(after.name));
 +    }
 +
 +    private void notifyAlterTable(TableMetadata before, TableMetadata after)
 +    {
 +        boolean changeAffectedPreparedStatements = 
before.changeAffectsPreparedStatements(after);
 +        changeListeners.forEach(l -> l.onAlterTable(after.keyspace, 
after.name, changeAffectedPreparedStatements));
 +    }
 +
 +    private void notifyAlterView(ViewMetadata before, ViewMetadata after)
 +    {
 +        boolean changeAffectedPreparedStatements = 
before.metadata.changeAffectsPreparedStatements(after.metadata);
 +        changeListeners.forEach(l ->l.onAlterView(after.keyspace(), 
after.name(), changeAffectedPreparedStatements));
 +    }
 +
 +    private void notifyAlterType(UserType before, UserType after)
 +    {
 +        changeListeners.forEach(l -> l.onAlterType(after.keyspace, 
after.getNameAsString()));
 +    }
 +
 +    private void notifyAlterFunction(UDFunction before, UDFunction after)
 +    {
 +        changeListeners.forEach(l -> l.onAlterFunction(after.name().keyspace, 
after.name().name, after.argTypes()));
 +    }
 +
 +    private void notifyAlterAggregate(UDAggregate before, UDAggregate after)
 +    {
 +        changeListeners.forEach(l -> 
l.onAlterAggregate(after.name().keyspace, after.name().name, after.argTypes()));
 +    }
 +
 +    private void notifyDropKeyspace(KeyspaceMetadata ksm)
 +    {
 +        changeListeners.forEach(l -> l.onDropKeyspace(ksm.name));
 +    }
 +
 +    private void notifyDropTable(TableMetadata metadata)
 +    {
 +        changeListeners.forEach(l -> l.onDropTable(metadata.keyspace, 
metadata.name));
 +    }
 +
 +    private void notifyDropView(ViewMetadata view)
 +    {
 +        changeListeners.forEach(l -> l.onDropView(view.keyspace(), 
view.name()));
 +    }
 +
 +    private void notifyDropType(UserType ut)
 +    {
 +        changeListeners.forEach(l -> l.onDropType(ut.keyspace, 
ut.getNameAsString()));
 +    }
 +
 +    private void notifyDropFunction(UDFunction udf)
 +    {
 +        changeListeners.forEach(l -> l.onDropFunction(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
 +    }
 +
 +    private void notifyDropAggregate(UDAggregate udf)
 +    {
 +        changeListeners.forEach(l -> l.onDropAggregate(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
 +    }
 +
 +
 +    /**
 +     * Converts the given schema version to a string. Returns {@code unknown} if {@code version} is {@code null},
 +     * or {@code "(empty)"} if {@code version} refers to an {@link SchemaConstants#emptyVersion empty} schema.
 +     */
 +    public static String schemaVersionToString(UUID version)
 +    {
 +        return version == null
 +               ? "unknown"
 +               : SchemaConstants.emptyVersion.equals(version)
 +                 ? "(empty)"
 +                 : version.toString();
 +    }
 +}
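
For illustration only (not part of the patch): a tiny sketch of the null/empty handling in schemaVersionToString(), with a random UUID standing in for a real schema version:

    import java.util.UUID;

    import org.apache.cassandra.schema.Schema;
    import org.apache.cassandra.schema.SchemaConstants;

    public class VersionStringSketch
    {
        public static void main(String[] args)
        {
            System.out.println(Schema.schemaVersionToString(null));                         // "unknown"
            System.out.println(Schema.schemaVersionToString(SchemaConstants.emptyVersion)); // "(empty)"
            System.out.println(Schema.schemaVersionToString(UUID.randomUUID()));            // the UUID itself
        }
    }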
diff --cc src/java/org/apache/cassandra/schema/SchemaTransformation.java
index c19ac7c,0000000..e2290a3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/SchemaTransformation.java
+++ b/src/java/org/apache/cassandra/schema/SchemaTransformation.java
@@@ -1,31 -1,0 +1,33 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
++import java.net.UnknownHostException;
++
 +public interface SchemaTransformation
 +{
 +    /**
 +     * Apply a statement transformation to a schema snapshot.
 +     *
 +     * Implementing methods should be side-effect free.
 +     *
 +     * @param schema Keyspaces to base the transformation on
 +     * @return Keyspaces transformed by the statement
 +     */
-     Keyspaces apply(Keyspaces schema);
++    Keyspaces apply(Keyspaces schema) throws UnknownHostException;
 +}
diff --cc src/java/org/apache/cassandra/service/StartupChecks.java
index 85b5836,cb10ab4..dadb0c5
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@@ -35,16 -35,11 +35,14 @@@ import com.google.common.collect.Iterab
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.config.CFMetaData;
 +import net.jpountz.lz4.LZ4Factory;
- import org.apache.cassandra.config.CassandraRelevantProperties;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
- import org.apache.cassandra.schema.SchemaKeyspace;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.config.Config;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.config.SchemaConstants;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaConstants;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Directories;
  import org.apache.cassandra.db.SystemKeyspace;
@@@ -55,12 -52,8 +53,14 @@@ import org.apache.cassandra.io.util.Fil
  import org.apache.cassandra.utils.NativeLibrary;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.JavaUtils;
++import org.apache.cassandra.utils.NoSpamLogger;
  import org.apache.cassandra.utils.SigarLibrary;
  
++import static java.lang.String.format;
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.COM_SUN_MANAGEMENT_JMXREMOTE_PORT;
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VERSION;
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VM_NAME;
 +
  /**
   * Verifies that the system and environment is in a fit state to be started.
   * Used in CassandraDaemon#setup() to check various settings and invariants.
@@@ -83,7 -76,7 +83,6 @@@
  public class StartupChecks
  {
      private static final Logger logger = 
LoggerFactory.getLogger(StartupChecks.class);
--
      // List of checks to run before starting up. If any test reports failure, 
startup will be halted.
      private final List<StartupCheck> preFlightChecks = new ArrayList<>();
  
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 8c5437c,65596aa..e0ac1b6
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -81,8 -71,11 +81,10 @@@ import org.apache.cassandra.dht.Range
  import org.apache.cassandra.dht.Token.TokenFactory;
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.gms.*;
 -import org.apache.cassandra.hints.HintVerbHandler;
  import org.apache.cassandra.hints.HintsService;
  import org.apache.cassandra.io.sstable.SSTableLoader;
+ import org.apache.cassandra.io.sstable.format.SSTableFormat;
+ import org.apache.cassandra.io.sstable.format.VersionAndType;
  import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.locator.*;
  import org.apache.cassandra.metrics.StorageMetrics;
@@@ -323,6 -293,44 +327,8 @@@ public class StorageService extends Not
          jmxObjectName = "org.apache.cassandra.db:type=StorageService";
          MBeanWrapper.instance.registerMBean(this, jmxObjectName);
          MBeanWrapper.instance.registerMBean(StreamManager.instance, 
StreamManager.OBJECT_NAME);
+ 
 -        legacyProgressSupport = new LegacyJMXProgressSupport(this, 
jmxObjectName);
 -
 -        /* register the verb handlers */
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION,
 new MutationVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR,
 new ReadRepairVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, 
new ReadCommandVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE,
 new RangeSliceVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE,
 new RangeSliceVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION,
 new CounterMutationVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE,
 new TruncateVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE,
 new PrepareVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PROPOSE,
 new ProposeVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_COMMIT,
 new CommitVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.HINT, 
new HintVerbHandler());
 -
 -        // see BootStrapper for a summary of how the bootstrap verbs interact
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPLICATION_FINISHED,
 new ReplicationFinishedVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REQUEST_RESPONSE,
 new ResponseVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.INTERNAL_RESPONSE,
 new ResponseVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPAIR_MESSAGE,
 new RepairMessageVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_SHUTDOWN,
 new GossipShutdownVerbHandler());
 -
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN,
 new GossipDigestSynVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK,
 new GossipDigestAckVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
 new GossipDigestAck2VerbHandler());
 -
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.DEFINITIONS_UPDATE,
 new DefinitionsUpdateVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SCHEMA_CHECK,
 new SchemaCheckVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST,
 new MigrationRequestVerbHandler());
 -
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT,
 new SnapshotVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, 
new EchoVerbHandler());
 -
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_STORE,
 new BatchStoreVerbHandler());
 -        
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_REMOVE,
 new BatchRemoveVerbHandler());
 -
+         sstablesTracker = new 
SSTablesGlobalTracker(SSTableFormat.Type.current());
      }
  
      public void registerDaemon(CassandraDaemon daemon)
@@@ -940,13 -899,18 +946,14 @@@
              // for bootstrap to get the load info it needs.
              // (we won't be part of the storage ring though until we add a 
counterId to our state, below.)
              // Seed the host ID-to-endpoint map with our own ID.
 -            getTokenMetadata().updateHostId(localHostId, 
FBUtilities.getBroadcastAddress());
 +            getTokenMetadata().updateHostId(localHostId, 
FBUtilities.getBroadcastAddressAndPort());
              appStates.put(ApplicationState.NET_VERSION, 
valueFactory.networkVersion());
              appStates.put(ApplicationState.HOST_ID, 
valueFactory.hostId(localHostId));
 -            appStates.put(ApplicationState.RPC_ADDRESS, 
valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
 +            appStates.put(ApplicationState.NATIVE_ADDRESS_AND_PORT, 
valueFactory.nativeaddressAndPort(FBUtilities.getBroadcastNativeAddressAndPort()));
 +            appStates.put(ApplicationState.RPC_ADDRESS, 
valueFactory.rpcaddress(FBUtilities.getJustBroadcastNativeAddress()));
              appStates.put(ApplicationState.RELEASE_VERSION, 
valueFactory.releaseVersion());
+             appStates.put(ApplicationState.SSTABLE_VERSIONS, 
valueFactory.sstableVersions(sstablesTracker.versionsInUse()));
  
 -            // load the persisted ring state. This used to be done earlier in 
the init process,
 -            // but now we always perform a shadow round when preparing to 
join and we have to
 -            // clear endpoint states after doing that.
 -            loadRingState();
 -
              logger.info("Starting up server gossip");
              Gossiper.instance.register(this);
              
Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); 
// needed for node-ring gathering.
diff --cc 
test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
index e94c2c4,0000000..8d051d3
mode 100644,000000..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
@@@ -1,57 -1,0 +1,61 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.upgrade;
 +
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.shared.Versions;
 +
++import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
++import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
 +
 +public class CompactStorage3to4UpgradeTest extends UpgradeTestBase
 +{
 +    public static final String TABLE_NAME = "cs_tbl";
 +
 +    @Test
 +    public void testNullClusteringValues() throws Throwable
 +    {
 +        new TestCase().nodes(1)
 +                      .upgrade(Versions.Major.v30, Versions.Major.v4)
++                      .withConfig(config -> config.with(GOSSIP, NETWORK, 
NATIVE_PROTOCOL))
 +                      .setup(cluster -> {
 +                          String create = "CREATE TABLE %s.%s(k int, c1 int, 
c2 int, v int, PRIMARY KEY (k, c1, c2)) " +
 +                                          "WITH compaction = { 
'class':'LeveledCompactionStrategy', 'enabled':'false'} AND COMPACT STORAGE";
 +                          cluster.schemaChange(String.format(create, 
KEYSPACE, TABLE_NAME));
 +
 +                          String insert = "INSERT INTO %s.%s(k, c1, v) values 
(?, ?, ?)";
 +                          
cluster.get(1).executeInternal(String.format(insert, KEYSPACE, TABLE_NAME), 1, 
1, 1);
 +                          cluster.get(1).flush(KEYSPACE);
 +
 +                          
cluster.get(1).executeInternal(String.format(insert, KEYSPACE, TABLE_NAME), 2, 
2, 2);
 +                          cluster.get(1).flush(KEYSPACE);
 +
 +                          cluster.schemaChange(String.format("ALTER TABLE 
%s.%s DROP COMPACT STORAGE", KEYSPACE, TABLE_NAME));
 +                      })
 +                      .runAfterNodeUpgrade((cluster, node) -> {
 +                          cluster.get(1).forceCompact(KEYSPACE, TABLE_NAME);
 +                          Object[][] actual = 
cluster.get(1).executeInternal(String.format("SELECT * FROM %s.%s", KEYSPACE, 
TABLE_NAME));
 +                          assertRows(actual, new Object[] {1, 1, null, 1}, 
new Object[] {2, 2, null, 2});
 +                      })
 +                      .run();
 +    }
 +}
diff --cc 
test/unit/org/apache/cassandra/cql3/validation/operations/CompactTableTest.java
index 46a7b1d,0000000..bac94a1
mode 100644,000000..100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/operations/CompactTableTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/operations/CompactTableTest.java
@@@ -1,116 -1,0 +1,115 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.cql3.validation.operations;
 +
 +import java.util.concurrent.ConcurrentMap;
- import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.junit.Test;
 +
 +import org.apache.cassandra.cql3.CQLTester;
 +import org.apache.cassandra.cql3.QueryHandler;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaChangeListener;
 +
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +public class CompactTableTest extends CQLTester
 +{
 +    @Test
 +    public void dropCompactStorageTest() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk)) WITH 
COMPACT STORAGE;");
 +        execute("INSERT INTO %s (pk, ck) VALUES (1, 1)");
 +        alterTable("ALTER TABLE %s DROP COMPACT STORAGE");
 +        assertRows(execute( "SELECT * FROM %s"),
 +                   row(1, null, 1, null));
 +
 +        createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck)) 
WITH COMPACT STORAGE;");
 +        execute("INSERT INTO %s (pk, ck) VALUES (1, 1)");
 +        alterTable("ALTER TABLE %s DROP COMPACT STORAGE");
 +        assertRows(execute( "SELECT * FROM %s"),
 +                   row(1, 1, null));
 +
 +        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck)) WITH COMPACT STORAGE;");
 +        execute("INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
 +        alterTable("ALTER TABLE %s DROP COMPACT STORAGE");
 +        assertRows(execute( "SELECT * FROM %s"),
 +                   row(1, 1, 1));
 +    }
 +
 +    @Test
 +    public void dropCompactStorageShouldInvalidatePreparedStatements() throws 
Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck)) WITH COMPACT STORAGE;");
 +        execute("INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
 +        String templateSelect = "SELECT * FROM %s WHERE pk = 1";
 +        assertRows(execute(templateSelect), row(1, 1, 1));
 +
 +        // Verify that the prepared statement has been added to the cache 
after our first query.
 +        String formattedQuery = formatQuery(templateSelect);
 +        ConcurrentMap<String, QueryHandler.Prepared> original = 
QueryProcessor.getInternalStatements();
 +        assertTrue(original.containsKey(formattedQuery));
 +
 +        // Verify that schema change listeners are told statements are 
affected with DROP COMPACT STORAGE.
 +        SchemaChangeListener listener = new SchemaChangeListener()
 +        {
 +            public void onAlterTable(String keyspace, String table, boolean 
affectsStatements)
 +            {
 +                assertTrue(affectsStatements);
 +            }
 +        };
 +
 +        Schema.instance.registerListener(listener);
 +
 +        try
 +        {
 +            alterTable("ALTER TABLE %s DROP COMPACT STORAGE");
 +            ConcurrentMap<String, QueryHandler.Prepared> postDrop = 
QueryProcessor.getInternalStatements();
 +
 +            // Verify that the prepared statement has been removed from the cache after DROP COMPACT STORAGE.
 +            assertFalse(postDrop.containsKey(formattedQuery));
 +
 +            // Verify that the prepared statement has been added back to the cache after our second query.
 +            assertRows(execute(templateSelect), row(1, 1, 1));
 +            ConcurrentMap<String, QueryHandler.Prepared> postQuery = 
QueryProcessor.getInternalStatements();
 +            assertTrue(postQuery.containsKey(formattedQuery));
 +        }
 +        finally
 +        {
 +            // Clean up the listener so this doesn't fail other tests.
 +            Schema.instance.unregisterListener(listener);
 +        }
 +    }
 +
 +    @Test
 +    public void compactStorageSemanticsTest() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck)) 
WITH COMPACT STORAGE");
 +        execute("INSERT INTO %s (pk, ck) VALUES (?, ?)", 1, 1);
 +        execute("DELETE FROM %s WHERE pk = ? AND ck = ?", 1, 1);
 +        assertEmpty(execute("SELECT * FROM %s WHERE pk = ?", 1));
 +
 +        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v int, 
PRIMARY KEY (pk, ck1, ck2)) WITH COMPACT STORAGE");
 +        execute("INSERT INTO %s (pk, ck1, v) VALUES (?, ?, ?)", 2, 2, 2);
 +        assertRows(execute("SELECT * FROM %s WHERE pk = ?",2),
 +                   row(2, 2, null, 2));
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 910445f,7705d8b..4390b20
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@@ -148,17 -145,22 +148,22 @@@ public class TrackerTes
      @Test
      public void testAddInitialSSTables()
      {
 -        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS));
          Tracker tracker = cfs.getTracker();
+         MockListener listener = new MockListener(false);
+         tracker.subscribe(listener);
          List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17, cfs),
                                                         MockSchema.sstable(1, 121, cfs),
                                                         MockSchema.sstable(2, 9, cfs));
          tracker.addInitialSSTables(copyOf(readers));
  
          Assert.assertEquals(3, tracker.view.get().sstables.size());
+         Assert.assertEquals(1, listener.senders.size());
+         Assert.assertEquals(1, listener.received.size());
+         Assert.assertTrue(listener.received.get(0) instanceof InitialSSTableAddedNotification);
  
          for (SSTableReader reader : readers)
 -            Assert.assertTrue(reader.isKeyCacheSetup());
 +            Assert.assertTrue(reader.isKeyCacheEnabled());
  
          Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
      }
@@@ -369,9 -367,8 +376,9 @@@
          MockListener failListener = new MockListener(true);
          tracker.subscribe(failListener);
          tracker.subscribe(listener);
-         Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null, null));
 -        Assert.assertNotNull(tracker.notifyAdded(singleton(r1), false, null));
++        Assert.assertNotNull(tracker.notifyAdded(singleton(r1), false, null, null));
          Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added);
 +        Assert.assertFalse(((SSTableAddedNotification) listener.received.get(0)).memtable().isPresent());
          listener.received.clear();
          Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null));
          Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
