This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 159e021aa36a44cdc7674dd234d4a6e189478280
Merge: 85f69cf fe70155
Author: Blake Eggleston <bdeggles...@gmail.com>
AuthorDate: Mon Nov 9 12:22:43 2020 -0800

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 .../cassandra/schema/MigrationCoordinator.java     | 516 +++++++++++++++++++++
 .../apache/cassandra/schema/MigrationManager.java  | 149 +-----
 .../org/apache/cassandra/schema/MigrationTask.java | 111 -----
 src/java/org/apache/cassandra/schema/Schema.java   |   2 +-
 .../cassandra/schema/SchemaMigrationEvent.java     |   7 +-
 .../apache/cassandra/service/StorageService.java   |  63 ++-
 .../cassandra/service/StorageServiceMBean.java     |   7 +
 .../cassandra/distributed/action/GossipHelper.java |  10 +-
 .../cassandra/schema/MigrationCoordinatorTest.java | 319 +++++++++++++
 10 files changed, 908 insertions(+), 277 deletions(-)

diff --cc CHANGES.txt
index f90ffd4,b81b0c8..1b08837
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,41 -1,10 +1,42 @@@
 -3.11.10
 +4.0-beta4
 + * Add a ratelimiter to snapshot creation and deletion (CASSANDRA-13019)
 + * Produce consistent tombstone for reads to avoid digest mismatch (CASSANDRA-15369)
 + * Fix SSTableloader issue when restoring a table named backups (CASSANDRA-16235)
 + * Invalid serialized size for responses caused by increasing message time by 1ms which caused extra bytes in size calculation (CASSANDRA-16103)
 + * Throw BufferOverflowException from DataOutputBuffer for better visibility (CASSANDRA-16214)
 + * TLS connections to the storage port on a node without server encryption configured cause java.io.IOException accessing missing keystore (CASSANDRA-16144)
 +Merged from 3.11:
  Merged from 3.0:
 - * Fix invalid cell value skipping when reading from disk (CASSANDRA-16223)
++ * Wait for schema agreement when bootstrapping (CASSANDRA-15158)
   * Prevent invoking enable/disable gossip when not in NORMAL (CASSANDRA-16146)
 
 -3.11.9
 - * Synchronize Keyspace instance store/clear (CASSANDRA-16210)
 +4.0-beta3
 + * Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks (CASSANDRA-15229)
 + * Fail truncation requests when they fail on a replica (CASSANDRA-16208)
 + * Move compact storage validation earlier in startup process (CASSANDRA-16063)
 + * Fix ByteBufferAccessor cast exceptions thrown when querying a virtual table (CASSANDRA-16155)
 + * Consolidate node liveness check for forced repair (CASSANDRA-16113)
 + * Use unsigned short in ValueAccessor.sliceWithShortLength (CASSANDRA-16147)
 + * Abort repairs when getting a truncation request (CASSANDRA-15854)
 + * Remove bad assert when getting active compactions for an sstable (CASSANDRA-15457)
 + * Avoid failing compactions with very large partitions (CASSANDRA-15164)
 + * Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131)
 + * Avoid invalid state transition exception during incremental repair (CASSANDRA-16067)
 + * Allow zero padding in timestamp serialization (CASSANDRA-16105)
 + * Add byte array backed cells (CASSANDRA-15393)
 + * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801)
 + * Avoid adding localhost when streaming trivial ranges (CASSANDRA-16099)
 + * Add nodetool getfullquerylog (CASSANDRA-15988)
 + * Fix yaml format and alignment in tpstats (CASSANDRA-11402)
 + * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084)
 + * When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954)
 + * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909)
 + * Mutating sstable component may race with entire-sstable-streaming (ZCS) causing checksum validation failure (CASSANDRA-15861)
 + * NPE thrown while updating speculative execution time if keyspace is removed during task execution (CASSANDRA-15949)
 + * Show the progress of data streaming and index build (CASSANDRA-15406)
 + * Add flag to disable chunk cache and disable by default (CASSANDRA-16036)
 + * Upgrade to snakeyaml >= 1.26 version for CVE-2017-18640 fix (CASSANDRA-16150)
 +Merged from 3.11:
   * Fix ColumnFilter to avoid querying cells of unselected complex columns (CASSANDRA-15977)
   * Fix memory leak in CompressedChunkReader (CASSANDRA-15880)
   * Don't attempt value skipping with mixed version cluster (CASSANDRA-15833)
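
The headline change in this merge is CASSANDRA-15158: a joining node now waits for schema agreement before bootstrapping, driven by the new MigrationCoordinator below. A minimal sketch of that gate from the caller's side, assuming a hypothetical call site and timeout (the real wiring lives in StorageService, which this excerpt only summarizes in the diffstat):

    import java.util.concurrent.TimeUnit;
    import org.apache.cassandra.schema.MigrationCoordinator;

    final class BootstrapSchemaGate
    {
        // Hypothetical bound, chosen only for illustration.
        static final long SCHEMA_WAIT_MILLIS = TimeUnit.MINUTES.toMillis(5);

        static void waitForSchemaAgreement()
        {
            // Blocks until a schema response has arrived for every version the
            // coordinator has seen, or until the timeout elapses.
            if (!MigrationCoordinator.instance.awaitSchemaRequests(SCHEMA_WAIT_MILLIS))
                throw new IllegalStateException("Missing schema responses for some versions; refusing to bootstrap");
        }
    }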
diff --cc src/java/org/apache/cassandra/schema/MigrationCoordinator.java
index 0000000,0000000..9fb348e
new file mode 100644
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
@@@ -1,0 -1,0 +1,516 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.schema;
++
++import java.lang.management.ManagementFactory;
++import java.lang.management.RuntimeMXBean;
++import java.util.ArrayDeque;
++import java.util.ArrayList;
++import java.util.Collection;
++import java.util.Deque;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.UUID;
++import java.util.concurrent.Future;
++import java.util.concurrent.FutureTask;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicInteger;
++import java.util.function.LongSupplier;
++
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.collect.ImmutableSet;
++import com.google.common.collect.Sets;
++import com.google.common.util.concurrent.Futures;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import org.apache.cassandra.concurrent.ScheduledExecutors;
++import org.apache.cassandra.concurrent.Stage;
++import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.exceptions.RequestFailureReason;
++import org.apache.cassandra.gms.ApplicationState;
++import org.apache.cassandra.gms.EndpointState;
++import org.apache.cassandra.gms.FailureDetector;
++import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.net.Message;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.NoPayload;
++import org.apache.cassandra.net.RequestCallback;
++import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.utils.FBUtilities;
++import org.apache.cassandra.utils.concurrent.WaitQueue;
++
++public class MigrationCoordinator
++{
++    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
++    private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null);
++
++    private static LongSupplier getUptimeFn = () -> ManagementFactory.getRuntimeMXBean().getUptime();
++
++    @VisibleForTesting
++    public static void setUptimeFn(LongSupplier supplier)
++    {
++        getUptimeFn = supplier;
++    }
++
++
++    private static final int MIGRATION_DELAY_IN_MS = 60000;
++    private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
++
++    public static final MigrationCoordinator instance = new MigrationCoordinator();
++
++    static class VersionInfo
++    {
++        final UUID version;
++
++        final Set<InetAddressAndPort> endpoints           = Sets.newConcurrentHashSet();
++        final Set<InetAddressAndPort> outstandingRequests = Sets.newConcurrentHashSet();
++        final Deque<InetAddressAndPort> requestQueue      = new ArrayDeque<>();
++
++        private final WaitQueue waitQueue = new WaitQueue();
++
++        volatile boolean receivedSchema;
++
++        VersionInfo(UUID version)
++        {
++            this.version = version;
++        }
++
++        WaitQueue.Signal register()
++        {
++            return waitQueue.register();
++        }
++
++        void markReceived()
++        {
++            if (receivedSchema)
++                return;
++
++            receivedSchema = true;
++            waitQueue.signalAll();
++        }
++
++        boolean wasReceived()
++        {
++            return receivedSchema;
++        }
++    }
++
++    private final Map<UUID, VersionInfo> versionInfo = new HashMap<>();
++    private final Map<InetAddressAndPort, UUID> endpointVersions = new HashMap<>();
++    private final AtomicInteger inflightTasks = new AtomicInteger();
++
++    public void start()
++    {
++        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, 1, 1, TimeUnit.MINUTES);
++    }
++
++    public synchronized void reset()
++    {
++        versionInfo.clear();
++    }
++
++    synchronized List<Future<Void>> pullUnreceivedSchemaVersions()
++    {
++        List<Future<Void>> futures = new ArrayList<>();
++        for (VersionInfo info : versionInfo.values())
++        {
++            if (info.wasReceived() || info.outstandingRequests.size() > 0)
++                continue;
++
++            Future<Void> future = maybePullSchema(info);
++            if (future != null && future != FINISHED_FUTURE)
++                futures.add(future);
++        }
++
++        return futures;
++    }
++
++    synchronized Future<Void> maybePullSchema(VersionInfo info)
++    {
++        if (info.endpoints.isEmpty() || info.wasReceived() || !shouldPullSchema(info.version))
++            return FINISHED_FUTURE;
++
++        if (info.outstandingRequests.size() >= getMaxOutstandingVersionRequests())
++            return FINISHED_FUTURE;
++
++        for (int i=0, isize=info.requestQueue.size(); i<isize; i++)
++        {
++            InetAddressAndPort endpoint = info.requestQueue.remove();
++            if (!info.endpoints.contains(endpoint))
++                continue;
++
++            if (shouldPullFromEndpoint(endpoint) && info.outstandingRequests.add(endpoint))
++            {
++                return scheduleSchemaPull(endpoint, info);
++            }
++            else
++            {
++                // return to queue
++                info.requestQueue.offer(endpoint);
++            }
++        }
++
++        // no suitable endpoints were found, check again in a minute, the periodic task will pick it up
++        return null;
++    }
++
++    public synchronized Map<UUID, Set<InetAddressAndPort>> outstandingVersions()
++    {
++        HashMap<UUID, Set<InetAddressAndPort>> map = new HashMap<>();
++        for (VersionInfo info : versionInfo.values())
++            if (!info.wasReceived())
++                map.put(info.version, ImmutableSet.copyOf(info.endpoints));
++        return map;
++    }
++
++    @VisibleForTesting
++    protected VersionInfo getVersionInfoUnsafe(UUID version)
++    {
++        return versionInfo.get(version);
++    }
++
++    @VisibleForTesting
++    protected int getMaxOutstandingVersionRequests()
++    {
++        return MAX_OUTSTANDING_VERSION_REQUESTS;
++    }
++
++    @VisibleForTesting
++    protected boolean isAlive(InetAddressAndPort endpoint)
++    {
++        return FailureDetector.instance.isAlive(endpoint);
++    }
++
++    @VisibleForTesting
++    protected boolean shouldPullSchema(UUID version)
++    {
++        if (Schema.instance.getVersion() == null)
++        {
++            logger.debug("Not pulling schema for version {}, because local 
schama version is not known yet", version);
++            return false;
++        }
++
++        if (Schema.instance.isSameVersion(version))
++        {
++            logger.debug("Not pulling schema for version {}, because schema 
versions match: " +
++                         "local={}, remote={}",
++                         version,
++                         Schema.schemaVersionToString(Schema.instance.getVersion()),
++                         Schema.schemaVersionToString(version));
++            return false;
++        }
++        return true;
++    }
++
++    // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes
++    // from both 3.0 and 3.0.14.
++    private static boolean is30Compatible(int version)
++    {
++        return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
++    }
++
++    @VisibleForTesting
++    protected boolean shouldPullFromEndpoint(InetAddressAndPort endpoint)
++    {
++        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
++            return false;
++
++        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
++        if (state == null)
++            return false;
++
++        final String releaseVersion = state.getApplicationState(ApplicationState.RELEASE_VERSION).value;
++        final String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
++
++        if (!releaseVersion.startsWith(ourMajorVersion))
++        {
++            logger.debug("Not pulling schema from {} because release version 
in Gossip is not major version {}, it is {}",
++                         endpoint, ourMajorVersion, releaseVersion);
++            return false;
++        }
++
++        if (!MessagingService.instance().versions.knows(endpoint))
++        {
++            logger.debug("Not pulling schema from {} because their messaging 
version is unknown", endpoint);
++            return false;
++        }
++
++        if (MessagingService.instance().versions.getRaw(endpoint) != MessagingService.current_version)
++        {
++            logger.debug("Not pulling schema from {} because their schema 
format is incompatible", endpoint);
++            return false;
++        }
++
++        if (Gossiper.instance.isGossipOnlyMember(endpoint))
++        {
++            logger.debug("Not pulling schema from {} because it's a gossip 
only member", endpoint);
++            return false;
++        }
++        return true;
++    }
++
++    @VisibleForTesting
++    protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version)
++    {
++        if (Schema.instance.isEmpty() || getUptimeFn.getAsLong() < MIGRATION_DELAY_IN_MS)
++        {
++            // If we think we may be bootstrapping or have recently started, pull the schema immediately
++            logger.debug("Immediately submitting migration task for {}, " +
++                         "schema versions: local={}, remote={}",
++                         endpoint,
++                         Schema.schemaVersionToString(Schema.instance.getVersion()),
++                         Schema.schemaVersionToString(version));
++            return true;
++        }
++        return false;
++    }
++
++    @VisibleForTesting
++    protected boolean isLocalVersion(UUID version)
++    {
++        return Schema.instance.isSameVersion(version);
++    }
++
++    /**
++     * If a previous schema update brought our version to the same as the incoming schema, don't apply it
++     */
++    synchronized boolean shouldApplySchemaFor(VersionInfo info)
++    {
++        if (info.wasReceived())
++            return false;
++        return !isLocalVersion(info.version);
++    }
++
++    public synchronized Future<Void> reportEndpointVersion(InetAddressAndPort endpoint, UUID version)
++    {
++        UUID current = endpointVersions.put(endpoint, version);
++        if (current != null && current.equals(version))
++            return FINISHED_FUTURE;
++
++        VersionInfo info = versionInfo.computeIfAbsent(version, VersionInfo::new);
++        if (isLocalVersion(version))
++            info.markReceived();
++        info.endpoints.add(endpoint);
++        info.requestQueue.addFirst(endpoint);
++
++        // disassociate this endpoint from its (now) previous schema version
++        removeEndpointFromVersion(endpoint, current);
++
++        return maybePullSchema(info);
++    }
++
++    public Future<Void> reportEndpointVersion(InetAddressAndPort endpoint, EndpointState state)
++    {
++        if (state == null)
++            return FINISHED_FUTURE;
++
++        UUID version = state.getSchemaVersion();
++
++        if (version == null)
++            return FINISHED_FUTURE;
++
++        return reportEndpointVersion(endpoint, version);
++    }
++
++    private synchronized void removeEndpointFromVersion(InetAddressAndPort endpoint, UUID version)
++    {
++        if (version == null)
++            return;
++
++        VersionInfo info = versionInfo.get(version);
++
++        if (info == null)
++            return;
++
++        info.endpoints.remove(endpoint);
++        if (info.endpoints.isEmpty())
++        {
++            info.waitQueue.signalAll();
++            versionInfo.remove(version);
++        }
++    }
++
++    Future<Void> scheduleSchemaPull(InetAddressAndPort endpoint, VersionInfo info)
++    {
++        FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null);
++        if (shouldPullImmediately(endpoint, info.version))
++        {
++            Stage.MIGRATION.submit(task);
++        }
++        else
++        {
++            ScheduledExecutors.nonPeriodicTasks.schedule(() -> Stage.MIGRATION.submit(task), MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
++        }
++        return task;
++    }
++
++    @VisibleForTesting
++    protected void mergeSchemaFrom(InetAddressAndPort endpoint, Collection<Mutation> mutations)
++    {
++        Schema.instance.mergeAndAnnounceVersion(mutations);
++    }
++
++    class Callback implements RequestCallback<Collection<Mutation>>
++    {
++        final InetAddressAndPort endpoint;
++        final VersionInfo info;
++
++        public Callback(InetAddressAndPort endpoint, VersionInfo info)
++        {
++            this.endpoint = endpoint;
++            this.info = info;
++        }
++
++        public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
++        {
++            fail();
++        }
++
++        Future<Void> fail()
++        {
++            return pullComplete(endpoint, info, false);
++        }
++
++        public void onResponse(Message<Collection<Mutation>> message)
++        {
++            response(message.payload);
++        }
++
++        Future<Void> response(Collection<Mutation> mutations)
++        {
++            synchronized (info)
++            {
++                if (shouldApplySchemaFor(info))
++                {
++                    try
++                    {
++                        mergeSchemaFrom(endpoint, mutations);
++                    }
++                    catch (Exception e)
++                    {
++                        logger.error(String.format("Unable to merge schema 
from %s", endpoint), e);
++                        return fail();
++                    }
++                }
++                return pullComplete(endpoint, info, true);
++            }
++        }
++
++        public boolean isLatencyForSnitch()
++        {
++            return false;
++        }
++    }
++
++    private void pullSchema(Callback callback)
++    {
++        if (!isAlive(callback.endpoint))
++        {
++            logger.warn("Can't send schema pull request: node {} is down.", 
callback.endpoint);
++            callback.fail();
++            return;
++        }
++
++        // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
++        // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
++        // a higher major.
++        if (!shouldPullFromEndpoint(callback.endpoint))
++        {
++            logger.info("Skipped sending a migration request: node {} has a 
higher major version now.", callback.endpoint);
++            callback.fail();
++            return;
++        }
++
++        logger.debug("Requesting schema from {}", callback.endpoint);
++        sendMigrationMessage(callback);
++    }
++
++    protected void sendMigrationMessage(Callback callback)
++    {
++        inflightTasks.getAndIncrement();
++        Message message = Message.out(Verb.SCHEMA_PULL_REQ, NoPayload.noPayload);
++        logger.info("Sending schema pull request to {}", callback.endpoint);
++        MessagingService.instance().sendWithCallback(message, callback.endpoint, callback);
++    }
++
++    private synchronized Future<Void> pullComplete(InetAddressAndPort endpoint, VersionInfo info, boolean wasSuccessful)
++    {
++        inflightTasks.decrementAndGet();
++        if (wasSuccessful)
++            info.markReceived();
++
++        info.outstandingRequests.remove(endpoint);
++        info.requestQueue.add(endpoint);
++        return maybePullSchema(info);
++    }
++
++    public int getInflightTasks()
++    {
++        return inflightTasks.get();
++    }
++
++    /**
++     * Wait until we've received schema responses for all versions we're aware of
++     * @param waitMillis how long to wait for responses, in milliseconds
++     * @return true if responses for all schemas were received, false if we timed out waiting
++     */
++    public boolean awaitSchemaRequests(long waitMillis)
++    {
++        if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
++            Gossiper.waitToSettle();
++
++        WaitQueue.Signal signal = null;
++        try
++        {
++            synchronized (this)
++            {
++                List<WaitQueue.Signal> signalList = new ArrayList<>(versionInfo.size());
++                for (VersionInfo version : versionInfo.values())
++                {
++                    if (version.wasReceived())
++                        continue;
++
++                    signalList.add(version.register());
++                }
++
++                if (signalList.isEmpty())
++                    return true;
++
++                WaitQueue.Signal[] signals = new WaitQueue.Signal[signalList.size()];
++                signalList.toArray(signals);
++                signal = WaitQueue.all(signals);
++            }
++
++            return signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis));
++        }
++        catch (InterruptedException e)
++        {
++            throw new RuntimeException(e);
++        }
++        finally
++        {
++            if (signal != null)
++                signal.cancel();
++        }
++    }
++}
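
The coordinator above exposes its gossip, failure-detector and messaging checks as protected hooks, which is what makes it unit-testable. A hedged sketch of how those hooks can be stubbed, presumably along the lines of what MigrationCoordinatorTest does (StubbedCoordinator and demo() are hypothetical names; the class must sit in org.apache.cassandra.schema to see the package-private Callback type):

    package org.apache.cassandra.schema;

    import java.util.UUID;
    import org.apache.cassandra.locator.InetAddressAndPort;

    class StubbedCoordinator extends MigrationCoordinator
    {
        @Override protected boolean isAlive(InetAddressAndPort endpoint) { return true; }                // no failure detector
        @Override protected boolean shouldPullFromEndpoint(InetAddressAndPort endpoint) { return true; } // skip gossip checks
        @Override protected boolean isLocalVersion(UUID version) { return false; }                       // nothing counts as local
        @Override protected void sendMigrationMessage(Callback callback) { /* capture instead of sending */ }

        static void demo() throws Exception
        {
            MigrationCoordinator coordinator = new StubbedCoordinator();
            InetAddressAndPort peer = InetAddressAndPort.getByName("127.0.0.2"); // hypothetical peer
            // Reporting an unseen version records the endpoint against it and queues a pull request.
            coordinator.reportEndpointVersion(peer, UUID.randomUUID());
            // Until a pull succeeds, the version shows up as outstanding.
            assert coordinator.outstandingVersions().size() == 1;
        }
    }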
diff --cc src/java/org/apache/cassandra/schema/MigrationManager.java
index 8f627cd,0000000..c2ec511
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@@ -1,501 -1,0 +1,360 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.io.IOException;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.lang.management.ManagementFactory;
 +import java.util.function.LongSupplier;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.util.concurrent.Futures;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.exceptions.AlreadyExistsException;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.gms.*;
 +import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.concurrent.Stage.MIGRATION;
 +import static org.apache.cassandra.net.Verb.SCHEMA_PUSH_REQ;
 +
 +public class MigrationManager
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
 +
 +    public static final MigrationManager instance = new MigrationManager();
 +
 +    private static LongSupplier getUptimeFn = () -> ManagementFactory.getRuntimeMXBean().getUptime();
 +
 +    @VisibleForTesting
 +    public static void setUptimeFn(LongSupplier supplier)
 +    {
 +        getUptimeFn = supplier;
 +    }
 +
 +    private static final int MIGRATION_DELAY_IN_MS = 60000;
 +
 +    private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1"));
 +
 +    private MigrationManager() {}
 +
-     public static void scheduleSchemaPull(InetAddressAndPort endpoint, EndpointState state)
-     {
-         UUID schemaVersion = state.getSchemaVersion();
-         if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && schemaVersion != null)
-             maybeScheduleSchemaPull(schemaVersion, endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value);
-     }
- 
-     /**
-      * If versions differ this node sends a request with its local migration list to the endpoint
-      * and expects to receive a list of migrations to apply locally.
-      */
-     private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddressAndPort endpoint, String releaseVersion)
-     {
-         String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
-         if (!releaseVersion.startsWith(ourMajorVersion))
-         {
-             logger.debug("Not pulling schema because release version in 
Gossip is not major version {}, it is {}", ourMajorVersion, releaseVersion);
-             return;
-         }
-         if (Schema.instance.getVersion() == null)
-         {
-             logger.debug("Not pulling schema from {}, because local schema 
version is not known yet",
-                          endpoint);
-             SchemaMigrationDiagnostics.unknownLocalSchemaVersion(endpoint, theirVersion);
-             return;
-         }
-         if (Schema.instance.isSameVersion(theirVersion))
-         {
-             logger.debug("Not pulling schema from {}, because schema versions 
match ({})",
-                          endpoint,
-                          Schema.schemaVersionToString(theirVersion));
-             SchemaMigrationDiagnostics.versionMatch(endpoint, theirVersion);
-             return;
-         }
-         if (!shouldPullSchemaFrom(endpoint))
-         {
-             logger.debug("Not pulling schema from {}, because versions match 
({}/{}), or shouldPullSchemaFrom returned false",
-                          endpoint, Schema.instance.getVersion(), 
theirVersion);
-             SchemaMigrationDiagnostics.skipPull(endpoint, theirVersion);
-             return;
-         }
- 
-         if (Schema.instance.isEmpty() || getUptimeFn.getAsLong() < MIGRATION_DELAY_IN_MS)
-         {
-             // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
-             logger.debug("Immediately submitting migration task for {}, " +
-                          "schema versions: local={}, remote={}",
-                          endpoint,
-                          Schema.schemaVersionToString(Schema.instance.getVersion()),
-                          Schema.schemaVersionToString(theirVersion));
-             submitMigrationTask(endpoint);
-         }
-         else
-         {
-             // Include a delay to make sure we have a chance to apply any changes being
-             // pushed out simultaneously. See CASSANDRA-5025
-             Runnable runnable = () ->
-             {
-                 // grab the latest version of the schema since it may have changed again since the initial scheduling
-                 UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint);
-                 if (epSchemaVersion == null)
-                 {
-                     logger.debug("epState vanished for {}, not submitting 
migration task", endpoint);
-                     return;
-                 }
-                 if (Schema.instance.isSameVersion(epSchemaVersion))
-                 {
-                     logger.debug("Not submitting migration task for {} 
because our versions match ({})", endpoint, epSchemaVersion);
-                     return;
-                 }
-                 logger.debug("Submitting migration task for {}, schema 
version mismatch: local={}, remote={}",
-                              endpoint,
-                              Schema.schemaVersionToString(Schema.instance.getVersion()),
-                              Schema.schemaVersionToString(epSchemaVersion));
-                 submitMigrationTask(endpoint);
-             };
-             ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
-         }
-     }
- 
-     private static Future<?> submitMigrationTask(InetAddressAndPort endpoint)
-     {
-         /*
-          * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
-          * running in the gossip stage.
-          */
-         return MIGRATION.submit(new MigrationTask(endpoint));
-     }
- 
-     static boolean shouldPullSchemaFrom(InetAddressAndPort endpoint)
-     {
-         /*
-          * Don't request schema from nodes with a different or unknown major version (may have incompatible schema)
-          * Don't request schema from fat clients
-          */
-         return MessagingService.instance().versions.knows(endpoint)
-                 && MessagingService.instance().versions.getRaw(endpoint) == MessagingService.current_version
-                 && !Gossiper.instance.isGossipOnlyMember(endpoint);
-     }
- 
 +    private static boolean shouldPushSchemaTo(InetAddressAndPort endpoint)
 +    {
 +        // only push schema to nodes with known and equal versions
 +        return !endpoint.equals(FBUtilities.getBroadcastAddressAndPort())
 +               && MessagingService.instance().versions.knows(endpoint)
 +               && MessagingService.instance().versions.getRaw(endpoint) == MessagingService.current_version;
 +    }
 +
-     public static boolean isReadyForBootstrap()
-     {
-         return MigrationTask.getInflightTasks().isEmpty();
-     }
- 
-     public static void waitUntilReadyForBootstrap()
-     {
-         CountDownLatch completionLatch;
-         while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null)
-         {
-             try
-             {
-                 if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS))
-                     logger.error("Migration task failed to complete");
-             }
-             catch (InterruptedException e)
-             {
-                 Thread.currentThread().interrupt();
-                 logger.error("Migration task was interrupted");
-             }
-         }
-     }
- 
 +    public static void announceNewKeyspace(KeyspaceMetadata ksm) throws ConfigurationException
 +    {
 +        announceNewKeyspace(ksm, false);
 +    }
 +
 +    public static void announceNewKeyspace(KeyspaceMetadata ksm, boolean announceLocally) throws ConfigurationException
 +    {
 +        announceNewKeyspace(ksm, FBUtilities.timestampMicros(), announceLocally);
 +    }
 +
 +    public static void announceNewKeyspace(KeyspaceMetadata ksm, long timestamp, boolean announceLocally) throws ConfigurationException
 +    {
 +        ksm.validate();
 +
 +        if (Schema.instance.getKeyspaceMetadata(ksm.name) != null)
 +            throw new AlreadyExistsException(ksm.name);
 +
 +        logger.info("Create new Keyspace: {}", ksm);
 +        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), announceLocally);
 +    }
 +
 +    public static void announceNewTable(TableMetadata cfm)
 +    {
 +        announceNewTable(cfm, true, FBUtilities.timestampMicros());
 +    }
 +
-     /**
-      * Announces the table even if the definition is already known locally.
-      * This should generally be avoided but is used internally when we want to force the most up to date version of
-      * a system table schema (Note that we don't know if the schema we force _is_ the most recent version or not, we
-      * just rely on idempotency to basically ignore that announce if it's not. That's why we can't use announceTableUpdate:
-      * it would for instance delete new columns if this is not called with the most up-to-date version)
-      *
-      * Note that this is only safe for system tables where we know the id is fixed and will be the same whatever version
-      * of the definition is used.
-      */
-     public static void forceAnnounceNewTable(TableMetadata cfm)
-     {
-         announceNewTable(cfm, false, 0);
-     }
- 
 +    private static void announceNewTable(TableMetadata cfm, boolean throwOnDuplicate, long timestamp)
 +    {
 +        cfm.validate();
 +
 +        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(cfm.keyspace);
 +        if (ksm == null)
 +            throw new ConfigurationException(String.format("Cannot add table 
'%s' to non existing keyspace '%s'.", cfm.name, cfm.keyspace));
 +        // If we have a table or a view which has the same name, we can't add a new one
 +        else if (throwOnDuplicate && ksm.getTableOrViewNullable(cfm.name) != null)
 +            throw new AlreadyExistsException(cfm.keyspace, cfm.name);
 +
 +        logger.info("Create new table: {}", cfm);
 +        announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, timestamp), false);
 +    }
 +
 +    static void announceKeyspaceUpdate(KeyspaceMetadata ksm)
 +    {
 +        ksm.validate();
 +
 +        KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksm.name);
 +        if (oldKsm == null)
 +            throw new ConfigurationException(String.format("Cannot update non 
existing keyspace '%s'.", ksm.name));
 +
 +        logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, 
ksm);
 +        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, 
ksm.params, FBUtilities.timestampMicros()), false);
 +    }
 +
 +    public static void announceTableUpdate(TableMetadata tm)
 +    {
 +        announceTableUpdate(tm, false);
 +    }
 +
 +    public static void announceTableUpdate(TableMetadata updated, boolean announceLocally)
 +    {
 +        updated.validate();
 +
 +        TableMetadata current = Schema.instance.getTableMetadata(updated.keyspace, updated.name);
 +        if (current == null)
 +            throw new ConfigurationException(String.format("Cannot update non 
existing table '%s' in keyspace '%s'.", updated.name, updated.keyspace));
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(current.keyspace);
 +
 +        updated.validateCompatibility(current);
 +
 +        long timestamp = FBUtilities.timestampMicros();
 +
 +        logger.info("Update table '{}/{}' From {} To {}", current.keyspace, 
current.name, current, updated);
 +        Mutation.SimpleBuilder builder = 
SchemaKeyspace.makeUpdateTableMutation(ksm, current, updated, timestamp);
 +
 +        announce(builder, announceLocally);
 +    }
 +
 +    static void announceKeyspaceDrop(String ksName)
 +    {
 +        KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksName);
 +        if (oldKsm == null)
 +            throw new ConfigurationException(String.format("Cannot drop non 
existing keyspace '%s'.", ksName));
 +
 +        logger.info("Drop Keyspace '{}'", oldKsm.name);
 +        announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), false);
 +    }
 +
 +    public static void announceTableDrop(String ksName, String cfName, boolean announceLocally)
 +    {
 +        TableMetadata tm = Schema.instance.getTableMetadata(ksName, cfName);
 +        if (tm == null)
 +            throw new ConfigurationException(String.format("Cannot drop non 
existing table '%s' in keyspace '%s'.", cfName, ksName));
 +        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
 +
 +        logger.info("Drop table '{}/{}'", tm.keyspace, tm.name);
 +        announce(SchemaKeyspace.makeDropTableMutation(ksm, tm, FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    /**
 +     * actively announce a new version to active hosts via rpc
 +     * @param schema The schema mutation to be applied
 +     */
 +    private static void announce(Mutation.SimpleBuilder schema, boolean announceLocally)
 +    {
 +        List<Mutation> mutations = Collections.singletonList(schema.build());
 +
 +        if (announceLocally)
 +            Schema.instance.merge(mutations);
 +        else
 +            announce(mutations);
 +    }
 +
 +    public static void announce(Mutation change)
 +    {
 +        announce(Collections.singleton(change));
 +    }
 +
 +    public static void announce(Collection<Mutation> schema)
 +    {
 +        Future<?> f = MIGRATION.submit(() -> Schema.instance.mergeAndAnnounceVersion(schema));
 +
 +        Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>();
 +        Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>();
 +        Message<Collection<Mutation>> message = Message.out(SCHEMA_PUSH_REQ, schema);
 +        for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
 +        {
 +            if (shouldPushSchemaTo(endpoint))
 +            {
 +                MessagingService.instance().send(message, endpoint);
 +                schemaDestinationEndpoints.add(endpoint);
 +            }
 +            else
 +            {
 +                schemaEndpointsIgnored.add(endpoint);
 +            }
 +        }
 +
 +        SchemaAnnouncementDiagnostics.schemaMutationsAnnounced(schemaDestinationEndpoints, schemaEndpointsIgnored);
 +        FBUtilities.waitOnFuture(f);
 +    }
 +
 +    public static KeyspacesDiff announce(SchemaTransformation transformation, boolean locally)
 +    {
 +        long now = FBUtilities.timestampMicros();
 +
 +        Future<Schema.TransformationResult> future =
 +            MIGRATION.submit(() -> Schema.instance.transform(transformation, locally, now));
 +
 +        Schema.TransformationResult result = Futures.getUnchecked(future);
 +        if (!result.success)
 +            throw result.exception;
 +
 +        if (locally || result.diff.isEmpty())
 +            return result.diff;
 +
 +        Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>();
 +        Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>();
 +        Message<Collection<Mutation>> message = Message.out(SCHEMA_PUSH_REQ, result.mutations);
 +        for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
 +        {
 +            if (shouldPushSchemaTo(endpoint))
 +            {
 +                MessagingService.instance().send(message, endpoint);
 +                schemaDestinationEndpoints.add(endpoint);
 +            }
 +            else
 +            {
 +                schemaEndpointsIgnored.add(endpoint);
 +            }
 +        }
 +
 +        SchemaAnnouncementDiagnostics.schemaTransformationAnnounced(schemaDestinationEndpoints, schemaEndpointsIgnored, transformation);
 +
 +        return result.diff;
 +    }
 +
 +    /**
 +     * Clear all locally stored schema information and reset schema to initial state.
 +     * Called by user (via JMX) who wants to get rid of schema disagreement.
 +     */
 +    public static void resetLocalSchema()
 +    {
 +        logger.info("Starting local schema reset...");
 +
 +        logger.debug("Truncating schema tables...");
 +
 +        SchemaMigrationDiagnostics.resetLocalSchema();
 +
 +        SchemaKeyspace.truncate();
 +
 +        logger.debug("Clearing local schema keyspace definitions...");
 +
 +        Schema.instance.clear();
 +
 +        Set<InetAddressAndPort> liveEndpoints = Gossiper.instance.getLiveMembers();
 +        liveEndpoints.remove(FBUtilities.getBroadcastAddressAndPort());
 +
 +        // force migration if there are nodes around
 +        for (InetAddressAndPort node : liveEndpoints)
 +        {
-             if (shouldPullSchemaFrom(node))
-             {
-                 logger.debug("Requesting schema from {}", node);
-                 FBUtilities.waitOnFuture(submitMigrationTask(node));
-                 break;
-             }
++            EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(node);
++            Future<Void> pull = MigrationCoordinator.instance.reportEndpointVersion(node, state);
++            if (pull != null)
++                FBUtilities.waitOnFuture(pull);
 +        }
 +
 +        logger.info("Local schema reset is complete.");
 +    }
 +
 +    /**
 +     * We have a set of non-local, distributed system keyspaces, e.g. system_traces, system_auth, etc.
 +     * (see {@link SchemaConstants#REPLICATED_SYSTEM_KEYSPACE_NAMES}), that need to be created on cluster initialisation,
 +     * and later evolved on major upgrades (sometimes minor too). This method compares the current known definitions
 +     * of the tables (if the keyspace exists) to the most modern ones expected by the running version of C*;
 +     * if any changes have been detected, a schema Mutation will be created which, when applied, should make
 +     * the cluster's view of that keyspace aligned with the expected modern definition.
 +     *
 +     * @param keyspace   the expected modern definition of the keyspace
 +     * @param generation timestamp to use for the table changes in the schema mutation
 +     *
 +     * @return empty Optional if the current definition is up to date, or an Optional with the Mutation that would
 +     *         bring the schema in line with the expected definition.
 +     */
 +    public static Optional<Mutation> evolveSystemKeyspace(KeyspaceMetadata keyspace, long generation)
 +    {
 +        Mutation.SimpleBuilder builder = null;
 +
 +        KeyspaceMetadata definedKeyspace = Schema.instance.getKeyspaceMetadata(keyspace.name);
 +        Tables definedTables = null == definedKeyspace ? Tables.none() : definedKeyspace.tables;
 +
 +        for (TableMetadata table : keyspace.tables)
 +        {
 +            if (table.equals(definedTables.getNullable(table.name)))
 +                continue;
 +
 +            if (null == builder)
 +            {
 +                // for the keyspace definition itself (name, replication, durability) always use generation 0;
 +                // this ensures that any changes made to replication by the user will never be overwritten.
 +                builder = SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params, 0);
 +
 +                // now set the timestamp to generation, so the tables have the expected timestamp
 +                builder.timestamp(generation);
 +            }
 +
 +            // for table definitions always use the provided generation; these tables, unlike their containing
 +            // keyspaces, are *NOT* meant to be altered by the user; if their definitions need to change,
 +            // the schema must be updated in code, and the appropriate generation must be bumped.
 +            SchemaKeyspace.addTableToSchemaMutation(table, true, builder);
 +        }
 +
 +        return builder == null ? Optional.empty() : Optional.of(builder.build());
 +    }
 +
 +    public static class MigrationsSerializer implements IVersionedSerializer<Collection<Mutation>>
 +    {
 +        public static MigrationsSerializer instance = new MigrationsSerializer();
 +
 +        public void serialize(Collection<Mutation> schema, DataOutputPlus out, int version) throws IOException
 +        {
 +            out.writeInt(schema.size());
 +            for (Mutation mutation : schema)
 +                Mutation.serializer.serialize(mutation, out, version);
 +        }
 +
 +        public Collection<Mutation> deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            int count = in.readInt();
 +            Collection<Mutation> schema = new ArrayList<>(count);
 +
 +            for (int i = 0; i < count; i++)
 +                schema.add(Mutation.serializer.deserialize(in, version));
 +
 +            return schema;
 +        }
 +
 +        public long serializedSize(Collection<Mutation> schema, int version)
 +        {
 +            int size = TypeSizes.sizeof(schema.size());
 +            for (Mutation mutation : schema)
 +                size += mutation.serializedSize(version);
 +            return size;
 +        }
 +    }
 +}
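
The wire format visible in MigrationsSerializer above is simply a 4-byte count followed by each serialized Mutation. A hedged round-trip sketch, not part of the patch: `mutations` stands in for a schema push payload obtained elsewhere, since building a real Mutation needs a live schema, and the buffer classes are assumed from Cassandra's io.util package rather than shown in this diff:

    import java.io.IOException;
    import java.util.Collection;
    import org.apache.cassandra.db.Mutation;
    import org.apache.cassandra.io.util.DataInputBuffer;
    import org.apache.cassandra.io.util.DataOutputBuffer;
    import org.apache.cassandra.net.MessagingService;
    import org.apache.cassandra.schema.MigrationManager.MigrationsSerializer;

    final class MigrationsRoundTrip
    {
        static Collection<Mutation> roundTrip(Collection<Mutation> mutations) throws IOException
        {
            int version = MessagingService.current_version;
            try (DataOutputBuffer out = new DataOutputBuffer())
            {
                MigrationsSerializer.instance.serialize(mutations, out, version);
                // serializedSize() should agree with what serialize() actually wrote
                assert MigrationsSerializer.instance.serializedSize(mutations, version) == out.getLength();
                try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
                {
                    return MigrationsSerializer.instance.deserialize(in, version);
                }
            }
        }
    }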
diff --cc src/java/org/apache/cassandra/schema/Schema.java
index 9c0c590,0000000..c04c631
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@@ -1,890 -1,0 +1,890 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.util.*;
 +import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.MapDifference;
 +import com.google.common.collect.Sets;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.functions.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.UserType;
 +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.locator.LocalStrategy;
 +import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
 +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.Pair;
 +import org.cliffc.high_scale_lib.NonBlockingHashMap;
 +
 +import static java.lang.String.format;
 +
 +import static com.google.common.collect.Iterables.size;
 +
 +public final class Schema implements SchemaProvider
 +{
 +    public static final Schema instance = new Schema();
 +
 +    private volatile Keyspaces keyspaces = Keyspaces.none();
 +
 +    // UUID -> mutable metadata ref map. We have to update these in place every time a table changes.
 +    private final Map<TableId, TableMetadataRef> metadataRefs = new NonBlockingHashMap<>();
 +
 +    // (keyspace name, index name) -> mutable metadata ref map. We have to update these in place every time an index changes.
 +    private final Map<Pair<String, String>, TableMetadataRef> indexMetadataRefs = new NonBlockingHashMap<>();
 +
 +    // Keyspace objects, one per keyspace. Only one instance should ever exist for any given keyspace.
 +    private final Map<String, Keyspace> keyspaceInstances = new NonBlockingHashMap<>();
 +
 +    private volatile UUID version;
 +
 +    private final List<SchemaChangeListener> changeListeners = new CopyOnWriteArrayList<>();
 +
 +    /**
 +     * Initialize empty schema object and load the hardcoded system tables
 +     */
 +    private Schema()
 +    {
 +        if (DatabaseDescriptor.isDaemonInitialized() || DatabaseDescriptor.isToolInitialized())
 +        {
 +            load(SchemaKeyspace.metadata());
 +            load(SystemKeyspace.metadata());
 +        }
 +    }
 +
 +    /**
 +     * Load keyspace definitions, but do not initialize the keyspace instances.
 +     * Schema version may be updated as the result.
 +     */
 +    public void loadFromDisk()
 +    {
 +        loadFromDisk(true);
 +    }
 +
 +    /**
 +     * Load schema definitions from disk.
 +     *
 +     * @param updateVersion true if schema version needs to be updated
 +     */
 +    public void loadFromDisk(boolean updateVersion)
 +    {
 +        SchemaDiagnostics.schemataLoading(this);
 +        SchemaKeyspace.fetchNonSystemKeyspaces().forEach(this::load);
 +        if (updateVersion)
 +            updateVersion();
 +        SchemaDiagnostics.schemataLoaded(this);
 +    }
 +
 +    /**
 +     * Update (or insert) new keyspace definition
 +     *
 +     * @param ksm The metadata about keyspace
 +     */
 +    synchronized public void load(KeyspaceMetadata ksm)
 +    {
 +        KeyspaceMetadata previous = keyspaces.getNullable(ksm.name);
 +
 +        if (previous == null)
 +            loadNew(ksm);
 +        else
 +            reload(previous, ksm);
 +
 +        keyspaces = keyspaces.withAddedOrUpdated(ksm);
 +    }
 +
 +    private void loadNew(KeyspaceMetadata ksm)
 +    {
 +        ksm.tablesAndViews()
 +           .forEach(metadata -> metadataRefs.put(metadata.id, new TableMetadataRef(metadata)));
 +
 +        ksm.tables
 +           .indexTables()
 +           .forEach((name, metadata) -> indexMetadataRefs.put(Pair.create(ksm.name, name), new TableMetadataRef(metadata)));
 +
 +        SchemaDiagnostics.metadataInitialized(this, ksm);
 +    }
 +
 +    private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated)
 +    {
 +        Keyspace keyspace = getKeyspaceInstance(updated.name);
 +        if (null != keyspace)
 +            keyspace.setMetadata(updated);
 +
 +        Tables.TablesDiff tablesDiff = Tables.diff(previous.tables, updated.tables);
 +        Views.ViewsDiff viewsDiff = Views.diff(previous.views, updated.views);
 +
 +        MapDifference<String, TableMetadata> indexesDiff = previous.tables.indexesDiff(updated.tables);
 +
 +        // clean up after removed entries
 +        tablesDiff.dropped.forEach(table -> metadataRefs.remove(table.id));
 +        viewsDiff.dropped.forEach(view -> metadataRefs.remove(view.metadata.id));
 +
 +        indexesDiff.entriesOnlyOnLeft()
 +                   .values()
 +                   .forEach(indexTable -> indexMetadataRefs.remove(Pair.create(indexTable.keyspace, indexTable.indexName().get())));
 +
 +        // load up new entries
 +        tablesDiff.created.forEach(table -> metadataRefs.put(table.id, new TableMetadataRef(table)));
 +        viewsDiff.created.forEach(view -> metadataRefs.put(view.metadata.id, new TableMetadataRef(view.metadata)));
 +
 +        indexesDiff.entriesOnlyOnRight()
 +                   .values()
 +                   .forEach(indexTable -> indexMetadataRefs.put(Pair.create(indexTable.keyspace, indexTable.indexName().get()), new TableMetadataRef(indexTable)));
 +
 +        // refresh refs to updated ones
 +        tablesDiff.altered.forEach(diff -> metadataRefs.get(diff.after.id).set(diff.after));
 +        viewsDiff.altered.forEach(diff -> metadataRefs.get(diff.after.metadata.id).set(diff.after.metadata));
 +
 +        indexesDiff.entriesDiffering()
 +                   .values()
 +                   .stream()
 +                   .map(MapDifference.ValueDifference::rightValue)
 +                   .forEach(indexTable -> indexMetadataRefs.get(Pair.create(indexTable.keyspace, indexTable.indexName().get())).set(indexTable));
 +
 +        SchemaDiagnostics.metadataReloaded(this, previous, updated, tablesDiff, viewsDiff, indexesDiff);
 +    }
 +
 +    public void registerListener(SchemaChangeListener listener)
 +    {
 +        changeListeners.add(listener);
 +    }
 +
 +    @SuppressWarnings("unused")
 +    public void unregisterListener(SchemaChangeListener listener)
 +    {
 +        changeListeners.remove(listener);
 +    }
 +
 +    /**
 +     * Get keyspace instance by name
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return Keyspace object or null if keyspace was not found
 +     */
 +    @Override
 +    public Keyspace getKeyspaceInstance(String keyspaceName)
 +    {
 +        return keyspaceInstances.get(keyspaceName);
 +    }
 +
 +    public ColumnFamilyStore getColumnFamilyStoreInstance(TableId id)
 +    {
 +        TableMetadata metadata = getTableMetadata(id);
 +        if (metadata == null)
 +            return null;
 +
 +        Keyspace instance = getKeyspaceInstance(metadata.keyspace);
 +        if (instance == null)
 +            return null;
 +
 +        return instance.hasColumnFamilyStore(metadata.id)
 +             ? instance.getColumnFamilyStore(metadata.id)
 +             : null;
 +    }
 +
 +    /**
 +     * Store given Keyspace instance to the schema
 +     *
 +     * @param keyspace The Keyspace instance to store
 +     *
 +     * @throws IllegalArgumentException if Keyspace is already stored
 +     */
 +    @Override
 +    public void storeKeyspaceInstance(Keyspace keyspace)
 +    {
 +        if (keyspaceInstances.putIfAbsent(keyspace.getName(), keyspace) != null)
 +            throw new IllegalArgumentException(String.format("Keyspace %s was already initialized.", keyspace.getName()));
 +    }
 +
 +    /**
 +     * Remove keyspace from schema
 +     *
 +     * @param keyspaceName The name of the keyspace to remove
 +     *
 +     * @return removed keyspace instance or null if it wasn't found
 +     */
 +    public Keyspace removeKeyspaceInstance(String keyspaceName)
 +    {
 +        return keyspaceInstances.remove(keyspaceName);
 +    }
 +
 +    public Keyspaces snapshot()
 +    {
 +        return keyspaces;
 +    }
 +
 +    /**
 +     * Remove keyspace definition from system
 +     *
 +     * @param ksm The keyspace definition to remove
 +     */
 +    synchronized void unload(KeyspaceMetadata ksm)
 +    {
 +        keyspaces = keyspaces.without(ksm.name);
 +
 +        ksm.tablesAndViews()
 +           .forEach(t -> metadataRefs.remove(t.id));
 +
 +        ksm.tables
 +           .indexTables()
 +           .keySet()
 +           .forEach(name -> indexMetadataRefs.remove(Pair.create(ksm.name, 
name)));
 +
 +        SchemaDiagnostics.metadataRemoved(this, ksm);
 +    }
 +
 +    public int getNumberOfTables()
 +    {
 +        return keyspaces.stream().mapToInt(k -> size(k.tablesAndViews())).sum();
 +    }
 +
 +    public ViewMetadata getView(String keyspaceName, String viewName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
 +        return (ksm == null) ? null : ksm.views.getNullable(viewName);
 +    }
 +
 +    /**
 +     * Get metadata about keyspace by its name
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return The keyspace metadata or null if it wasn't found
 +     */
 +    @Override
 +    public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
 +        return null != keyspace ? keyspace : 
VirtualKeyspaceRegistry.instance.getKeyspaceMetadataNullable(keyspaceName);
 +    }
 +
 +    private Set<String> getNonSystemKeyspacesSet()
 +    {
 +        return Sets.difference(keyspaces.names(), 
SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES);
 +    }
 +
 +    /**
 +     * @return collection of the non-system keyspaces (note that this counts as
 +     * system only the non-replicated keyspaces, so keyspaces like system_traces,
 +     * which are replicated, are actually returned; see getUserKeyspaces() below
 +     * if you don't want those)
 +     */
 +    public ImmutableList<String> getNonSystemKeyspaces()
 +    {
 +        return ImmutableList.copyOf(getNonSystemKeyspacesSet());
 +    }
 +
 +    /**
 +     * @return a collection of keyspaces that do not use LocalStrategy for 
replication
 +     */
 +    public List<String> getNonLocalStrategyKeyspaces()
 +    {
 +        return keyspaces.stream()
 +                        .filter(keyspace -> keyspace.params.replication.klass 
!= LocalStrategy.class)
 +                        .map(keyspace -> keyspace.name)
 +                        .collect(Collectors.toList());
 +    }
 +
 +    /**
 +     * @return collection of the user defined keyspaces
 +     */
 +    public List<String> getUserKeyspaces()
 +    {
 +        return 
ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), 
SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES));
 +    }
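
Taken together, the keyspace-name getters in this class form a strict nesting; a hedged sketch (assuming Schema.instance is initialized):

    Set<String> all        = Schema.instance.getKeyspaces();          // everything, incl. system
    List<String> nonSystem = Schema.instance.getNonSystemKeyspaces(); // minus local system keyspaces
    List<String> user      = Schema.instance.getUserKeyspaces();      // minus replicated system keyspaces too
    assert all.size() >= nonSystem.size() && nonSystem.size() >= user.size();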
 +
 +    /**
 +     * Get metadata about the ColumnFamilies (tables and views) in a given keyspace
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return metadata about the ColumnFamilies that belong to the given keyspace
 +     */
 +    public Iterable<TableMetadata> getTablesAndViews(String keyspaceName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
 +        assert ksm != null;
 +        return ksm.tablesAndViews();
 +    }
 +
 +    /**
 +     * @return collection of all the keyspace names registered in the system (system and non-system)
 +     */
 +    public Set<String> getKeyspaces()
 +    {
 +        return keyspaces.names();
 +    }
 +
 +    /* TableMetadata/Ref query/control methods */
 +
 +    /**
 +     * Given a keyspace name and table/view name, get the table metadata
 +     * reference. If the keyspace name or table/view name is not present
 +     * this method returns null.
 +     *
 +     * @return TableMetadataRef object or null if it wasn't found
 +     */
 +    @Override
 +    public TableMetadataRef getTableMetadataRef(String keyspace, String table)
 +    {
 +        TableMetadata tm = getTableMetadata(keyspace, table);
 +        return tm == null
 +             ? null
 +             : metadataRefs.get(tm.id);
 +    }
 +
 +    public TableMetadataRef getIndexTableMetadataRef(String keyspace, String 
index)
 +    {
 +        return indexMetadataRefs.get(Pair.create(keyspace, index));
 +    }
 +
 +    Map<Pair<String, String>, TableMetadataRef> getIndexTableMetadataRefs()
 +    {
 +        return indexMetadataRefs;
 +    }
 +
 +    /**
 +     * Get Table metadata by its identifier
 +     *
 +     * @param id table or view identifier
 +     *
 +     * @return metadata about Table or View
 +     */
 +    @Override
 +    public TableMetadataRef getTableMetadataRef(TableId id)
 +    {
 +        return metadataRefs.get(id);
 +    }
 +
 +    @Override
 +    public TableMetadataRef getTableMetadataRef(Descriptor descriptor)
 +    {
 +        return getTableMetadataRef(descriptor.ksname, descriptor.cfname);
 +    }
 +
 +    Map<TableId, TableMetadataRef> getTableMetadataRefs()
 +    {
 +        return metadataRefs;
 +    }
 +
 +    /**
 +     * Given a keyspace name and table name, get the table
 +     * metadata. If the keyspace name or table name is not valid
 +     * this function returns null.
 +     *
 +     * @param keyspace The keyspace name
 +     * @param table The table name
 +     *
 +     * @return TableMetadata object or null if it wasn't found
 +     */
 +    public TableMetadata getTableMetadata(String keyspace, String table)
 +    {
 +        assert keyspace != null;
 +        assert table != null;
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(keyspace);
 +        return ksm == null
 +             ? null
 +             : ksm.getTableOrViewNullable(table);
 +    }
 +
 +    @Override
 +    public TableMetadata getTableMetadata(TableId id)
 +    {
 +        TableMetadata table = keyspaces.getTableOrViewNullable(id);
 +        return null != table ? table : 
VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id);
 +    }
 +
 +    public TableMetadata validateTable(String keyspaceName, String tableName)
 +    {
 +        if (tableName.isEmpty())
 +            throw new InvalidRequestException("non-empty table is required");
 +
 +        KeyspaceMetadata keyspace = getKeyspaceMetadata(keyspaceName);
 +        if (keyspace == null)
 +            throw new KeyspaceNotDefinedException(format("keyspace %s does 
not exist", keyspaceName));
 +
 +        TableMetadata metadata = keyspace.getTableOrViewNullable(tableName);
 +        if (metadata == null)
 +            throw new InvalidRequestException(format("table %s does not 
exist", tableName));
 +
 +        return metadata;
 +    }
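
Unlike the nullable getters above, validateTable either returns metadata or throws, so request paths need no null checks. A hedged usage sketch (names are placeholders; KeyspaceNotDefinedException is assumed to be a subtype of InvalidRequestException, so one catch covers both failures):

    try
    {
        TableMetadata tm = Schema.instance.validateTable("my_ks", "my_table");
        // ... proceed with tm ...
    }
    catch (InvalidRequestException e)
    {
        // missing keyspace or missing table; report back to the client
    }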
 +
 +    public TableMetadata getTableMetadata(Descriptor descriptor)
 +    {
 +        return getTableMetadata(descriptor.ksname, descriptor.cfname);
 +    }
 +
 +    /* Function helpers */
 +
 +    /**
 +     * Get all function overloads with the specified name
 +     *
 +     * @param name fully qualified function name
 +     * @return an empty list if the keyspace or the function name is not found;
 +     *         a non-empty collection of {@link Function} otherwise
 +     */
 +    public Collection<Function> getFunctions(FunctionName name)
 +    {
 +        if (!name.hasKeyspace())
 +            throw new IllegalArgumentException(String.format("Function name 
must be fully qualified: got %s", name));
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
 +        return ksm == null
 +             ? Collections.emptyList()
 +             : ksm.functions.get(name);
 +    }
 +
 +    /**
 +     * Find the function with the specified name
 +     *
 +     * @param name fully qualified function name
 +     * @param argTypes function argument types
 +     * @return an empty {@link Optional} if the keyspace or the function name is not found;
 +     *         a non-empty optional of {@link Function} otherwise
 +     */
 +    public Optional<Function> findFunction(FunctionName name, 
List<AbstractType<?>> argTypes)
 +    {
 +        if (!name.hasKeyspace())
 +            throw new IllegalArgumentException(String.format("Function name 
must be fully qualified: got %s", name));
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
 +        return ksm == null
 +             ? Optional.empty()
 +             : ksm.functions.find(name, argTypes);
 +    }
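
A hedged lookup sketch (keyspace and function names are made up; FunctionName(keyspace, name) is assumed to produce a fully qualified name):

    FunctionName name = new FunctionName("my_ks", "my_func");
    Optional<Function> fn = Schema.instance.findFunction(name, Collections.emptyList());
    fn.ifPresent(f -> System.out.println("resolved " + f.name()));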
 +
 +    /* Version control */
 +
 +    /**
 +     * @return current schema version
 +     */
 +    public UUID getVersion()
 +    {
 +        return version;
 +    }
 +
 +    /**
 +     * Checks whether the given schema version is the same as the current 
local schema.
 +     */
 +    public boolean isSameVersion(UUID schemaVersion)
 +    {
 +        return schemaVersion != null && schemaVersion.equals(version);
 +    }
 +
 +    /**
 +     * Checks whether the current schema is empty.
 +     */
 +    public boolean isEmpty()
 +    {
 +        return SchemaConstants.emptyVersion.equals(version);
 +    }
 +
 +    /**
 +     * Read schema from system keyspace and calculate MD5 digest of every row;
 +     * the resulting digest is converted into a UUID which acts as the
 +     * content-based version of the schema.
 +     */
 +    public void updateVersion()
 +    {
 +        version = SchemaKeyspace.calculateSchemaDigest();
 +        SystemKeyspace.updateSchemaVersion(version);
 +        SchemaDiagnostics.versionUpdated(this);
 +    }
 +
 +    /*
 +     * Like updateVersion, but also announces via gossip
 +     */
 +    public void updateVersionAndAnnounce()
 +    {
 +        updateVersion();
 +        passiveAnnounceVersion();
 +    }
 +
 +    /**
 +     * Announce my version passively over gossip.
 +     * Used to notify nodes as they arrive in the cluster.
 +     */
 +    private void passiveAnnounceVersion()
 +    {
 +        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.schema(version));
 +        SchemaDiagnostics.versionAnnounced(this);
 +    }
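
The two public entry points compose as follows (passiveAnnounceVersion is private and only reachable through the announcing variant):

    Schema.instance.updateVersion();            // recompute digest, persist to the system keyspace
    Schema.instance.updateVersionAndAnnounce(); // the same, then gossip the new version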
 +
 +    /**
 +     * Clear all KS/CF metadata and reset version.
 +     */
 +    public synchronized void clear()
 +    {
 +        getNonSystemKeyspaces().forEach(k -> unload(getKeyspaceMetadata(k)));
 +        updateVersionAndAnnounce();
 +        SchemaDiagnostics.schemataCleared(this);
 +    }
 +
 +    /*
 +     * Reload schema from local disk. Useful if a user made changes to schema
 +     * tables by hand, or suspects that the in-memory representation has gotten
 +     * out of sync with what's on disk.
 +     */
 +    public synchronized void reloadSchemaAndAnnounceVersion()
 +    {
 +        Keyspaces before = keyspaces.filter(k -> 
!SchemaConstants.isLocalSystemKeyspace(k.name));
 +        Keyspaces after = SchemaKeyspace.fetchNonSystemKeyspaces();
 +        merge(Keyspaces.diff(before, after));
 +        updateVersionAndAnnounce();
 +    }
 +
 +    /**
 +     * Merge remote schema (in the form of mutations) with the local schema, and
 +     * mutate ks/cf metadata objects (which also involves fs operations on ks/cf
 +     * add/drop)
 +     *
 +     * @param mutations the schema changes to apply
 +     *
 +     * @throws ConfigurationException If one of the metadata attributes has an invalid value
 +     */
-     synchronized void mergeAndAnnounceVersion(Collection<Mutation> mutations)
++    public synchronized void mergeAndAnnounceVersion(Collection<Mutation> 
mutations)
 +    {
 +        merge(mutations);
 +        updateVersionAndAnnounce();
 +    }
 +
 +    public synchronized TransformationResult transform(SchemaTransformation 
transformation, boolean locally, long now)
 +    {
 +        KeyspacesDiff diff;
 +        try
 +        {
 +            Keyspaces before = keyspaces;
 +            Keyspaces after = transformation.apply(before);
 +            diff = Keyspaces.diff(before, after);
 +        }
 +        catch (RuntimeException e)
 +        {
 +            return new TransformationResult(e);
 +        }
 +
 +        if (diff.isEmpty())
 +            return new TransformationResult(diff, Collections.emptyList());
 +
 +        Collection<Mutation> mutations = 
SchemaKeyspace.convertSchemaDiffToMutations(diff, now);
 +        SchemaKeyspace.applyChanges(mutations);
 +
 +        merge(diff);
 +        updateVersion();
 +        if (!locally)
 +            passiveAnnounceVersion();
 +
 +        return new TransformationResult(diff, mutations);
 +    }
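
A hedged sketch of driving transform() directly, assuming SchemaTransformation is the single-method Keyspaces -> Keyspaces interface its use above implies; an identity transformation is safe because an empty diff short-circuits before any mutations are written:

    Schema.TransformationResult result =
        Schema.instance.transform(keyspaces -> keyspaces,          // no-op transformation
                                  true,                            // locally: skip gossip announcement
                                  FBUtilities.timestampMicros());  // mutation timestamp
    assert result.success && result.diff.isEmpty();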
 +
 +    public static final class TransformationResult
 +    {
 +        public final boolean success;
 +        public final RuntimeException exception;
 +        public final KeyspacesDiff diff;
 +        public final Collection<Mutation> mutations;
 +
 +        private TransformationResult(boolean success, RuntimeException 
exception, KeyspacesDiff diff, Collection<Mutation> mutations)
 +        {
 +            this.success = success;
 +            this.exception = exception;
 +            this.diff = diff;
 +            this.mutations = mutations;
 +        }
 +
 +        TransformationResult(RuntimeException exception)
 +        {
 +            this(false, exception, null, null);
 +        }
 +
 +        TransformationResult(KeyspacesDiff diff, Collection<Mutation> 
mutations)
 +        {
 +            this(true, null, diff, mutations);
 +        }
 +    }
 +
 +    synchronized void merge(Collection<Mutation> mutations)
 +    {
 +        // only compare the keyspaces affected by this set of schema mutations
 +        Set<String> affectedKeyspaces = 
SchemaKeyspace.affectedKeyspaces(mutations);
 +
 +        // fetch the current state of schema for the affected keyspaces only
 +        Keyspaces before = keyspaces.filter(k -> 
affectedKeyspaces.contains(k.name));
 +
 +        // apply the schema mutations
 +        SchemaKeyspace.applyChanges(mutations);
 +
 +        // fetch the new versions of the altered keyspaces
 +        Keyspaces after = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);
 +
 +        merge(Keyspaces.diff(before, after));
 +    }
 +
 +    private void merge(KeyspacesDiff diff)
 +    {
 +        diff.dropped.forEach(this::dropKeyspace);
 +        diff.created.forEach(this::createKeyspace);
 +        diff.altered.forEach(this::alterKeyspace);
 +    }
 +
 +    private void alterKeyspace(KeyspaceDiff delta)
 +    {
 +        SchemaDiagnostics.keyspaceAltering(this, delta);
 +
 +        // drop tables and views
 +        delta.views.dropped.forEach(this::dropView);
 +        delta.tables.dropped.forEach(this::dropTable);
 +
 +        load(delta.after);
 +
 +        // add tables and views
 +        delta.tables.created.forEach(this::createTable);
 +        delta.views.created.forEach(this::createView);
 +
 +        // update tables and views
 +        delta.tables.altered.forEach(diff -> alterTable(diff.after));
 +        delta.views.altered.forEach(diff -> alterView(diff.after));
 +
 +        // deal with all added and altered views
 +        Keyspace.open(delta.after.name).viewManager.reload(true);
 +
 +        // notify on everything dropped
 +        delta.udas.dropped.forEach(uda -> notifyDropAggregate((UDAggregate) 
uda));
 +        delta.udfs.dropped.forEach(udf -> notifyDropFunction((UDFunction) 
udf));
 +        delta.views.dropped.forEach(this::notifyDropView);
 +        delta.tables.dropped.forEach(this::notifyDropTable);
 +        delta.types.dropped.forEach(this::notifyDropType);
 +
 +        // notify on everything created
 +        delta.types.created.forEach(this::notifyCreateType);
 +        delta.tables.created.forEach(this::notifyCreateTable);
 +        delta.views.created.forEach(this::notifyCreateView);
 +        delta.udfs.created.forEach(udf -> notifyCreateFunction((UDFunction) 
udf));
 +        delta.udas.created.forEach(uda -> notifyCreateAggregate((UDAggregate) 
uda));
 +
 +        // notify on everything altered
 +        if (!delta.before.params.equals(delta.after.params))
 +            notifyAlterKeyspace(delta.before, delta.after);
 +        delta.types.altered.forEach(diff -> notifyAlterType(diff.before, 
diff.after));
 +        delta.tables.altered.forEach(diff -> notifyAlterTable(diff.before, 
diff.after));
 +        delta.views.altered.forEach(diff -> notifyAlterView(diff.before, 
diff.after));
 +        delta.udfs.altered.forEach(diff -> notifyAlterFunction(diff.before, 
diff.after));
 +        delta.udas.altered.forEach(diff -> notifyAlterAggregate(diff.before, 
diff.after));
 +        SchemaDiagnostics.keyspaceAltered(this, delta);
 +    }
 +
 +    private void createKeyspace(KeyspaceMetadata keyspace)
 +    {
 +        SchemaDiagnostics.keyspaceCreating(this, keyspace);
 +        load(keyspace);
 +        Keyspace.open(keyspace.name);
 +
 +        notifyCreateKeyspace(keyspace);
 +        keyspace.types.forEach(this::notifyCreateType);
 +        keyspace.tables.forEach(this::notifyCreateTable);
 +        keyspace.views.forEach(this::notifyCreateView);
 +        keyspace.functions.udfs().forEach(this::notifyCreateFunction);
 +        keyspace.functions.udas().forEach(this::notifyCreateAggregate);
 +        SchemaDiagnostics.keyspaceCreated(this, keyspace);
 +    }
 +
 +    private void dropKeyspace(KeyspaceMetadata keyspace)
 +    {
 +        SchemaDiagnostics.keyspaceDroping(this, keyspace);
 +        keyspace.views.forEach(this::dropView);
 +        keyspace.tables.forEach(this::dropTable);
 +
 +        // remove the keyspace from the static instances.
 +        Keyspace.clear(keyspace.name);
 +        unload(keyspace);
 +        Keyspace.writeOrder.awaitNewBarrier();
 +
 +        keyspace.functions.udas().forEach(this::notifyDropAggregate);
 +        keyspace.functions.udfs().forEach(this::notifyDropFunction);
 +        keyspace.views.forEach(this::notifyDropView);
 +        keyspace.tables.forEach(this::notifyDropTable);
 +        keyspace.types.forEach(this::notifyDropType);
 +        notifyDropKeyspace(keyspace);
 +        SchemaDiagnostics.keyspaceDroped(this, keyspace);
 +    }
 +
 +    private void dropView(ViewMetadata metadata)
 +    {
 +        
Keyspace.open(metadata.keyspace()).viewManager.dropView(metadata.name());
 +        dropTable(metadata.metadata);
 +    }
 +
 +    private void dropTable(TableMetadata metadata)
 +    {
 +        SchemaDiagnostics.tableDropping(this, metadata);
 +        ColumnFamilyStore cfs = 
Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name);
 +        assert cfs != null;
 +        // make sure all the indexes are dropped, or else.
 +        cfs.indexManager.markAllIndexesRemoved();
 +        
CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata),
 (sstable) -> true, true);
 +        if (DatabaseDescriptor.isAutoSnapshot())
 +            
cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, 
ColumnFamilyStore.SNAPSHOT_DROP_PREFIX));
 +        
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
 +        Keyspace.open(metadata.keyspace).dropCf(metadata.id);
 +        SchemaDiagnostics.tableDropped(this, metadata);
 +    }
 +
 +    private void createTable(TableMetadata table)
 +    {
 +        SchemaDiagnostics.tableCreating(this, table);
 +        Keyspace.open(table.keyspace).initCf(metadataRefs.get(table.id), 
true);
 +        SchemaDiagnostics.tableCreated(this, table);
 +    }
 +
 +    private void createView(ViewMetadata view)
 +    {
 +        
Keyspace.open(view.keyspace()).initCf(metadataRefs.get(view.metadata.id), true);
 +    }
 +
 +    private void alterTable(TableMetadata updated)
 +    {
 +        SchemaDiagnostics.tableAltering(this, updated);
 +        
Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload();
 +        SchemaDiagnostics.tableAltered(this, updated);
 +    }
 +
 +    private void alterView(ViewMetadata updated)
 +    {
 +        
Keyspace.open(updated.keyspace()).getColumnFamilyStore(updated.name()).reload();
 +    }
 +
 +    private void notifyCreateKeyspace(KeyspaceMetadata ksm)
 +    {
 +        changeListeners.forEach(l -> l.onCreateKeyspace(ksm.name));
 +    }
 +
 +    private void notifyCreateTable(TableMetadata metadata)
 +    {
 +        changeListeners.forEach(l -> l.onCreateTable(metadata.keyspace, 
metadata.name));
 +    }
 +
 +    private void notifyCreateView(ViewMetadata view)
 +    {
 +        changeListeners.forEach(l -> l.onCreateView(view.keyspace(), 
view.name()));
 +    }
 +
 +    private void notifyCreateType(UserType ut)
 +    {
 +        changeListeners.forEach(l -> l.onCreateType(ut.keyspace, 
ut.getNameAsString()));
 +    }
 +
 +    private void notifyCreateFunction(UDFunction udf)
 +    {
 +        changeListeners.forEach(l -> l.onCreateFunction(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
 +    }
 +
 +    private void notifyCreateAggregate(UDAggregate udf)
 +    {
 +        changeListeners.forEach(l -> l.onCreateAggregate(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
 +    }
 +
 +    private void notifyAlterKeyspace(KeyspaceMetadata before, 
KeyspaceMetadata after)
 +    {
 +        changeListeners.forEach(l -> l.onAlterKeyspace(after.name));
 +    }
 +
 +    private void notifyAlterTable(TableMetadata before, TableMetadata after)
 +    {
 +        boolean changeAffectedPreparedStatements = 
before.changeAffectsPreparedStatements(after);
 +        changeListeners.forEach(l -> l.onAlterTable(after.keyspace, 
after.name, changeAffectedPreparedStatements));
 +    }
 +
 +    private void notifyAlterView(ViewMetadata before, ViewMetadata after)
 +    {
 +        boolean changeAffectedPreparedStatements = 
before.metadata.changeAffectsPreparedStatements(after.metadata);
 +        changeListeners.forEach(l -> l.onAlterView(after.keyspace(), after.name(), changeAffectedPreparedStatements));
 +    }
 +
 +    private void notifyAlterType(UserType before, UserType after)
 +    {
 +        changeListeners.forEach(l -> l.onAlterType(after.keyspace, 
after.getNameAsString()));
 +    }
 +
 +    private void notifyAlterFunction(UDFunction before, UDFunction after)
 +    {
 +        changeListeners.forEach(l -> l.onAlterFunction(after.name().keyspace, 
after.name().name, after.argTypes()));
 +    }
 +
 +    private void notifyAlterAggregate(UDAggregate before, UDAggregate after)
 +    {
 +        changeListeners.forEach(l -> 
l.onAlterAggregate(after.name().keyspace, after.name().name, after.argTypes()));
 +    }
 +
 +    private void notifyDropKeyspace(KeyspaceMetadata ksm)
 +    {
 +        changeListeners.forEach(l -> l.onDropKeyspace(ksm.name));
 +    }
 +
 +    private void notifyDropTable(TableMetadata metadata)
 +    {
 +        changeListeners.forEach(l -> l.onDropTable(metadata.keyspace, 
metadata.name));
 +    }
 +
 +    private void notifyDropView(ViewMetadata view)
 +    {
 +        changeListeners.forEach(l -> l.onDropView(view.keyspace(), 
view.name()));
 +    }
 +
 +    private void notifyDropType(UserType ut)
 +    {
 +        changeListeners.forEach(l -> l.onDropType(ut.keyspace, 
ut.getNameAsString()));
 +    }
 +
 +    private void notifyDropFunction(UDFunction udf)
 +    {
 +        changeListeners.forEach(l -> l.onDropFunction(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
 +    }
 +
 +    private void notifyDropAggregate(UDAggregate udf)
 +    {
 +        changeListeners.forEach(l -> l.onDropAggregate(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
 +    }
 +
 +
 +    /**
 +     * Converts the given schema version to a string. Returns {@code "unknown"}
 +     * if {@code version} is {@code null}, or {@code "(empty)"} if {@code version}
 +     * refers to an {@link SchemaConstants#emptyVersion empty} schema.
 +     */
 +    public static String schemaVersionToString(UUID version)
 +    {
 +        return version == null
 +               ? "unknown"
 +               : SchemaConstants.emptyVersion.equals(version)
 +                 ? "(empty)"
 +                 : version.toString();
 +    }
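
The three branches, spelled out (the digest string is illustrative):

    schemaVersionToString(null);                          // -> "unknown"
    schemaVersionToString(SchemaConstants.emptyVersion);  // -> "(empty)"
    schemaVersionToString(version);                       // -> e.g. "1f4bf2a6-..." (content digest)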
 +}
diff --cc src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java
index 0e6b7a3,0000000..a984804
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java
+++ b/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java
@@@ -1,114 -1,0 +1,111 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.schema;
 +
 +import java.io.Serializable;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Queue;
 +import java.util.UUID;
 +import java.util.concurrent.CountDownLatch;
 +
 +import javax.annotation.Nullable;
 +
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.diag.DiagnosticEvent;
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.MessagingService;
 +
 +/**
 + * Internal events emitted by {@link MigrationManager}.
 + */
 +final class SchemaMigrationEvent extends DiagnosticEvent
 +{
 +    private final MigrationManagerEventType type;
 +    @Nullable
 +    private final InetAddressAndPort endpoint;
 +    @Nullable
 +    private final UUID endpointSchemaVersion;
 +    private final UUID localSchemaVersion;
 +    private final Integer localMessagingVersion;
 +    private final SystemKeyspace.BootstrapState bootstrapState;
-     @Nullable
-     private Integer inflightTaskCount;
++    private final Integer inflightTaskCount;
 +    @Nullable
 +    private Integer endpointMessagingVersion;
 +    @Nullable
 +    private Boolean endpointGossipOnlyMember;
 +    @Nullable
 +    private Boolean isAlive;
 +
 +    enum MigrationManagerEventType
 +    {
 +        UNKNOWN_LOCAL_SCHEMA_VERSION,
 +        VERSION_MATCH,
 +        SKIP_PULL,
 +        RESET_LOCAL_SCHEMA,
 +        TASK_CREATED,
 +        TASK_SEND_ABORTED,
 +        TASK_REQUEST_SEND
 +    }
 +
 +    SchemaMigrationEvent(MigrationManagerEventType type,
 +                         @Nullable InetAddressAndPort endpoint, @Nullable 
UUID endpointSchemaVersion)
 +    {
 +        this.type = type;
 +        this.endpoint = endpoint;
 +        this.endpointSchemaVersion = endpointSchemaVersion;
 +
 +        localSchemaVersion = Schema.instance.getVersion();
 +        localMessagingVersion = MessagingService.current_version;
 +
-         Queue<CountDownLatch> inflightTasks = 
MigrationTask.getInflightTasks();
-         if (inflightTasks != null)
-             inflightTaskCount = inflightTasks.size();
++        inflightTaskCount = MigrationCoordinator.instance.getInflightTasks();
 +
 +        this.bootstrapState = SystemKeyspace.getBootstrapState();
 +
 +        if (endpoint == null) return;
 +
 +        if (MessagingService.instance().versions.knows(endpoint))
 +            endpointMessagingVersion = 
MessagingService.instance().versions.getRaw(endpoint);
 +
 +        endpointGossipOnlyMember = 
Gossiper.instance.isGossipOnlyMember(endpoint);
 +        this.isAlive = FailureDetector.instance.isAlive(endpoint);
 +    }
 +
 +    public Enum<?> getType()
 +    {
 +        return type;
 +    }
 +
 +    public Map<String, Serializable> toMap()
 +    {
 +        HashMap<String, Serializable> ret = new HashMap<>();
 +        if (endpoint != null) ret.put("endpoint", 
endpoint.getHostAddressAndPort());
 +        ret.put("endpointSchemaVersion", 
Schema.schemaVersionToString(endpointSchemaVersion));
 +        ret.put("localSchemaVersion", 
Schema.schemaVersionToString(localSchemaVersion));
 +        if (endpointMessagingVersion != null) 
ret.put("endpointMessagingVersion", endpointMessagingVersion);
 +        if (localMessagingVersion != null) ret.put("localMessagingVersion", 
localMessagingVersion);
 +        if (endpointGossipOnlyMember != null) 
ret.put("endpointGossipOnlyMember", endpointGossipOnlyMember);
 +        if (isAlive != null) ret.put("endpointIsAlive", isAlive);
 +        if (bootstrapState != null) ret.put("bootstrapState", 
bootstrapState.name());
 +        if (inflightTaskCount != null) ret.put("inflightTaskCount", 
inflightTaskCount);
 +        return ret;
 +    }
 +}
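
For reference, a hedged sketch of what toMap() produces for an endpoint-bearing event (keys are from the code above; values are illustrative):

    // {
    //   "endpoint"                 : "127.0.0.2:7000",
    //   "endpointSchemaVersion"    : "2207c2a9-...",
    //   "localSchemaVersion"       : "59adb24e-...",
    //   "endpointMessagingVersion" : 12,
    //   "localMessagingVersion"    : 12,
    //   "endpointGossipOnlyMember" : false,
    //   "endpointIsAlive"          : true,
    //   "bootstrapState"           : "COMPLETED",
    //   "inflightTaskCount"        : 1
    // }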
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index ca2bff2,734f176..4a3477c
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -30,13 -29,10 +30,14 @@@ import java.util.concurrent.atomic.Atom
  import java.util.regex.MatchResult;
  import java.util.regex.Pattern;
  import java.util.stream.Collectors;
 +import java.util.stream.Stream;
  import java.util.stream.StreamSupport;
  
++import java.util.stream.Collectors;
  import javax.annotation.Nullable;
  import javax.management.*;
 +import javax.management.openmbean.CompositeData;
 +import javax.management.openmbean.OpenDataException;
  import javax.management.openmbean.TabularData;
  import javax.management.openmbean.TabularDataSupport;
  
@@@ -91,17 -82,16 +92,18 @@@ import org.apache.cassandra.repair.*
  import org.apache.cassandra.repair.messages.RepairOption;
  import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
  import org.apache.cassandra.schema.KeyspaceMetadata;
 -import org.apache.cassandra.schema.SchemaKeyspace;
 -import org.apache.cassandra.service.paxos.CommitVerbHandler;
 -import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 -import org.apache.cassandra.service.paxos.ProposeVerbHandler;
++import org.apache.cassandra.schema.MigrationCoordinator;
 +import org.apache.cassandra.schema.MigrationManager;
 +import org.apache.cassandra.schema.ReplicationParams;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaConstants;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.TableMetadataRef;
 +import org.apache.cassandra.schema.ViewMetadata;
  import org.apache.cassandra.streaming.*;
 -import org.apache.cassandra.thrift.EndpointDetails;
 -import org.apache.cassandra.thrift.TokenRange;
 -import org.apache.cassandra.thrift.cassandraConstants;
  import org.apache.cassandra.tracing.TraceKeyspace;
  import org.apache.cassandra.transport.ProtocolVersion;
 +import org.apache.cassandra.transport.Server;
  import org.apache.cassandra.utils.*;
  import org.apache.cassandra.utils.logging.LoggingSupportFactory;
  import org.apache.cassandra.utils.progress.ProgressEvent;
@@@ -135,8 -116,10 +137,11 @@@ public class StorageService extends Not
  {
      private static final Logger logger = 
LoggerFactory.getLogger(StorageService.class);
  
 +    public static final int INDEFINITE = -1;
     public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stabilized
+     public static final int SCHEMA_DELAY = getSchemaDelay(); // delay after which we assume schema has propagated
+ 
+     private static final boolean REQUIRE_SCHEMAS = 
!Boolean.getBoolean("cassandra.skip_schema_check");
  
      private final JMXProgressSupport progressSupport = new 
JMXProgressSupport(this);
  
@@@ -149,11 -141,23 +154,25 @@@
              return Integer.parseInt(newdelay);
          }
          else
 +        {
              return 30 * 1000;
 +        }
      }
  
+     private static int getSchemaDelay()
+     {
+         String newdelay = System.getProperty("cassandra.schema_delay_ms");
+         if (newdelay != null)
+         {
+             logger.info("Overriding SCHEMA_DELAY to {}ms", newdelay);
+             return Integer.parseInt(newdelay);
+         }
+         else
+         {
+             return 30 * 1000;
+         }
+     }
+ 
      /* This abstraction maintains the token/endpoint metadata information */
      private TokenMetadata tokenMetadata = new TokenMetadata();
  
@@@ -774,11 -824,13 +793,12 @@@
  
      public static boolean isSeed()
      {
 -        return 
DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress());
 +        return 
DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort());
      }
  
 -    @VisibleForTesting
 -    public void prepareToJoin() throws ConfigurationException
 +    private void prepareToJoin() throws ConfigurationException
      {
+         MigrationCoordinator.instance.start();
          if (!joined)
          {
              Map<ApplicationState, VersionedValue> appStates = new 
EnumMap<>(ApplicationState.class);
@@@ -880,26 -932,23 +900,31 @@@
              }
              Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
          }
-         // if our schema hasn't matched yet, wait until it has
-         // we do this by waiting for all in-flight migration requests and 
responses to complete
-         // (post CASSANDRA-1391 we don't expect this to be necessary very 
often, but it doesn't hurt to be careful)
-         if (!MigrationManager.isReadyForBootstrap())
-         {
-             setMode(Mode.JOINING, "waiting for schema information to 
complete", true);
-             MigrationManager.waitUntilReadyForBootstrap();
-         }
+ 
+         boolean schemasReceived = 
MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(SCHEMA_DELAY));
+ 
+         if (schemasReceived)
+             return;
+ 
+         logger.warn(String.format("There are nodes in the cluster with a 
different schema version than us we did not merged schemas from, " +
+                                   "our version : (%s), outstanding versions 
-> endpoints : %s",
+                                   Schema.instance.getVersion(),
+                                   
MigrationCoordinator.instance.outstandingVersions()));
+ 
+         if (REQUIRE_SCHEMAS)
+             throw new RuntimeException("Didn't receive schemas for all known 
versions within the timeout");
      }
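
The startup behaviour above hangs off two JVM system properties introduced by this patch; a brief summary (values illustrative):

    // -Dcassandra.schema_delay_ms=60000   // raises SCHEMA_DELAY above the 30s default
    // -Dcassandra.skip_schema_check=true  // sets REQUIRE_SCHEMAS=false: warn instead of failing startup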
  
 +    private void joinTokenRing(long schemaTimeoutMillis) throws 
ConfigurationException
 +    {
 +        joinTokenRing(!isSurveyMode, shouldBootstrap(), schemaTimeoutMillis, 
INDEFINITE);
 +    }
 +
      @VisibleForTesting
 -    private void joinTokenRing(int delay) throws ConfigurationException
 +    public void joinTokenRing(boolean finishJoiningRing,
 +                              boolean shouldBootstrap,
 +                              long schemaTimeoutMillis,
 +                              long bootstrapTimeoutMillis) throws 
ConfigurationException
      {
          joined = true;
  
@@@ -2233,11 -2104,11 +2258,11 @@@
                          }
                          break;
                      case SCHEMA:
 -                        SystemKeyspace.updatePeerInfo(endpoint, 
"schema_version", UUID.fromString(value.value), executor);
 +                        SystemKeyspace.updatePeerInfo(endpoint, 
"schema_version", UUID.fromString(value.value));
-                         
MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
+                         
MigrationCoordinator.instance.reportEndpointVersion(endpoint, 
UUID.fromString(value.value));
                          break;
                      case HOST_ID:
 -                        SystemKeyspace.updatePeerInfo(endpoint, "host_id", 
UUID.fromString(value.value), executor);
 +                        SystemKeyspace.updatePeerInfo(endpoint, "host_id", 
UUID.fromString(value.value));
                          break;
                      case RPC_READY:
                          notifyRpcChange(endpoint, epState.isRpcReady());
@@@ -3126,13 -2866,11 +3151,11 @@@
          {
              onChange(endpoint, entry.getKey(), entry.getValue());
          }
-         MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
+         MigrationCoordinator.instance.reportEndpointVersion(endpoint, 
epState);
      }
  
 -    public void onAlive(InetAddress endpoint, EndpointState state)
 +    public void onAlive(InetAddressAndPort endpoint, EndpointState state)
      {
-         MigrationManager.instance.scheduleSchemaPull(endpoint, state);
- 
          if (tokenMetadata.isMember(endpoint))
              notifyUp(endpoint);
      }
@@@ -5683,42 -5381,9 +5706,58 @@@
      }
  
      @Override
 +    public void enableFullQueryLogger(String path, String rollCycle, Boolean 
blocking, int maxQueueWeight, long maxLogSize, String archiveCommand, int 
maxArchiveRetries)
 +    {
 +        FullQueryLoggerOptions fqlOptions = 
DatabaseDescriptor.getFullQueryLogOptions();
 +        path = path != null ? path : fqlOptions.log_dir;
 +        rollCycle = rollCycle != null ? rollCycle : fqlOptions.roll_cycle;
 +        blocking = blocking != null ? blocking : fqlOptions.block;
 +        maxQueueWeight = maxQueueWeight != Integer.MIN_VALUE ? maxQueueWeight 
: fqlOptions.max_queue_weight;
 +        maxLogSize = maxLogSize != Long.MIN_VALUE ? maxLogSize : 
fqlOptions.max_log_size;
 +        archiveCommand = archiveCommand != null ? archiveCommand : 
fqlOptions.archive_command;
 +        maxArchiveRetries = maxArchiveRetries != Integer.MIN_VALUE ? 
maxArchiveRetries : fqlOptions.max_archive_retries;
 +
 +        Preconditions.checkNotNull(path, "cassandra.yaml did not set log_dir 
and not set as parameter");
 +        FullQueryLogger.instance.enable(Paths.get(path), rollCycle, blocking, 
maxQueueWeight, maxLogSize, archiveCommand, maxArchiveRetries);
 +    }
 +
 +    @Override
 +    public void resetFullQueryLogger()
 +    {
 +        
FullQueryLogger.instance.reset(DatabaseDescriptor.getFullQueryLogOptions().log_dir);
 +    }
 +
 +    @Override
 +    public void stopFullQueryLogger()
 +    {
 +        FullQueryLogger.instance.stop();
 +    }
 +
 +    @Override
 +    public boolean isFullQueryLogEnabled()
 +    {
 +        return FullQueryLogger.instance.isEnabled();
 +    }
 +
 +    @Override
 +    public CompositeData getFullQueryLoggerOptions()
 +    {
 +        return 
FullQueryLoggerOptionsCompositeData.toCompositeData(FullQueryLogger.instance.getFullQueryLoggerOptions());
 +    }
++
++    @Override
+     public Map<String, Set<InetAddress>> getOutstandingSchemaVersions()
+     {
 -        Map<UUID, Set<InetAddress>> outstanding = 
MigrationCoordinator.instance.outstandingVersions();
 -        return outstanding.entrySet().stream().collect(Collectors.toMap(e -> 
e.getKey().toString(), Entry::getValue));
++        Map<UUID, Set<InetAddressAndPort>> outstanding = 
MigrationCoordinator.instance.outstandingVersions();
++        return outstanding.entrySet().stream().collect(Collectors.toMap(e -> 
e.getKey().toString(),
++                                                                        e -> 
e.getValue().stream().map(i -> i.address).collect(Collectors.toSet())));
++    }
++
++    @Override
++    public Map<String, Set<String>> getOutstandingSchemaVersionsWithPort()
++    {
++        Map<UUID, Set<InetAddressAndPort>> outstanding = 
MigrationCoordinator.instance.outstandingVersions();
++        return outstanding.entrySet().stream().collect(Collectors.toMap(e -> 
e.getKey().toString(),
++                                                                        e -> 
e.getValue().stream().map(InetAddressAndPort::toString).collect(Collectors.toSet())));
+     }
  }
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 618a37e,218cdd9..0c4b5b0
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -22,19 -21,16 +22,21 @@@ import java.io.Serializable
  import java.net.InetAddress;
  import java.net.UnknownHostException;
  import java.nio.ByteBuffer;
 -import java.util.Collection;
  import java.util.List;
  import java.util.Map;
+ import java.util.Set;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.TimeoutException;
 -
 +import javax.annotation.Nullable;
  import javax.management.NotificationEmitter;
 +import javax.management.openmbean.CompositeData;
 +import javax.management.openmbean.OpenDataException;
  import javax.management.openmbean.TabularData;
  
 +import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 +import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.locator.InetAddressAndPort;
 +
  public interface StorageServiceMBean extends NotificationEmitter
  {
      /**
@@@ -750,73 -707,9 +752,78 @@@
       */
      public boolean resumeBootstrap();
  
 -    /** Returns the max version that this node will negotiate for native 
protocol connections */
 -    public int getMaxNativeProtocolVersion();
 +    /** Gets the concurrency settings for processing stages */
 +    static class StageConcurrency implements Serializable
 +    {
 +        public final int corePoolSize;
 +        public final int maximumPoolSize;
 +
 +        public StageConcurrency(int corePoolSize, int maximumPoolSize)
 +        {
 +            this.corePoolSize = corePoolSize;
 +            this.maximumPoolSize = maximumPoolSize;
 +        }
 +
 +    }
 +    public Map<String, List<Integer>> getConcurrency(List<String> stageNames);
 +
 +    /** Sets the concurrency setting for processing stages */
 +    public void setConcurrency(String threadPoolName, int newCorePoolSize, 
int newMaximumPoolSize);
 +
 +    /** Clears the history of clients that have connected in the past **/
 +    void clearConnectionHistory();
 +    public void disableAuditLog();
 +    public void enableAuditLog(String loggerName, Map<String, String> 
parameters, String includedKeyspaces, String excludedKeyspaces, String 
includedCategories, String excludedCategories, String includedUsers, String 
excludedUsers) throws ConfigurationException;
 +    public void enableAuditLog(String loggerName, String includedKeyspaces, 
String excludedKeyspaces, String includedCategories, String excludedCategories, 
String includedUsers, String excludedUsers) throws ConfigurationException;
 +    public boolean isAuditLogEnabled();
 +    public String getCorruptedTombstoneStrategy();
 +    public void setCorruptedTombstoneStrategy(String strategy);
 +
 +    /**
 +     * Start the full query logger.
 +     * @param path Path where the full query log will be stored. If null the cassandra.yaml value is used.
 +     * @param rollCycle How often to create a new file for query data (MINUTELY, DAILY, HOURLY)
 +     * @param blocking Whether threads submitting queries to the query log should block if they can't be drained to the filesystem, or alternatively drop samples and log the drops
 +     * @param maxQueueWeight How many bytes of query data to queue before blocking or dropping samples
 +     * @param maxLogSize How many bytes of log data to store before dropping segments. Might not be respected if a log file hasn't rolled yet and so can't be deleted.
 +     * @param archiveCommand executable for archiving the rolled log files
 +     * @param maxArchiveRetries max number of times to retry a failing archive command
 +     */
 +    public void enableFullQueryLogger(String path, String rollCycle, Boolean 
blocking, int maxQueueWeight, long maxLogSize, @Nullable String archiveCommand, 
int maxArchiveRetries);
 +
 +    /**
 +     * Disable the full query logger if it is enabled.
 +     * Also delete any generated files in the last used full query log path 
as well as the one configure in cassandra.yaml
 +     */
 +    public void resetFullQueryLogger();
 +
 +    /**
 +     * Stop logging queries but leave any generated files on disk.
 +     */
 +    public void stopFullQueryLogger();
 +
 +    public boolean isFullQueryLogEnabled();
 +
 +    /**
 +     * Returns the current state of FQL.
 +     */
 +    CompositeData getFullQueryLoggerOptions();
 +
 +    /** Sets the initial allocation size of backing arrays for new 
RangeTombstoneList objects */
 +    public void setInitialRangeTombstoneListAllocationSize(int size);
 +
 +    /** Returns the initial allocation size of backing arrays for new 
RangeTombstoneList objects */
 +    public int getInitialRangeTombstoneListAllocationSize();
 +
 +    /** Sets the resize factor to use when growing/resizing a 
RangeTombstoneList */
 +    public void setRangeTombstoneListResizeGrowthFactor(double growthFactor);
 +
 +    /** Returns the resize factor to use when growing/resizing a 
RangeTombstoneList */
 +    public double getRangeTombstoneResizeListGrowthFactor();
+ 
+     /** Returns a map of schema version -> set of endpoints reporting that version, for versions we still need schema updates from */
++    @Deprecated
+     public Map<String, Set<InetAddress>> getOutstandingSchemaVersions();
++    public Map<String, Set<String>> getOutstandingSchemaVersionsWithPort();
  }
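
A hedged sketch of reading the new attribute remotely over JMX; the object name follows Cassandra's conventional StorageService registration, and host/port are placeholders:

    import javax.management.JMX;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    import org.apache.cassandra.service.StorageServiceMBean;

    public class SchemaVersionProbe
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                StorageServiceMBean ss = JMX.newMBeanProxy(connector.getMBeanServerConnection(),
                                                           new ObjectName("org.apache.cassandra.db:type=StorageService"),
                                                           StorageServiceMBean.class);
                System.out.println(ss.getOutstandingSchemaVersionsWithPort());
            }
        }
    }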
diff --cc 
test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index 70ef503,0000000..6dda98e
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@@ -1,457 -1,0 +1,461 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.action;
 +
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.net.InetSocketAddress;
 +import java.time.Duration;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Supplier;
 +
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.api.IInstance;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +import org.apache.cassandra.distributed.shared.VersionedApplicationState;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.schema.MigrationCoordinator;
 +import org.apache.cassandra.schema.MigrationManager;
 +import org.apache.cassandra.service.PendingRangeCalculatorService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static 
org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
 +
 +public class GossipHelper
 +{
 +    public static InstanceAction statusToBootstrap(IInvokableInstance newNode)
 +    {
 +        return (instance) ->
 +        {
 +            changeGossipState(instance,
 +                              newNode,
 +                              Arrays.asList(tokens(newNode),
 +                                            statusBootstrapping(newNode),
 +                                            
statusWithPortBootstrapping(newNode)));
 +        };
 +    }
 +
 +    public static InstanceAction statusToNormal(IInvokableInstance peer)
 +    {
 +        return (target) ->
 +        {
 +            changeGossipState(target,
 +                              peer,
 +                              Arrays.asList(tokens(peer),
 +                                            statusNormal(peer),
 +                                            releaseVersion(peer),
 +                                            netVersion(peer),
 +                                            statusWithPortNormal(peer)));
 +        };
 +    }
 +
 +    /**
 +     * This method is unsafe and should be used _only_ when gossip is not 
used or available: it creates versioned values on the
 +     * target instance, which means Gossip versioning gets out of sync. Use the safe counterpart at all times when performing _any_
 +     * ring movement operations _or_ if Gossip is used.
 +     */
 +    public static void unsafeStatusToNormal(IInvokableInstance target, 
IInstance peer)
 +    {
 +        int messagingVersion = peer.getMessagingVersion();
 +        changeGossipState(target,
 +                          peer,
 +                          Arrays.asList(unsafeVersionedValue(target,
 +                                                             
ApplicationState.TOKENS,
 +                                                             (partitioner, 
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens),
 +                                                             
peer.config().getString("partitioner"),
 +                                                             
peer.config().getString("initial_token")),
 +                                        unsafeVersionedValue(target,
 +                                                             
ApplicationState.STATUS,
 +                                                             (partitioner, 
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
 +                                                             
peer.config().getString("partitioner"),
 +                                                             
peer.config().getString("initial_token")),
 +                                        unsafeVersionedValue(target,
 +                                                             
ApplicationState.STATUS_WITH_PORT,
 +                                                             (partitioner, 
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
 +                                                             
peer.config().getString("partitioner"),
 +                                                             
peer.config().getString("initial_token")),
 +                                        unsafeVersionedValue(target,
 +                                                             
ApplicationState.NET_VERSION,
 +                                                             (partitioner) -> 
new 
VersionedValue.VersionedValueFactory(partitioner).networkVersion(messagingVersion),
 +                                                             
peer.config().getString("partitioner")),
 +                                        unsafeReleaseVersion(target,
 +                                                             
peer.config().getString("partitioner"),
 +                                                             
peer.getReleaseVersionString())));
 +    }
 +
 +    public static InstanceAction statusToLeaving(IInvokableInstance newNode)
 +    {
 +        return (instance) -> {
 +            changeGossipState(instance,
 +                              newNode,
 +                              Arrays.asList(tokens(newNode),
 +                                            statusLeaving(newNode),
 +                                            statusWithPortLeaving(newNode)));
 +        };
 +    }
 +
 +    public static InstanceAction bootstrap()
 +    {
 +        return new BootstrapAction();
 +    }
 +
 +    public static InstanceAction bootstrap(boolean joinRing, Duration 
waitForBootstrap, Duration waitForSchema)
 +    {
 +        return new BootstrapAction(joinRing, waitForBootstrap, waitForSchema);
 +    }
 +
 +    public static InstanceAction disseminateGossipState(IInvokableInstance 
newNode)
 +    {
 +        return new DisseminateGossipState(newNode);
 +    }
 +
 +    public static InstanceAction pullSchemaFrom(IInvokableInstance pullFrom)
 +    {
 +        return new PullSchemaFrom(pullFrom);
 +    }
 +
 +    private static InstanceAction disableBinary()
 +    {
 +        return (instance) -> 
instance.nodetoolResult("disablebinary").asserts().success();
 +    }
 +
 +    private static class DisseminateGossipState implements InstanceAction
 +    {
 +        final Map<InetSocketAddress, byte[]> gossipState;
 +
 +        public DisseminateGossipState(IInvokableInstance... from)
 +        {
 +            gossipState = new HashMap<>();
 +            for (IInvokableInstance node : from)
 +            {
 +                byte[] epBytes = node.callsOnInstance(() -> {
 +                    EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort());
 +                    return toBytes(epState);
 +                }).call();
 +                gossipState.put(node.broadcastAddress(), epBytes);
 +            }
 +        }
 +
 +        public void accept(IInvokableInstance instance)
 +        {
 +            
instance.appliesOnInstance((IIsolatedExecutor.SerializableFunction<Map<InetSocketAddress,
 byte[]>, Void>)
 +                                       (map) -> {
 +                                           Map<InetAddressAndPort, 
EndpointState> newState = new HashMap<>();
 +                                           for (Map.Entry<InetSocketAddress, 
byte[]> e : map.entrySet())
 +                                               
newState.put(toCassandraInetAddressAndPort(e.getKey()), 
fromBytes(e.getValue()));
 +
 +                                           
Gossiper.runInGossipStageBlocking(() -> {
 +                                               
Gossiper.instance.applyStateLocally(newState);
 +                                           });
 +                                           return null;
 +                                       }).apply(gossipState);
 +        }
 +    }
 +
 +    private static byte[] toBytes(EndpointState epState)
 +    {
 +        try (DataOutputBuffer out = new DataOutputBuffer(1024))
 +        {
 +            EndpointState.serializer.serialize(epState, out, 
MessagingService.current_version);
 +            return out.toByteArray();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private static EndpointState fromBytes(byte[] bytes)
 +    {
 +        try (DataInputBuffer in = new DataInputBuffer(bytes))
 +        {
 +            return EndpointState.serializer.deserialize(in, 
MessagingService.current_version);
 +        }
 +        catch (Throwable t)
 +        {
 +            throw new RuntimeException(t);
 +        }
 +    }
 +
 +    private static class PullSchemaFrom implements InstanceAction
 +    {
 +        final InetSocketAddress pullFrom;
 +
 +        public PullSchemaFrom(IInvokableInstance pullFrom)
 +        {
 +            this.pullFrom = pullFrom.broadcastAddress();
 +        }
 +
 +        public void accept(IInvokableInstance pullTo)
 +        {
 +            pullTo.acceptsOnInstance((InetSocketAddress pullFrom) -> {
 +                InetAddressAndPort endpoint = 
toCassandraInetAddressAndPort(pullFrom);
 +                EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-                 MigrationManager.scheduleSchemaPull(endpoint, state);
-                 MigrationManager.waitUntilReadyForBootstrap();
++                MigrationCoordinator.instance.reportEndpointVersion(endpoint, 
state);
++                
MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10));
 +            }).accept(pullFrom);
 +        }
 +    }
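
A hedged usage sketch in an in-jvm dtest (node1 and node2 are IInvokableInstance handles from a cluster built elsewhere):

    // Make node2 pull and wait for node1's schema without relying on live gossip.
    GossipHelper.pullSchemaFrom(node1).accept(node2);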
 +
 +    private static class BootstrapAction implements InstanceAction, 
Serializable
 +    {
 +        private final boolean joinRing;
 +        private final Duration waitForBootstrap;
 +        private final Duration waitForSchema;
 +
 +        public BootstrapAction()
 +        {
 +            this(true, Duration.ofMinutes(10), Duration.ofSeconds(10));
 +        }
 +
 +        public BootstrapAction(boolean joinRing, Duration waitForBootstrap, 
Duration waitForSchema)
 +        {
 +            this.joinRing = joinRing;
 +            this.waitForBootstrap = waitForBootstrap;
 +            this.waitForSchema = waitForSchema;
 +        }
 +
 +        public void accept(IInvokableInstance instance)
 +        {
 +            instance.appliesOnInstance((String partitionerString, String tokenString) -> {
 +                IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString);
 +                List<Token> tokens = Collections.singletonList(partitioner.getTokenFactory().fromString(tokenString));
 +                try
 +                {
 +                    Collection<InetAddressAndPort> collisions = StorageService.instance.prepareForBootstrap(waitForSchema.toMillis());
 +                    assert collisions.size() == 0 : String.format("Didn't expect any replacements but got %s", collisions);
 +                    boolean isBootstrapSuccessful = StorageService.instance.bootstrap(tokens, waitForBootstrap.toMillis());
 +                    assert isBootstrapSuccessful : "Bootstrap did not complete successfully";
 +                    StorageService.instance.setUpDistributedSystemKeyspaces();
 +                    if (joinRing)
 +                        StorageService.instance.finishJoiningRing(true, tokens);
 +                }
 +                catch (Throwable t)
 +                {
 +                    throw new RuntimeException(t);
 +                }
 +
 +                return null;
 +            }).apply(instance.config().getString("partitioner"), instance.config().getString("initial_token"));
 +        }
 +    }
 +
 +    public static InstanceAction decommission()
 +    {
 +        return (target) -> target.nodetoolResult("decommission").asserts().success();
 +    }
 +
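 +    // The factories below build versioned gossip ApplicationState values (tokens, versions,
 +    // and STATUS / STATUS_WITH_PORT transitions) that can be injected into a target node's
 +    // view of a peer, e.g. via changeGossipState().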
 +    public static VersionedApplicationState tokens(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.TOKENS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens));
 +    }
 +
 +    public static VersionedApplicationState netVersion(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.NET_VERSION, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).networkVersion());
 +    }
 +
 +    private static VersionedApplicationState unsafeReleaseVersion(IInvokableInstance instance, String partitionerStr, String releaseVersionStr)
 +    {
 +        return unsafeVersionedValue(instance, ApplicationState.RELEASE_VERSION, (partitioner) -> new VersionedValue.VersionedValueFactory(partitioner).releaseVersion(releaseVersionStr), partitionerStr);
 +    }
 +
 +    public static VersionedApplicationState releaseVersion(IInvokableInstance instance)
 +    {
 +        return unsafeReleaseVersion(instance, instance.config().getString("partitioner"), instance.getReleaseVersionString());
 +    }
 +
 +    public static VersionedApplicationState statusNormal(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusWithPortNormal(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusBootstrapping(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusWithPortBootstrapping(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusLeaving(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusLeft(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> {
 +            return new VersionedValue.VersionedValueFactory(partitioner).left(tokens, System.currentTimeMillis() + Gossiper.aVeryLongTime);
 +        });
 +    }
 +
 +    public static VersionedApplicationState statusWithPortLeft(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> {
 +            return new VersionedValue.VersionedValueFactory(partitioner).left(tokens, System.currentTimeMillis() + Gossiper.aVeryLongTime);
 +        });
 +    }
 +
 +    public static VersionedApplicationState statusWithPortLeaving(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
 +    }
 +
 +    public static VersionedValue toVersionedValue(VersionedApplicationState vv)
 +    {
 +        return VersionedValue.unsafeMakeVersionedValue(vv.value, vv.version);
 +    }
 +
 +    public static ApplicationState toApplicationState(VersionedApplicationState vv)
 +    {
 +        return ApplicationState.values()[vv.applicationState];
 +    }
 +
 +    private static VersionedApplicationState unsafeVersionedValue(IInvokableInstance instance,
 +                                                                  ApplicationState applicationState,
 +                                                                  IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, VersionedValue> supplier,
 +                                                                  String partitionerStr, String initialTokenStr)
 +    {
 +        return instance.appliesOnInstance((String partitionerString, String tokenString) -> {
 +            IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString);
 +            Token token = partitioner.getTokenFactory().fromString(tokenString);
 +
 +            VersionedValue versionedValue = supplier.apply(partitioner, Collections.singleton(token));
 +            return new VersionedApplicationState(applicationState.ordinal(), versionedValue.value, versionedValue.version);
 +        }).apply(partitionerStr, initialTokenStr);
 +    }
 +
 +    private static VersionedApplicationState unsafeVersionedValue(IInvokableInstance instance,
 +                                                                  ApplicationState applicationState,
 +                                                                  IIsolatedExecutor.SerializableFunction<IPartitioner, VersionedValue> supplier,
 +                                                                  String partitionerStr)
 +    {
 +        return instance.appliesOnInstance((String partitionerString) -> {
 +            IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString);
 +            VersionedValue versionedValue = supplier.apply(partitioner);
 +            return new VersionedApplicationState(applicationState.ordinal(), versionedValue.value, versionedValue.version);
 +        }).apply(partitionerStr);
 +    }
 +
 +    public static VersionedApplicationState versionedToken(IInvokableInstance instance, ApplicationState applicationState, IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, VersionedValue> supplier)
 +    {
 +        return unsafeVersionedValue(instance, applicationState, supplier, instance.config().getString("partitioner"), instance.config().getString("initial_token"));
 +    }
 +
 +    public static InstanceAction removeFromRing(IInvokableInstance peer)
 +    {
 +        return (target) -> {
 +            InetAddressAndPort endpoint = toCassandraInetAddressAndPort(peer.broadcastAddress());
 +            VersionedApplicationState newState = statusLeft(peer);
 +
 +            target.runOnInstance(() -> {
 +                // state to 'left'
 +                EndpointState currentState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                ApplicationState as = toApplicationState(newState);
 +                VersionedValue vv = toVersionedValue(newState);
 +                currentState.addApplicationState(as, vv);
 +                StorageService.instance.onChange(endpoint, as, vv);
 +
 +                // remove from gossip
 +                Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.unsafeAnulEndpoint(endpoint));
 +                SystemKeyspace.removeEndpoint(endpoint);
 +                PendingRangeCalculatorService.instance.update();
 +                PendingRangeCalculatorService.instance.blockUntilFinished();
 +            });
 +        };
 +    }
 +
 +    /**
 +     * Changes gossip state of the `peer` on `target`
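 +     * <p>
 +     * Hypothetical usage (assuming `peer` is an IInvokableInstance), marking it as having
 +     * left the ring from `target`'s point of view:
 +     * `changeGossipState(target, peer, Collections.singletonList(statusLeft(peer)))`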
 +     */
 +    public static void changeGossipState(IInvokableInstance target, IInstance peer, List<VersionedApplicationState> newState)
 +    {
 +        InetSocketAddress addr = peer.broadcastAddress();
 +        UUID hostId = peer.config().hostId();
 +        int netVersion = peer.getMessagingVersion();
 +        target.runOnInstance(() -> {
 +            InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr);
 +            StorageService storageService = StorageService.instance;
 +
 +            Gossiper.runInGossipStageBlocking(() -> {
 +                EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                if (state == null)
 +                {
 +                    Gossiper.instance.initializeNodeUnsafe(endpoint, hostId, netVersion, 1);
 +                    state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                    if (state.isAlive() && !Gossiper.instance.isDeadState(state))
 +                        Gossiper.instance.realMarkAlive(endpoint, state);
 +                }
 +
 +                for (VersionedApplicationState value : newState)
 +                {
 +                    ApplicationState as = toApplicationState(value);
 +                    VersionedValue vv = toVersionedValue(value);
 +                    state.addApplicationState(as, vv);
 +                    storageService.onChange(endpoint, as, vv);
 +                }
 +            });
 +        });
 +    }
 +
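 +    // Sets a system property for the duration of `r`, then restores the previous value
 +    // (or clears the property if it was previously unset).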
 +    public static void withProperty(String prop, boolean value, Runnable r)
 +    {
 +        String before = System.getProperty(prop);
 +        try
 +        {
 +            System.setProperty(prop, Boolean.toString(value));
 +            r.run();
 +        }
 +        finally
 +        {
-             System.setProperty(prop, before == null ? "true" : before);
++            if (before == null)
++                System.clearProperty(prop);
++            else
++                System.setProperty(prop, before);
 +        }
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
index 0000000,0000000..daba0f1
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
@@@ -1,0 -1,0 +1,319 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.schema;
++
++import java.net.UnknownHostException;
++import java.util.Collection;
++import java.util.Collections;
++import java.util.HashSet;
++import java.util.LinkedList;
++import java.util.List;
++import java.util.Queue;
++import java.util.Set;
++import java.util.UUID;
++import java.util.concurrent.Future;
++
++import com.google.common.collect.Iterables;
++import com.google.common.collect.Sets;
++import com.google.common.util.concurrent.Futures;
++import org.junit.Assert;
++import org.junit.Test;
++
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.utils.concurrent.WaitQueue;
++
++import static com.google.common.util.concurrent.Futures.getUnchecked;
++
++public class MigrationCoordinatorTest
++{
++    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinatorTest.class);
++
++    private static final InetAddressAndPort EP1;
++    private static final InetAddressAndPort EP2;
++    private static final InetAddressAndPort EP3;
++
++    private static final UUID LOCAL_VERSION = UUID.randomUUID();
++    private static final UUID V1 = UUID.randomUUID();
++    private static final UUID V2 = UUID.randomUUID();
++    private static final UUID V3 = UUID.randomUUID();
++
++    static
++    {
++        try
++        {
++            EP1 = InetAddressAndPort.getByName("10.0.0.1");
++            EP2 = InetAddressAndPort.getByName("10.0.0.2");
++            EP3 = InetAddressAndPort.getByName("10.0.0.3");
++        }
++        catch (UnknownHostException e)
++        {
++            throw new AssertionError(e);
++        }
++
++        DatabaseDescriptor.daemonInitialization();
++    }
++
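++    /**
++     * Test double that records outgoing migration requests instead of sending real messages,
++     * with flags to control the liveness and compatibility checks the coordinator consults.
++     */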
++    private static class InstrumentedCoordinator extends MigrationCoordinator
++    {
++
++        Queue<Callback> requests = new LinkedList<>();
++        @Override
++        protected void sendMigrationMessage(MigrationCoordinator.Callback callback)
++        {
++            requests.add(callback);
++        }
++
++        boolean shouldPullSchema = true;
++        @Override
++        protected boolean shouldPullSchema(UUID version)
++        {
++            return shouldPullSchema;
++        }
++
++        boolean shouldPullFromEndpoint = true;
++        @Override
++        protected boolean shouldPullFromEndpoint(InetAddressAndPort endpoint)
++        {
++            return shouldPullFromEndpoint;
++        }
++
++        boolean shouldPullImmediately = true;
++        @Override
++        protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version)
++        {
++            return shouldPullImmediately;
++        }
++
++        Set<InetAddressAndPort> deadNodes = new HashSet<>();
++        @Override
++        protected boolean isAlive(InetAddressAndPort endpoint)
++        {
++            return !deadNodes.contains(endpoint);
++        }
++
++        UUID localVersion = LOCAL_VERSION;
++        @Override
++        protected boolean isLocalVersion(UUID version)
++        {
++            return localVersion.equals(version);
++        }
++
++        int maxOutstandingRequests = 3;
++        @Override
++        protected int getMaxOutstandingVersionRequests()
++        {
++            return maxOutstandingRequests;
++        }
++
++        Set<InetAddressAndPort> mergedSchemasFrom = new HashSet<>();
++        @Override
++        protected void mergeSchemaFrom(InetAddressAndPort endpoint, Collection<Mutation> mutations)
++        {
++            mergedSchemasFrom.add(endpoint);
++        }
++    }
++
++    @Test
++    public void requestResponseCycle() throws InterruptedException
++    {
++        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
++        coordinator.maxOutstandingRequests = 1;
++
++        Assert.assertTrue(coordinator.requests.isEmpty());
++
++        // first schema report should send a migration request
++        getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
++        Assert.assertEquals(1, coordinator.requests.size());
++        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
++
++        // second should not
++        getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
++        Assert.assertEquals(1, coordinator.requests.size());
++        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
++
++        // until the first request fails ...
++        MigrationCoordinator.Callback request1 = coordinator.requests.poll();
++        Assert.assertEquals(EP1, request1.endpoint);
++        getUnchecked(request1.fail());
++        Assert.assertTrue(coordinator.mergedSchemasFrom.isEmpty());
++        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
++
++        // ... then the second endpoint should be contacted
++        Assert.assertEquals(1, coordinator.requests.size());
++        MigrationCoordinator.Callback request2 = coordinator.requests.poll();
++        Assert.assertEquals(EP2, request2.endpoint);
++        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
++        getUnchecked(request2.response(Collections.emptyList()));
++        Assert.assertEquals(EP2, Iterables.getOnlyElement(coordinator.mergedSchemasFrom));
++        Assert.assertTrue(coordinator.awaitSchemaRequests(1));
++
++        // and migration tasks should not be sent out for subsequent version reports
++        getUnchecked(coordinator.reportEndpointVersion(EP3, V1));
++        Assert.assertTrue(coordinator.requests.isEmpty());
++    }
++
++    /**
++     * If we don't send a request for a version, and endpoints associated with
++     * it all change versions, we should signal anyone waiting on that version
++     */
++    @Test
++    public void versionsAreSignaledWhenDeleted()
++    {
++        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
++
++        coordinator.reportEndpointVersion(EP1, V1);
++        WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
++        Assert.assertFalse(signal.isSignalled());
++
++        coordinator.reportEndpointVersion(EP1, V2);
++        Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
++
++        Assert.assertTrue(signal.isSignalled());
++    }
++
++    private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddressAndPort endpoint, UUID version, boolean startupShouldBeUnblocked)
++    {
++        Assert.assertTrue(coordinator.requests.isEmpty());
++        Future<Void> future = coordinator.reportEndpointVersion(endpoint, version);
++        if (future != null)
++            getUnchecked(future);
++        Assert.assertTrue(coordinator.requests.isEmpty());
++
++        Assert.assertEquals(startupShouldBeUnblocked, coordinator.awaitSchemaRequests(1));
++    }
++
++    private static void assertNoContact(InstrumentedCoordinator coordinator, boolean startupShouldBeUnblocked)
++    {
++        assertNoContact(coordinator, EP1, V1, startupShouldBeUnblocked);
++    }
++
++    @Test
++    public void dontContactNodesWithSameSchema()
++    {
++        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
++
++        coordinator.localVersion = V1;
++        assertNoContact(coordinator, true);
++    }
++
++    @Test
++    public void dontContactIncompatibleNodes()
++    {
++        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
++
++        coordinator.shouldPullFromEndpoint = false;
++        assertNoContact(coordinator, false);
++    }
++
++    @Test
++    public void dontContactDeadNodes()
++    {
++        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
++
++        coordinator.deadNodes.add(EP1);
++        assertNoContact(coordinator, EP1, V1, false);
++    }
++
++    /**
++     * If a node has become incompatible between when the task was scheduled and when it
++     * was run, we should detect that and fail the task
++     */
++    @Test
++    public void testGossipRace()
++    {
++        InstrumentedCoordinator coordinator = new InstrumentedCoordinator() {
++            @Override
++            protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version)
++            {
++                // this is the last thing that gets called before scheduling the pull, so set this flag here
++                shouldPullFromEndpoint = false;
++                return super.shouldPullImmediately(endpoint, version);
++            }
++        };
++
++        Assert.assertTrue(coordinator.shouldPullFromEndpoint(EP1));
++        assertNoContact(coordinator, EP1, V1, false);
++    }
++
++    @Test
++    public void testWeKeepSendingRequests()
++    {
++        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
++
++        getUnchecked(coordinator.reportEndpointVersion(EP3, V2));
++        coordinator.requests.remove().response(Collections.emptyList());
++
++        getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
++        getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
++
++        MigrationCoordinator.Callback prev = null;
++        Set<InetAddressAndPort> EPs = Sets.newHashSet(EP1, EP2);
++        int ep1requests = 0;
++        int ep2requests = 0;
++
++        for (int i = 0; i < 10; i++)
++        {
++            Assert.assertEquals(String.format("%s", i), 2, coordinator.requests.size());
++
++            MigrationCoordinator.Callback next = coordinator.requests.remove();
++
++            // we should be contacting endpoints in a round robin fashion
++            Assert.assertTrue(EPs.contains(next.endpoint));
++            if (prev != null && prev.endpoint.equals(next.endpoint))
++                Assert.fail(String.format("Not expecting prev %s to be equal to next %s", prev.endpoint, next.endpoint));
++
++            // count the requests per endpoint so the totals logged below are meaningful
++            if (next.endpoint.equals(EP1))
++                ep1requests++;
++            else
++                ep2requests++;
++
++            // failing the request should cause a new one to be sent out
++            next.fail();
++            prev = next;
++            Assert.assertFalse(coordinator.awaitSchemaRequests(1));
++
++            Assert.assertEquals(2, coordinator.requests.size());
++        }
++        logger.info("{} -> {}", EP1, ep1requests);
++        logger.info("{} -> {}", EP2, ep2requests);
++
++        // a single success should unblock startup though
++        coordinator.requests.remove().response(Collections.emptyList());
++        Assert.assertTrue(coordinator.awaitSchemaRequests(1));
++    }
++
++    /**
++     * pullUnreceivedSchemaVersions should detect, and send requests out for, any
++     * schemas that are marked unreceived and have no outstanding requests
++     */
++    @Test
++    public void pullUnreceived()
++    {
++        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
++
++        coordinator.shouldPullFromEndpoint = false;
++        assertNoContact(coordinator, false);
++
++        coordinator.shouldPullFromEndpoint = true;
++        Assert.assertEquals(0, coordinator.requests.size());
++        List<Future<Void>> futures = coordinator.pullUnreceivedSchemaVersions();
++        futures.forEach(Futures::getUnchecked);
++        Assert.assertEquals(1, coordinator.requests.size());
++    }
++}

