Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b74ae4b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b74ae4b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b74ae4b Branch: refs/heads/trunk Commit: 8b74ae4b6490e1991603e9365b690da6f6900c10 Parents: f5e0a7c ec9ce3d Author: Joel Knighton <j...@apache.org> Authored: Wed Mar 22 13:28:14 2017 -0500 Committer: Joel Knighton <j...@apache.org> Committed: Wed Mar 22 13:29:09 2017 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../gms/GossipDigestAckVerbHandler.java | 27 +++-- src/java/org/apache/cassandra/gms/Gossiper.java | 66 +++++++---- .../apache/cassandra/schema/MigrationTask.java | 12 +- .../cassandra/service/StorageService.java | 17 ++- test/conf/cassandra-seeds.yaml | 43 +++++++ .../apache/cassandra/gms/ShadowRoundTest.java | 116 +++++++++++++++++++ .../apache/cassandra/net/MatcherResponse.java | 24 ++-- 8 files changed, 253 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/gms/Gossiper.java index 50710eb,177d7dc..e5992af --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@@ -1337,12 -1352,23 +1346,24 @@@ public class Gossiper implements IFailu } /** - * Do a single 'shadow' round of gossip, where we do not modify any state - * Used when preparing to join the ring: - * * when replacing a node, to get and assume its tokens - * * when joining, to check that the local host id matches any previous id for the endpoint address + * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the + * map return value, instead of endpointStateMap. + * ++ * Used when preparing to join the ring: + * <ul> + * <li>when replacing a node, to get and assume its tokens</li> + * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li> + * </ul> + * + * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared + * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, boolean, Map)}. This will update + * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the + * caller of {@link Gossiper#doShadowRound()}. Therefor only a single shadow round execution is permitted at + * the same time. + * + * @return endpoint states gathered during shadow round or empty map */ - public void doShadowRound() + public synchronized Map<InetAddress, EndpointState> doShadowRound() { buildSeedsList(); // it may be that the local address is the only entry in the seed http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/schema/MigrationTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/schema/MigrationTask.java index a785e17,0000000..73e396d mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/schema/MigrationTask.java +++ b/src/java/org/apache/cassandra/schema/MigrationTask.java @@@ -1,113 -1,0 +1,113 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.EnumSet; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.SystemKeyspace.BootstrapState; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.WrappedRunnable; + +final class MigrationTask extends WrappedRunnable +{ + private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class); + + private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>(); + + private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS); + + private final InetAddress endpoint; + + MigrationTask(InetAddress endpoint) + { + this.endpoint = endpoint; + } + + static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks() + { + return inflightTasks; + } + + public void runMayThrow() throws Exception + { ++ if (!FailureDetector.instance.isAlive(endpoint)) ++ { ++ logger.warn("Can't send schema pull request: node {} is down.", endpoint); ++ return; ++ } ++ + // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(), + // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with + // a higher major. + if (!MigrationManager.shouldPullSchemaFrom(endpoint)) + { + logger.info("Skipped sending a migration request: node {} has a higher major version now.", endpoint); + return; + } + - if (!FailureDetector.instance.isAlive(endpoint)) - { - logger.debug("Can't send schema pull request: node {} is down.", endpoint); - return; - } - + MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance); + + final CountDownLatch completionLatch = new CountDownLatch(1); + + IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>() + { + @Override + public void response(MessageIn<Collection<Mutation>> message) + { + try + { + Schema.instance.mergeAndAnnounceVersion(message.payload); + } + catch (ConfigurationException e) + { + logger.error("Configuration exception merging remote schema", e); + } + finally + { + completionLatch.countDown(); + } + } + + public boolean isLatencyForSnitch() + { + return false; + } + }; + + // Only save the latches if we need bootstrap or are bootstrapping + if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState())) + inflightTasks.offer(completionLatch); + + MessagingService.instance().sendRR(message, endpoint, cb); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/service/StorageService.java ----------------------------------------------------------------------