Merge branch 'cassandra-3.11' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b74ae4b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b74ae4b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b74ae4b

Branch: refs/heads/trunk
Commit: 8b74ae4b6490e1991603e9365b690da6f6900c10
Parents: f5e0a7c ec9ce3d
Author: Joel Knighton <j...@apache.org>
Authored: Wed Mar 22 13:28:14 2017 -0500
Committer: Joel Knighton <j...@apache.org>
Committed: Wed Mar 22 13:29:09 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../gms/GossipDigestAckVerbHandler.java         |  27 +++--
 src/java/org/apache/cassandra/gms/Gossiper.java |  66 +++++++----
 .../apache/cassandra/schema/MigrationTask.java  |  12 +-
 .../cassandra/service/StorageService.java       |  17 ++-
 test/conf/cassandra-seeds.yaml                  |  43 +++++++
 .../apache/cassandra/gms/ShadowRoundTest.java   | 116 +++++++++++++++++++
 .../apache/cassandra/net/MatcherResponse.java   |  24 ++--
 8 files changed, 253 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 50710eb,177d7dc..e5992af
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -1337,12 -1352,23 +1346,24 @@@ public class Gossiper implements IFailu
      }
  
      /**
-      *  Do a single 'shadow' round of gossip, where we do not modify any state
-      *  Used when preparing to join the ring:
-      *      * when replacing a node, to get and assume its tokens
-      *      * when joining, to check that the local host id matches any 
previous id for the endpoint address
+      * Do a single 'shadow' round of gossip by retrieving endpoint states 
that will be stored exclusively in the
+      * map return value, instead of endpointStateMap.
+      *
++     * Used when preparing to join the ring:
+      * <ul>
+      *     <li>when replacing a node, to get and assume its tokens</li>
+      *     <li>when joining, to check that the local host id matches any 
previous id for the endpoint address</li>
+      * </ul>
+      *
+      * Method is synchronized, as we use an in-progress flag to indicate that 
shadow round must be cleared
+      * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, 
boolean, Map)}. This will update
+      * {@link Gossiper#endpointShadowStateMap} with received values, in order 
to return an immutable copy to the
+      * caller of {@link Gossiper#doShadowRound()}. Therefor only a single 
shadow round execution is permitted at
+      * the same time.
+      *
+      * @return endpoint states gathered during shadow round or empty map
       */
-     public void doShadowRound()
+     public synchronized Map<InetAddress, EndpointState> doShadowRound()
      {
          buildSeedsList();
          // it may be that the local address is the only entry in the seed

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/schema/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/MigrationTask.java
index a785e17,0000000..73e396d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/MigrationTask.java
+++ b/src/java/org/apache/cassandra/schema/MigrationTask.java
@@@ -1,113 -1,0 +1,113 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.net.InetAddress;
 +import java.util.Collection;
 +import java.util.EnumSet;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentLinkedQueue;
 +import java.util.concurrent.CountDownLatch;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.net.IAsyncCallback;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +
 +final class MigrationTask extends WrappedRunnable
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(MigrationTask.class);
 +
 +    private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks 
= new ConcurrentLinkedQueue<>();
 +
 +    private static final Set<BootstrapState> monitoringBootstrapStates = 
EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
 +
 +    private final InetAddress endpoint;
 +
 +    MigrationTask(InetAddress endpoint)
 +    {
 +        this.endpoint = endpoint;
 +    }
 +
 +    static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
 +    {
 +        return inflightTasks;
 +    }
 +
 +    public void runMayThrow() throws Exception
 +    {
++        if (!FailureDetector.instance.isAlive(endpoint))
++        {
++            logger.warn("Can't send schema pull request: node {} is down.", 
endpoint);
++            return;
++        }
++
 +        // There is a chance that quite some time could have passed between 
now and the MM#maybeScheduleSchemaPull(),
 +        // potentially enough for the endpoint node to restart - which is an 
issue if it does restart upgraded, with
 +        // a higher major.
 +        if (!MigrationManager.shouldPullSchemaFrom(endpoint))
 +        {
 +            logger.info("Skipped sending a migration request: node {} has a 
higher major version now.", endpoint);
 +            return;
 +        }
 +
-         if (!FailureDetector.instance.isAlive(endpoint))
-         {
-             logger.debug("Can't send schema pull request: node {} is down.", 
endpoint);
-             return;
-         }
- 
 +        MessageOut message = new 
MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, 
MigrationManager.MigrationsSerializer.instance);
 +
 +        final CountDownLatch completionLatch = new CountDownLatch(1);
 +
 +        IAsyncCallback<Collection<Mutation>> cb = new 
IAsyncCallback<Collection<Mutation>>()
 +        {
 +            @Override
 +            public void response(MessageIn<Collection<Mutation>> message)
 +            {
 +                try
 +                {
 +                    Schema.instance.mergeAndAnnounceVersion(message.payload);
 +                }
 +                catch (ConfigurationException e)
 +                {
 +                    logger.error("Configuration exception merging remote 
schema", e);
 +                }
 +                finally
 +                {
 +                    completionLatch.countDown();
 +                }
 +            }
 +
 +            public boolean isLatencyForSnitch()
 +            {
 +                return false;
 +            }
 +        };
 +
 +        // Only save the latches if we need bootstrap or are bootstrapping
 +        if 
(monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
 +            inflightTasks.offer(completionLatch);
 +
 +        MessagingService.instance().sendRR(message, endpoint, cb);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------

Reply via email to