Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d585410c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d585410c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d585410c Branch: refs/heads/trunk Commit: d585410ccb42011ca71441471d1e2949e5ddedb5 Parents: e210a05 0d4aacc Author: Jason Brown <jasedbr...@gmail.com> Authored: Thu May 17 10:35:51 2018 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Thu May 17 10:39:37 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/config/DatabaseDescriptor.java | 6 + .../org/apache/cassandra/gms/EndpointState.java | 2 +- src/java/org/apache/cassandra/gms/Gossiper.java | 15 +- .../apache/cassandra/gms/HeartBeatState.java | 4 +- .../cassandra/net/MessageDeliveryTask.java | 25 +- .../org/apache/cassandra/net/MessageIn.java | 6 + .../apache/cassandra/net/MessagingService.java | 36 ++- .../net/StartupClusterConnectivityChecker.java | 231 +++++++++++-------- .../cassandra/service/CassandraDaemon.java | 11 +- .../cassandra/net/MessageDeliveryTaskTest.java | 121 ++++++++++ .../org/apache/cassandra/net/MessageInTest.java | 66 ++++++ .../StartupClusterConnectivityCheckerTest.java | 160 +++++++------ 13 files changed, 497 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index cfe56c6,87e7c24..097db1d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -256,6 -15,8 +256,7 @@@ * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163) * Fix wildcard GROUP BY queries (CASSANDRA-14209) Merged from 3.0: + * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447) - * Fix deprecated repair error notifications from 
3.x clusters to legacy JMX clients (CASSANDRA-13121) * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418) * Fix progress stats and units in compactionstats (CASSANDRA-12244) * Better handle missing partition columns in system_schema.columns (CASSANDRA-14379) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 699148d,8b92c5a..592b96e --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -26,9 -26,8 +26,10 @@@ import java.nio.file.NoSuchFileExceptio import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; ++import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; @@@ -1487,6 -1491,6 +1488,11 @@@ public class DatabaseDescripto getTruncateRpcTimeout()); } ++ public static long getPingTimeout() ++ { ++ return TimeUnit.SECONDS.toMillis(getBlockForPeersTimeoutInSeconds()); ++ } ++ public static double getPhiConvictThreshold() { return conf.phi_convict_threshold; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/gms/EndpointState.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/gms/EndpointState.java index 1085447,674b597..5646bf6 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@@ -50,7 -54,7 +50,7 @@@ public class EndpointStat private volatile long updateTimestamp; private volatile boolean isAlive; -- EndpointState(HeartBeatState 
initialHbState) ++ public EndpointState(HeartBeatState initialHbState) { this(initialHbState, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/gms/Gossiper.java index 4c66669,ea05525..3975187 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@@ -32,13 -31,7 +32,13 @@@ import javax.management.ObjectName import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; ++import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Uninterruptibles; + +import org.apache.cassandra.locator.InetAddressAndPort; - import org.apache.cassandra.locator.SeedProvider; +import org.apache.cassandra.utils.CassandraVersion; +import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -878,12 -854,12 +878,17 @@@ public class Gossiper implements IFailu return endpointStateMap.get(ep); } - public Set<Entry<InetAddressAndPort, EndpointState>> getEndpointStates() - public Set<Entry<InetAddress, EndpointState>> getEndpointStates() ++ public ImmutableSet<InetAddressAndPort> getEndpoints() + { - return endpointStateMap.entrySet(); ++ return ImmutableSet.copyOf(endpointStateMap.keySet()); ++ } ++ ++ public int getEndpointCount() + { - return endpointStateMap.entrySet(); ++ return endpointStateMap.size(); } - public UUID getHostId(InetAddress endpoint) + public UUID getHostId(InetAddressAndPort endpoint) { return getHostId(endpoint, endpointStateMap); } @@@ -1798,11 -1693,11 +1803,11 @@@ Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS); int totalPolls = 0; 
int numOkay = 0; -- int epSize = Gossiper.instance.getEndpointStates().size(); ++ int epSize = Gossiper.instance.getEndpointCount(); while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) { Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); -- int currentSize = Gossiper.instance.getEndpointStates().size(); ++ int currentSize = Gossiper.instance.getEndpointCount(); totalPolls++; if (currentSize == epSize) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/gms/HeartBeatState.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/gms/HeartBeatState.java index 13e1ace,13e1ace..2abd5d7 --- a/src/java/org/apache/cassandra/gms/HeartBeatState.java +++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java @@@ -27,7 -27,7 +27,7 @@@ import org.apache.cassandra.io.util.Dat /** * HeartBeat State associated with any given endpoint. */ --class HeartBeatState ++public class HeartBeatState { public static final IVersionedSerializer<HeartBeatState> serializer = new HeartBeatStateSerializer(); @@@ -39,7 -39,7 +39,7 @@@ this(gen, 0); } -- HeartBeatState(int gen, int ver) ++ public HeartBeatState(int gen, int ver) { generation = gen; version = ver; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessageDeliveryTask.java index 6e132a8,c91e9da..1b9090c --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@@ -20,7 -20,6 +20,8 @@@ package org.apache.cassandra.net import java.io.IOException; import java.util.EnumSet; ++import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Shorts; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ 
-50,23 -45,20 +51,40 @@@ public class MessageDeliveryTask implem public void run() { ++ process(); ++ } ++ ++ /** ++ * A helper function for making unit testing reasonable. ++ * ++ * @return true if the message was processed; else false. ++ */ ++ @VisibleForTesting ++ boolean process() ++ { MessagingService.Verb verb = message.verb; ++ if (verb == null) ++ { ++ logger.trace("Unknown verb {}", verb); ++ return false; ++ } ++ + MessagingService.instance().metrics.addQueueWaitTime(verb.toString(), + ApproximateTime.currentTimeMillis() - enqueueTime); + long timeTaken = message.getLifetimeInMS(); if (MessagingService.DROPPABLE_VERBS.contains(verb) && timeTaken > message.getTimeout()) { MessagingService.instance().incrementDroppedMessages(message, timeTaken); -- return; ++ return false; } IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb); if (verbHandler == null) { -- logger.trace("Unknown verb {}", verb); -- return; ++ logger.trace("No handler for verb {}", verb); ++ return false; } try @@@ -91,6 -83,6 +109,7 @@@ if (GOSSIP_VERBS.contains(message.verb)) Gossiper.instance.setLastProcessedMessageAt(message.constructionTime); ++ return true; } private void handleFailure(Throwable t) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessageIn.java index 7fb866f,d06d515..1cd39f3 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@@ -146,8 -117,8 +146,14 @@@ public class MessageIn<T } serializer = (IVersionedSerializer<T2>) callback.serializer; } ++ if (payloadSize == 0 || serializer == null) ++ { ++ // if there's no deserializer for the verb, skip the payload bytes to leave ++ // the stream in a clean state (for the next message) ++ in.skipBytesFully(payloadSize); return create(from, null, parameters, verb, version, 
constructionTime); ++ } T2 payload = serializer.deserialize(in, version); return MessageIn.create(from, payload, parameters, verb, version, constructionTime); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessagingService.java index a590723,59ed8f3..f5051fb --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@@ -254,57 -234,37 +254,67 @@@ public final class MessagingService imp return DatabaseDescriptor.getRangeRpcTimeout(); } }, - PING(), - PING, ++ PING ++ { ++ public long getTimeout() ++ { ++ return DatabaseDescriptor.getPingTimeout(); ++ } ++ }, + - // add new verbs after the existing verbs, but *before* the UNUSED verbs, since we serialize by ordinal. - // UNUSED verbs serve as padding for backwards compatability where a previous version needs to validate a verb from the future. - UNUSED_1, + // UNUSED verbs were used as padding for backward/forward compatibility before 4.0, + // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries + // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion. + // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used + // for correctly adding VERBs that need to be emergency additions to 3.0/3.11. - // We can reclaim them (their id's, to be correct) in future versions, if desired, though. ++ // We can reclaim them (their id's, to be correct) in future versions, if desired, though. UNUSED_2, UNUSED_3, UNUSED_4, UNUSED_5, ; - // remember to add new verbs at the end, since we serialize by ordinal ++ // add new verbs after the existing verbs, since we serialize by ordinal. - // This is to support a "late" choice of the verb based on the messaging service version. 
- // See CASSANDRA-12249 for more details. - public static Verb convertForMessagingServiceVersion(Verb verb, int version) + private final int id; + Verb() { - if (verb == PAGED_RANGE && version >= VERSION_30) - return RANGE_SLICE; + id = ordinal(); + } - return verb; + /** + * Unused, but it is an extension point for adding custom verbs + * @param id + */ + Verb(int id) + { + this.id = id; } public long getTimeout() { return DatabaseDescriptor.getRpcTimeout(); } - } - public static final Verb[] verbValues = Verb.values(); + public int getId() + { + return id; + } + private static final IntObjectMap<Verb> idToVerbMap = new IntObjectOpenHashMap<>(values().length); + static + { + for (Verb v : values()) + { - if (idToVerbMap.containsKey(v.getId())) - throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + v.getId()); - idToVerbMap.put(v.getId(), v); ++ Verb existing = idToVerbMap.put(v.getId(), v); ++ if (existing != null) ++ throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + existing); + } + } + + public static Verb fromId(int id) + { + return idToVerbMap.get(id); + } + } public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class) {{ @@@ -350,9 -310,8 +360,10 @@@ put(Verb.SNAPSHOT, Stage.MISC); put(Verb.ECHO, Stage.GOSSIP); - put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE); ++ put(Verb.UNUSED_4, Stage.INTERNAL_RESPONSE); ++ put(Verb.UNUSED_5, Stage.INTERNAL_RESPONSE); put(Verb.PING, Stage.READ); }}; @@@ -929,6 -827,6 +939,14 @@@ } /** ++ * SHOULD ONLY BE USED FOR TESTING!! ++ */ ++ public void removeVerbHandler(Verb verb) ++ { ++ verbHandlers.remove(verb); ++ } ++ ++ /** * This method returns the verb handler associated with the registered * verb. If no handler has been registered then null is returned. 
* http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java index f22ab48,0000000..db04ca3 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java +++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java @@@ -1,171 -1,0 +1,216 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ - +package org.apache.cassandra.net; + ++import java.util.HashSet; ++import java.util.Map; +import java.util.Set; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; - import java.util.function.Predicate; - import java.util.stream.Collectors; + ++import com.google.common.annotations.VisibleForTesting; ++import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + ++import org.apache.cassandra.gms.ApplicationState; ++import org.apache.cassandra.gms.EndpointState; ++import org.apache.cassandra.gms.Gossiper; ++import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; ++import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.InetAddressAndPort; - import org.apache.cassandra.net.async.OutboundConnectionIdentifier; ++import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.net.MessagingService.Verb.PING; ++import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; ++import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + +public class StartupClusterConnectivityChecker +{ + private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); + - enum State { CONTINUE, FINISH_SUCCESS, FINISH_TIMEOUT } - + private final int targetPercent; - private final int timeoutSecs; - private final Predicate<InetAddressAndPort> gossipStatus; ++ private final long timeoutNanos; + - public StartupClusterConnectivityChecker(int targetPercent, int timeoutSecs, Predicate<InetAddressAndPort> gossipStatus) ++ public static StartupClusterConnectivityChecker create(int targetPercent, int 
timeoutSecs) + { - if (targetPercent < 0) - { - targetPercent = 0; - } - else if (targetPercent > 100) - { - targetPercent = 100; - } - this.targetPercent = targetPercent; - - if (timeoutSecs < 0) - { - timeoutSecs = 1; - } - else if (timeoutSecs > 100) - { ++ timeoutSecs = Math.max(1, timeoutSecs); ++ if (timeoutSecs > 100) + logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs); - } - this.timeoutSecs = timeoutSecs; ++ long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs); + - this.gossipStatus = gossipStatus; ++ return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos); + } + - public void execute(Set<InetAddressAndPort> peers) ++ @VisibleForTesting ++ StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos) + { - if (peers == null || targetPercent == 0) - return; ++ this.targetPercent = Math.min(100, Math.max(0, targetPercent)); ++ this.timeoutNanos = timeoutNanos; ++ } + - // remove current node from the set - peers = peers.stream() - .filter(peer -> !peer.equals(FBUtilities.getBroadcastAddressAndPort())) - .collect(Collectors.toSet()); ++ /** ++ * @param peers The currently known peers in the cluster; argument is not modified. ++ * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened; ++ * else false. 
++ */ ++ public boolean execute(Set<InetAddressAndPort> peers) ++ { ++ if (targetPercent == 0 || peers == null) ++ return true; + - // don't block if there's no other nodes in the cluster (or we don't know about them) - if (peers.size() <= 0) - return; ++ // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection) ++ peers = new HashSet<>(peers); ++ peers.remove(FBUtilities.getBroadcastAddressAndPort()); + - logger.info("choosing to block until {}% of peers are marked alive and connections are established; max time to wait = {} seconds", - targetPercent, timeoutSecs); ++ if (peers.isEmpty()) ++ return true; + - // first, send out a ping message to open up the non-gossip connections - final AtomicInteger connectedCount = sendPingMessages(peers); ++ logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds", ++ targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); + - final long startNanos = System.nanoTime(); - final long expirationNanos = startNanos + TimeUnit.SECONDS.toNanos(timeoutSecs); - int completedRounds = 0; - while (checkStatus(peers, connectedCount, startNanos, expirationNanos < System.nanoTime(), completedRounds) == State.CONTINUE) - { - completedRounds++; - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MICROSECONDS); - } - } ++ long startNanos = System.nanoTime(); + - State checkStatus(Set<InetAddressAndPort> peers, AtomicInteger connectedCount, final long startNanos, boolean beyondExpiration, final int completedRounds) - { - long currentAlive = peers.stream().filter(gossipStatus).count(); - float currentAlivePercent = ((float) currentAlive / (float) peers.size()) * 100; ++ AckMap acks = new AckMap(3); ++ int target = (int) ((targetPercent / 100.0) * peers.size()); ++ CountDownLatch latch = new CountDownLatch(target); + - // assume two connections to remote host that we care to track here (small msg & large 
msg) - final int totalConnectionsSize = peers.size() * 2; - final int connectionsCount = connectedCount.get(); - float currentConnectedPercent = ((float) connectionsCount / (float) totalConnectionsSize) * 100; ++ // set up a listener to react to new nodes becoming alive (in gossip), and account for all the nodes that are already alive ++ Set<InetAddressAndPort> alivePeers = Sets.newSetFromMap(new ConcurrentHashMap<>()); ++ AliveListener listener = new AliveListener(alivePeers, latch, acks); ++ Gossiper.instance.register(listener); + - if (currentAlivePercent >= targetPercent && currentConnectedPercent >= targetPercent) - { - logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " + - "and {}% ({} / {}) of peers as connected, " + - "both of which are above the desired threshold of {}%", - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos), - currentAlivePercent, currentAlive, peers.size(), - currentConnectedPercent, connectionsCount, totalConnectionsSize, - targetPercent); - return State.FINISH_SUCCESS; - } ++ // send out a ping message to open up the non-gossip connections ++ sendPingMessages(peers, latch, acks); + - // perform at least two rounds of checking, else this is kinda useless (and the operator set the aliveTimeoutSecs too low) - if (completedRounds >= 2 && beyondExpiration) - { - logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " + - "and {}% ({} / {}) of peers as connected, " + - "one or both of which is below the desired threshold of {}%", - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos), - currentAlivePercent, currentAlive, peers.size(), - currentConnectedPercent, connectionsCount, totalConnectionsSize, - targetPercent); - return State.FINISH_TIMEOUT; - } - return State.CONTINUE; ++ for (InetAddressAndPort peer : peers) ++ if (Gossiper.instance.isAlive(peer) && alivePeers.add(peer) && acks.incrementAndCheck(peer)) ++ latch.countDown(); ++ ++ boolean succeeded = 
Uninterruptibles.awaitUninterruptibly(latch, timeoutNanos, TimeUnit.NANOSECONDS); ++ Gossiper.instance.unregister(listener); ++ ++ int connected = peers.size() - (int) latch.getCount(); ++ logger.info("After waiting/processing for {} milliseconds, {} out of {} peers ({}%) have been marked alive and had connections established", ++ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos), ++ connected, ++ peers.size(), ++ connected / (peers.size()) * 100.0); ++ return succeeded; + } + + /** - * Sends a "connection warmup" message to each peer in the collection, on every {@link OutboundConnectionIdentifier.ConnectionType} - * used for internode messaging. ++ * Sends a "connection warmup" message to each peer in the collection, on every {@link ConnectionType} ++ * used for internode messaging (that is not gossip). + */ - private AtomicInteger sendPingMessages(Set<InetAddressAndPort> peers) ++ private void sendPingMessages(Set<InetAddressAndPort> peers, CountDownLatch latch, AckMap acks) + { - AtomicInteger connectedCount = new AtomicInteger(0); + IAsyncCallback responseHandler = new IAsyncCallback() + { - @Override + public boolean isLatencyForSnitch() + { + return false; + } + - @Override + public void response(MessageIn msg) + { - connectedCount.incrementAndGet(); ++ if (acks.incrementAndCheck(msg.from)) ++ latch.countDown(); + } + }; + - MessageOut<PingMessage> smallChannelMessageOut = new MessageOut<>(PING, PingMessage.smallChannelMessage, PingMessage.serializer); - MessageOut<PingMessage> largeChannelMessageOut = new MessageOut<>(PING, PingMessage.largeChannelMessage, PingMessage.serializer); ++ MessageOut<PingMessage> smallChannelMessageOut = new MessageOut<>(PING, PingMessage.smallChannelMessage, ++ PingMessage.serializer, SMALL_MESSAGE); ++ MessageOut<PingMessage> largeChannelMessageOut = new MessageOut<>(PING, PingMessage.largeChannelMessage, ++ PingMessage.serializer, LARGE_MESSAGE); + for (InetAddressAndPort peer : peers) + { + 
MessagingService.instance().sendRR(smallChannelMessageOut, peer, responseHandler); + MessagingService.instance().sendRR(largeChannelMessageOut, peer, responseHandler); + } ++ } + - return connectedCount; ++ /** ++ * A trivial implementation of {@link IEndpointStateChangeSubscriber} that really only cares about ++ * {@link #onAlive(InetAddressAndPort, EndpointState)} invocations. ++ */ ++ private static final class AliveListener implements IEndpointStateChangeSubscriber ++ { ++ private final CountDownLatch latch; ++ private final Set<InetAddressAndPort> livePeers; ++ private final AckMap acks; ++ ++ AliveListener(Set<InetAddressAndPort> livePeers, CountDownLatch latch, AckMap acks) ++ { ++ this.latch = latch; ++ this.livePeers = livePeers; ++ this.acks = acks; ++ } ++ ++ public void onJoin(InetAddressAndPort endpoint, EndpointState epState) ++ { ++ } ++ ++ public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) ++ { ++ } ++ ++ public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) ++ { ++ } ++ ++ public void onAlive(InetAddressAndPort endpoint, EndpointState state) ++ { ++ if (livePeers.add(endpoint) && acks.incrementAndCheck(endpoint)) ++ latch.countDown(); ++ } ++ ++ public void onDead(InetAddressAndPort endpoint, EndpointState state) ++ { ++ } ++ ++ public void onRemove(InetAddressAndPort endpoint) ++ { ++ } ++ ++ public void onRestart(InetAddressAndPort endpoint, EndpointState state) ++ { ++ } ++ } ++ ++ private static final class AckMap ++ { ++ private final int threshold; ++ private final Map<InetAddressAndPort, AtomicInteger> acks; ++ ++ AckMap(int threshold) ++ { ++ this.threshold = threshold; ++ acks = new ConcurrentHashMap<>(); ++ } ++ ++ boolean incrementAndCheck(InetAddressAndPort address) ++ { ++ return acks.computeIfAbsent(address, addr -> new AtomicInteger(0)).incrementAndGet() == threshold; ++ } + } +} 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java index 80b8b7b,d9bd5c3..6e0b92b --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@@ -503,12 -513,6 +500,10 @@@ public class CassandraDaemo */ public void start() { - StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(DatabaseDescriptor.getBlockForPeersPercentage(), - DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(), - Gossiper.instance::isAlive); - Set<InetAddressAndPort> peers = Gossiper.instance.getEndpointStates().stream().map(Map.Entry::getKey).collect(Collectors.toSet()); - connectivityChecker.execute(peers); ++ StartupClusterConnectivityChecker connectivityChecker = StartupClusterConnectivityChecker.create(DatabaseDescriptor.getBlockForPeersPercentage(), ++ DatabaseDescriptor.getBlockForPeersTimeoutInSeconds()); ++ connectivityChecker.execute(Gossiper.instance.getEndpoints()); + String nativeFlag = System.getProperty("cassandra.start_native_transport"); if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport())) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java index 0000000,0000000..db38efb new file mode 100644 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java @@@ -1,0 -1,0 +1,121 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. 
See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.cassandra.net; ++ ++import java.net.UnknownHostException; ++import java.util.Collections; ++ ++import org.junit.AfterClass; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.BeforeClass; ++import org.junit.Test; ++ ++import org.apache.cassandra.config.DatabaseDescriptor; ++import org.apache.cassandra.locator.InetAddressAndPort; ++ ++public class MessageDeliveryTaskTest ++{ ++ private static final MockVerbHandler VERB_HANDLER = new MockVerbHandler(); ++ ++ @BeforeClass ++ public static void before() ++ { ++ DatabaseDescriptor.daemonInitialization(); ++ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.UNUSED_2, VERB_HANDLER); ++ } ++ ++ @AfterClass ++ public static void after() ++ { ++ MessagingService.instance().removeVerbHandler(MessagingService.Verb.UNUSED_2); ++ } ++ ++ @Before ++ public void setUp() ++ { ++ VERB_HANDLER.reset(); ++ } ++ ++ @Test ++ public void process_HappyPath() throws UnknownHostException ++ { ++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1"); ++ MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_2, 1); ++ MessageDeliveryTask task = new MessageDeliveryTask(msg, 42); ++ Assert.assertTrue(task.process()); 
++ Assert.assertEquals(1, VERB_HANDLER.invocationCount); ++ } ++ ++ @Test ++ public void process_NullVerb() throws UnknownHostException ++ { ++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1"); ++ MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), null, 1); ++ MessageDeliveryTask task = new MessageDeliveryTask(msg, 42); ++ Assert.assertFalse(task.process()); ++ } ++ ++ @Test ++ public void process_NoHandler() throws UnknownHostException ++ { ++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1"); ++ MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_5, 1); ++ MessageDeliveryTask task = new MessageDeliveryTask(msg, 42); ++ Assert.assertFalse(task.process()); ++ } ++ ++ @Test ++ public void process_ExpiredDroppableMessage() throws UnknownHostException ++ { ++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1"); ++ ++ // we need any droppable verb, so just grab it from the enum itself rather than hard code a value ++ MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.DROPPABLE_VERBS.iterator().next(), 1, 0); ++ MessageDeliveryTask task = new MessageDeliveryTask(msg, 42); ++ Assert.assertFalse(task.process()); ++ } ++ ++ // non-droppable message should still be processed even if they are expired ++ @Test ++ public void process_ExpiredMessage() throws UnknownHostException ++ { ++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1"); ++ MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_2, 1, 0); ++ MessageDeliveryTask task = new MessageDeliveryTask(msg, 42); ++ Assert.assertTrue(task.process()); ++ Assert.assertEquals(1, VERB_HANDLER.invocationCount); ++ } ++ ++ private static class MockVerbHandler implements IVerbHandler<Object> ++ { ++ private int invocationCount; ++ ++ @Override ++ public void doVerb(MessageIn<Object> message, int id) ++ { ++ 
invocationCount++; ++ } ++ ++ void reset() ++ { ++ invocationCount = 0; ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/test/unit/org/apache/cassandra/net/MessageInTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/net/MessageInTest.java index 0000000,0000000..b9ea7da new file mode 100644 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MessageInTest.java @@@ -1,0 -1,0 +1,66 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.cassandra.net; ++ ++import java.io.IOException; ++import java.nio.ByteBuffer; ++import java.util.Collections; ++ ++import org.junit.Assert; ++import org.junit.BeforeClass; ++import org.junit.Test; ++ ++import org.apache.cassandra.config.DatabaseDescriptor; ++import org.apache.cassandra.io.util.DataInputBuffer; ++import org.apache.cassandra.io.util.DataInputPlus; ++import org.apache.cassandra.locator.InetAddressAndPort; ++ ++public class MessageInTest ++{ ++ @BeforeClass ++ public static void before() ++ { ++ DatabaseDescriptor.daemonInitialization(); ++ } ++ ++ // make sure deserializing message doesn't crash with an unknown verb ++ @Test ++ public void read_NullVerb() throws IOException ++ { ++ read(null); ++ } ++ ++ @Test ++ public void read_NoSerializer() throws IOException ++ { ++ read(MessagingService.Verb.UNUSED_5); ++ } ++ ++ private void read(MessagingService.Verb verb) throws IOException ++ { ++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1"); ++ ByteBuffer buf = ByteBuffer.allocate(64); ++ buf.limit(buf.capacity()); ++ DataInputPlus dataInputBuffer = new DataInputBuffer(buf, false); ++ int payloadSize = 27; ++ Assert.assertEquals(0, buf.position()); ++ Assert.assertNotNull(MessageIn.read(dataInputBuffer, 1, 42, 0, addr, payloadSize, verb, Collections.emptyMap())); ++ Assert.assertEquals(payloadSize, buf.position()); ++ } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java index 12f54c6,0000000..4eeb314 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java +++ b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java @@@ -1,129 -1,0 +1,157 @@@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +import java.net.UnknownHostException; ++import java.util.Collections; ++import java.util.HashMap; +import java.util.HashSet; ++import java.util.Map; +import java.util.Set; - import java.util.concurrent.atomic.AtomicInteger; - import java.util.function.Predicate; + - import com.google.common.net.InetAddresses; ++import org.junit.After; +import org.junit.Assert; ++import org.junit.Before; ++import org.junit.BeforeClass; +import org.junit.Test; + ++import org.apache.cassandra.config.DatabaseDescriptor; ++import org.apache.cassandra.gms.EndpointState; ++import org.apache.cassandra.gms.Gossiper; ++import org.apache.cassandra.gms.HeartBeatState; +import org.apache.cassandra.locator.InetAddressAndPort; + ++import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; ++ +public class StartupClusterConnectivityCheckerTest +{ - @Test - public void testConnectivity_SimpleHappyPath() throws UnknownHostException ++ private StartupClusterConnectivityChecker connectivityChecker; ++ private Set<InetAddressAndPort> peers; ++ ++ @BeforeClass ++ public static void before() + { - StartupClusterConnectivityChecker connectivityChecker = new 
StartupClusterConnectivityChecker(70, 10, addr -> true); - int count = 10; - Set<InetAddressAndPort> peers = createNodes(count); - Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_SUCCESS, - connectivityChecker.checkStatus(peers, new AtomicInteger(count * 2), System.nanoTime(), false, 0)); ++ DatabaseDescriptor.daemonInitialization(); + } + - @Test - public void testConnectivity_SimpleContinue() throws UnknownHostException ++ @Before ++ public void setUp() throws UnknownHostException ++ { ++ connectivityChecker = new StartupClusterConnectivityChecker(70, 10); ++ peers = new HashSet<>(); ++ peers.add(InetAddressAndPort.getByName("127.0.1.0")); ++ peers.add(InetAddressAndPort.getByName("127.0.1.1")); ++ peers.add(InetAddressAndPort.getByName("127.0.1.2")); ++ } ++ ++ @After ++ public void tearDown() + { - StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true); - int count = 10; - Set<InetAddressAndPort> peers = createNodes(count); - Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE, - connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 0)); ++ MessagingService.instance().clearMessageSinks(); + } + + @Test - public void testConnectivity_Timeout() throws UnknownHostException ++ public void execute_HappyPath() + { - StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true); - int count = 10; - Set<InetAddressAndPort> peers = createNodes(count); - Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE, - connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 4)); - Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_TIMEOUT, - connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), true, 5)); ++ Sink sink = new Sink(true, true); ++ MessagingService.instance().addMessageSink(sink); ++ 
Assert.assertTrue(connectivityChecker.execute(peers)); ++ checkAllConnectionTypesSeen(sink); + } + + @Test - public void testConnectivity_SimpleUpdating() throws UnknownHostException ++ public void execute_NotAlive() + { - UpdatablePredicate predicate = new UpdatablePredicate(); - final int count = 100; - final int thresholdPercentage = 70; - StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(thresholdPercentage, 10, predicate); - Set<InetAddressAndPort> peers = createNodes(count); ++ Sink sink = new Sink(false, true); ++ MessagingService.instance().addMessageSink(sink); ++ Assert.assertFalse(connectivityChecker.execute(peers)); ++ checkAllConnectionTypesSeen(sink); ++ } + - AtomicInteger connectedCount = new AtomicInteger(); ++ @Test ++ public void execute_NoConnectionsAcks() ++ { ++ Sink sink = new Sink(true, false); ++ MessagingService.instance().addMessageSink(sink); ++ Assert.assertFalse(connectivityChecker.execute(peers)); ++ } + - for (int i = 0; i < count; i++) ++ private void checkAllConnectionTypesSeen(Sink sink) ++ { ++ for (InetAddressAndPort peer : peers) + { - predicate.reset(i); - connectedCount.set(i * 2); - StartupClusterConnectivityChecker.State expectedState = i < thresholdPercentage ? - StartupClusterConnectivityChecker.State.CONTINUE : - StartupClusterConnectivityChecker.State.FINISH_SUCCESS; - Assert.assertEquals("failed on iteration " + i, - expectedState, connectivityChecker.checkStatus(peers, connectedCount, System.nanoTime(), false, i)); ++ ConnectionTypeRecorder recorder = sink.seenConnectionRequests.get(peer); ++ Assert.assertNotNull(recorder); ++ Assert.assertTrue(recorder.seenSmallMessageRequest); ++ Assert.assertTrue(recorder.seenLargeMessageRequest); + } + } + - /** - * returns true until index = threshold, then returns false. 
- */ - private class UpdatablePredicate implements Predicate<InetAddressAndPort> ++ private static class Sink implements IMessageSink + { - int index; - int threshold; ++ private final boolean markAliveInGossip; ++ private final boolean processConnectAck; ++ private final Map<InetAddressAndPort, ConnectionTypeRecorder> seenConnectionRequests; + - void reset(int threshold) ++ Sink(boolean markAliveInGossip, boolean processConnectAck) + { - index = 0; - this.threshold = threshold; ++ this.markAliveInGossip = markAliveInGossip; ++ this.processConnectAck = processConnectAck; ++ seenConnectionRequests = new HashMap<>(); + } + + @Override - public boolean test(InetAddressAndPort inetAddressAndPort) ++ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to) + { - index++; - return index <= threshold; ++ ConnectionTypeRecorder recorder = seenConnectionRequests.computeIfAbsent(to, inetAddress -> new ConnectionTypeRecorder()); ++ if (message.connectionType == SMALL_MESSAGE) ++ { ++ Assert.assertFalse(recorder.seenSmallMessageRequest); ++ recorder.seenSmallMessageRequest = true; ++ } ++ else ++ { ++ Assert.assertFalse(recorder.seenLargeMessageRequest); ++ recorder.seenLargeMessageRequest = true; ++ } ++ ++ if (processConnectAck) ++ { ++ MessageIn msgIn = MessageIn.create(to, message.payload, Collections.emptyMap(), MessagingService.Verb.REQUEST_RESPONSE, 1); ++ MessagingService.instance().getRegisteredCallback(id).callback.response(msgIn); ++ } ++ ++ if (markAliveInGossip) ++ Gossiper.instance.realMarkAlive(to, new EndpointState(new HeartBeatState(1, 1))); ++ return false; + } - } - - private static Set<InetAddressAndPort> createNodes(int count) throws UnknownHostException - { - Set<InetAddressAndPort> nodes = new HashSet<>(); + - if (count < 1) - Assert.fail("need at least *one* node in the set!"); - - InetAddressAndPort node = InetAddressAndPort.getByName("127.0.0.1"); - nodes.add(node); - for (int i = 1; i < count; i++) ++ @Override ++ public 
boolean allowIncomingMessage(MessageIn message, int id) + { - node = InetAddressAndPort.getByAddress(InetAddresses.increment(node.address)); - nodes.add(node); ++ return false; + } - return nodes; + } + ++ private static class ConnectionTypeRecorder ++ { ++ boolean seenSmallMessageRequest; ++ boolean seenLargeMessageRequest; ++ } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org