Repository: cassandra Updated Branches: refs/heads/trunk 0e5266664 -> 2bc5f0c61
Always check for collisions before joining ring Patch by Sam Tunnicliffe; reviewed by Joel Knighton for CASSANDRA-10134 The collision check and shadow round can be skipped completely (for testing etc) by setting cassandra.allow_unsafe_join=true. This commit also enables explicit unsafe replace without bootstrap by using both auto_bootstrap=false and cassandra.replace_address. Doing so requires cassandra.allow_unsafe_replace=true. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2bc5f0c6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2bc5f0c6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2bc5f0c6 Branch: refs/heads/trunk Commit: 2bc5f0c61ddb428b4826d83d42dad473eaeac002 Parents: 0e52666 Author: Sam Tunnicliffe <[email protected]> Authored: Wed Mar 16 09:53:04 2016 +0000 Committer: Sam Tunnicliffe <[email protected]> Committed: Wed Apr 27 09:25:33 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 4 + .../apache/cassandra/db/view/ViewBuilder.java | 1 + .../apache/cassandra/db/view/ViewManager.java | 16 ++ .../gms/GossipDigestAckVerbHandler.java | 5 +- .../gms/GossipDigestSynVerbHandler.java | 28 ++- src/java/org/apache/cassandra/gms/Gossiper.java | 92 ++++++++-- .../org/apache/cassandra/io/util/FileUtils.java | 4 +- .../locator/DynamicEndpointSnitch.java | 2 +- .../cassandra/service/StorageService.java | 172 ++++++++++++------- .../cassandra/service/StorageServiceMBean.java | 2 +- .../apache/cassandra/tools/nodetool/Info.java | 2 +- .../cassandra/utils/JVMStabilityInspector.java | 4 +- .../unit/org/apache/cassandra/SchemaLoader.java | 2 + 14 files changed, 244 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2bbd39d..f37a8ab 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.6 + * Always perform collision check before joining ring (CASSANDRA-10134) * SSTableWriter output discrepancy (CASSANDRA-11646) * Fix potential timeout in NativeTransportService.testConcurrentDestroys (CASSANDRA-10756) * Support large partitions on the 3.0 sstable format (CASSANDRA-11206) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index a177d37..7f24d2c 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -18,6 +18,10 @@ using the provided 'sstableupgrade' tool. New features ------------ + - Collision checks are performed when joining the token ring, regardless of whether + the node should bootstrap. Additionally, replace_address can legitimately be used + without bootstrapping to help with recovery of nodes with partially failed disks. + See CASSANDRA-10134 for more details. - Key cache will only hold indexed entries up to the size configured by column_index_cache_size_in_kb in cassandra.yaml in memory. Larger indexed entries will never go into memory. See CASSANDRA-11206 for more details. http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index 23eeba4..8944122 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -103,6 +103,7 @@ public class ViewBuilder extends CompactionInfo.Holder public void run() { + logger.trace("Running view builder for {}.{}", baseCfs.metadata.ksName, view.name); UUID localHostId = SystemKeyspace.getLocalHostId(); String ksname = baseCfs.metadata.ksName, viewName = view.name; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/db/view/ViewManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java index faa5551..37428ad 100644 --- a/src/java/org/apache/cassandra/db/view/ViewManager.java +++ b/src/java/org/apache/cassandra/db/view/ViewManager.java @@ -35,6 +35,8 @@ import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.dht.Token; import org.apache.cassandra.repair.SystemDistributedKeyspace; import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,6 +214,20 @@ public class ViewManager addView(entry.getValue()); } + // Building views involves updating view build status in the system_distributed + // keyspace and therefore it requires ring information. This check prevents builds + // being submitted when Keyspaces are initialized during CassandraDaemon::setup as + // that happens before StorageService & gossip are initialized. After SS has been + // init'd we schedule builds for *all* views anyway, so this doesn't have any effect + // on startup. It does mean however, that builds will not be triggered if gossip is + // disabled via JMX or nodetool as that sets SS to an uninitialized state. + if (!StorageService.instance.isInitialized()) + { + logger.info("Not submitting build tasks for views in keyspace {} as " + + "storage service is not initialized", keyspace.getName()); + return; + } + for (View view : allViews()) { view.build(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java index 9f69a94..15662b1 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java @@ -61,8 +61,9 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck> if (Gossiper.instance.isInShadowRound()) { if (logger.isDebugEnabled()) - logger.debug("Finishing shadow round with {}", from); - Gossiper.instance.finishShadowRound(); + logger.debug("Received an ack from {}, which may trigger exit from shadow round", from); + // if the ack is completely empty, then we can infer that the respondent is also in a shadow round + Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty()); return; // don't bother doing anything else, we have what we came for } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java index 1c67570..6d0afa2 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java @@ -38,7 +38,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn> InetAddress from = message.from; if (logger.isTraceEnabled()) logger.trace("Received a GossipDigestSynMessage from {}", from); - if (!Gossiper.instance.isEnabled()) + if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound()) { if (logger.isTraceEnabled()) logger.trace("Ignoring GossipDigestSynMessage because gossip is disabled"); @@ -60,6 +60,32 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn> } List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests(); + + // if the syn comes from a peer performing a shadow round and this node is + // also currently in a shadow round, send back a minimal ack. This node must + // be in the sender's seed list and doing this allows the sender to + // differentiate between seeds from which it is partitioned and those which + // are in their shadow round + if (!Gossiper.instance.isEnabled() && Gossiper.instance.isInShadowRound()) + { + // a genuine syn (as opposed to one from a node currently + // doing a shadow round) will always contain > 0 digests + if (gDigestList.size() > 0) + { + logger.debug("Ignoring non-empty GossipDigestSynMessage because currently in gossip shadow round"); + return; + } + + logger.debug("Received a shadow round syn from {}. Gossip is disabled but " + + "currently also in shadow round, responding with a minimal ack", from); + MessagingService.instance() + .sendOneWay(new MessageOut<>(MessagingService.Verb.GOSSIP_DIGEST_ACK, + new GossipDigestAck(new ArrayList<>(), new HashMap<>()), + GossipDigestAck.serializer), + from); + return; + } + if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 6f63727..76e7577 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -123,6 +123,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>(); private volatile boolean inShadowRound = false; + private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator); private volatile long lastProcessedMessageAt = System.currentTimeMillis(); @@ -708,27 +709,46 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } /** - * Check if this endpoint can safely bootstrap into the cluster. + * Check if this node can safely be started and join the ring. + * If the node is bootstrapping, examines gossip state for any previous status to decide whether + * it's safe to allow this node to start & bootstrap. If not bootstrapping, compares the host ID + * that the node itself has (obtained by reading from system.local or generated if not present) + * with the host ID obtained from gossip for the endpoint address (if any). This latter case + * prevents a non-bootstrapping, new node from being started with the same address of a + * previously started, but currently down predecessor. * * @param endpoint - the endpoint to check - * @return true if the endpoint can join the cluster + * @param localHostUUID - the host id to check + * @param isBootstrapping - whether the node intends to bootstrap when joining + * @return true if it is safe to start the node, false otherwise */ - public boolean isSafeForBootstrap(InetAddress endpoint) + public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping) { EndpointState epState = endpointStateMap.get(endpoint); - // if there's no previous state, or the node was previously removed from the cluster, we're good if (epState == null || isDeadState(epState)) return true; - String status = getGossipStatus(epState); - - // these states are not allowed to join the cluster as it would not be safe - final List<String> unsafeStatuses = new ArrayList<String>() {{ - add(""); // failed bootstrap but we did start gossiping - add(VersionedValue.STATUS_NORMAL); // node is legit in the cluster or it was stopped with kill -9 - add(VersionedValue.SHUTDOWN); }}; // node was shutdown - return !unsafeStatuses.contains(status); + if (isBootstrapping) + { + String status = getGossipStatus(epState); + // these states are not allowed to join the cluster as it would not be safe + final List<String> unsafeStatuses = new ArrayList<String>() + {{ + add(""); // failed bootstrap but we did start gossiping + add(VersionedValue.STATUS_NORMAL); // node is legit in the cluster or it was stopped with kill -9 + add(VersionedValue.SHUTDOWN); // node was shutdown + }}; + return !unsafeStatuses.contains(status); + } + else + { + // if the previous UUID matches what we currently have (i.e. what was read from + // system.local at startup), then we're good to start up. Otherwise, something + // is amiss and we need to replace the previous node + VersionedValue previous = epState.getApplicationState(ApplicationState.HOST_ID); + return UUID.fromString(previous.value).equals(localHostUUID); + } } private void doStatusCheck() @@ -1318,11 +1338,19 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /** * Do a single 'shadow' round of gossip, where we do not modify any state - * Only used when replacing a node, to get and assume its states + * Used when preparing to join the ring: + * * when replacing a node, to get and assume its tokens + * * when joining, to check that the local host id matches any previous id for the endpoint address */ public void doShadowRound() { buildSeedsList(); + // it may be that the local address is the only entry in the seed + // list in which case, attempting a shadow round is pointless + if (seeds.isEmpty()) + return; + + seedsInShadowRound.clear(); // send a completely empty syn List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(), @@ -1341,6 +1369,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (slept % 5000 == 0) { // CASSANDRA-8072, retry at the beginning and every 5 seconds logger.trace("Sending shadow round GOSSIP DIGEST SYN to seeds {}", seeds); + for (InetAddress seed : seeds) MessagingService.instance().sendOneWay(message, seed); } @@ -1351,7 +1380,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean slept += 1000; if (slept > StorageService.RING_DELAY) - throw new RuntimeException("Unable to gossip with any seeds"); + { + // if we don't consider ourself to be a seed, fail out + if (!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())) + throw new RuntimeException("Unable to gossip with any seeds"); + + logger.warn("Unable to gossip with any seeds but continuing since node is in its own seed list"); + inShadowRound = false; + break; + } } } catch (InterruptedException wtf) @@ -1478,10 +1515,33 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled()); } - protected void finishShadowRound() + protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound) { if (inShadowRound) - inShadowRound = false; + { + if (!isInShadowRound) + { + logger.debug("Received a regular ack from {}, can now exit shadow round", respondent); + // respondent sent back a full ack, so we can exit our shadow round + inShadowRound = false; + seedsInShadowRound.clear(); + } + else + { + // respondent indicates it too is in a shadow round, if all seeds + // are in this state then we can exit our shadow round. Otherwise, + // we keep retrying the SR until one responds with a full ACK or + // we learn that all seeds are in SR. + logger.debug("Received an ack from {} indicating it is also in shadow round", respondent); + seedsInShadowRound.add(respondent); + if (seedsInShadowRound.containsAll(seeds)) + { + logger.debug("All seeds are in a shadow round, clearing this node to exit its own"); + inShadowRound = false; + seedsInShadowRound.clear(); + } + } + } } protected boolean isInShadowRound() http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index a076bbd..6b58e85 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -463,7 +463,7 @@ public class FileUtils public static void handleCorruptSSTable(CorruptSSTableException e) { - if (!StorageService.instance.isSetupCompleted()) + if (!StorageService.instance.isDaemonSetupCompleted()) handleStartupFSError(e); JVMStabilityInspector.inspectThrowable(e); @@ -477,7 +477,7 @@ public class FileUtils public static void handleFSError(FSError e) { - if (!StorageService.instance.isSetupCompleted()) + if (!StorageService.instance.isDaemonSetupCompleted()) handleStartupFSError(e); JVMStabilityInspector.inspectThrowable(e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index 3e89dd4..fb0db13 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -253,7 +253,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa private void updateScores() // this is expensive { - if (!StorageService.instance.isInitialized()) + if (!StorageService.instance.isGossipActive()) return; if (!registered) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 56d9b4a..d4ad59a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -251,8 +251,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /* true if node is rebuilding and receiving data */ private final AtomicBoolean isRebuilding = new AtomicBoolean(); - private boolean initialized; + private volatile boolean initialized = false; private volatile boolean joined = false; + private volatile boolean gossipActive = false; /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */ private double traceProbability = 0.0; @@ -369,24 +370,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // should only be called via JMX public void stopGossiping() { - if (initialized) + if (gossipActive) { logger.warn("Stopping gossip by operator request"); Gossiper.instance.stop(); - initialized = false; + gossipActive = false; } } // should only be called via JMX public void startGossiping() { - if (!initialized) + if (!gossipActive) { logger.warn("Starting gossip by operator request"); setGossipTokens(getLocalTokens()); Gossiper.instance.forceNewerGeneration(); Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); - initialized = true; + gossipActive = true; } } @@ -462,7 +463,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void stopTransports() { - if (isInitialized()) + if (isGossipActive()) { logger.error("Stopping gossiper"); stopGossiping(); @@ -500,7 +501,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return initialized; } - public boolean isSetupCompleted() + public boolean isGossipActive() + { + return gossipActive; + } + + public boolean isDaemonSetupCompleted() { return daemon == null ? false @@ -514,50 +520,56 @@ public class StorageService extends NotificationBroadcasterSupport implements IE daemon.deactivate(); } - public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException + private synchronized UUID prepareReplacementInfo(InetAddress replaceAddress) throws ConfigurationException { logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress()); - if (!MessagingService.instance().isListening()) - MessagingService.instance().listen(); - - // make magic happen Gossiper.instance.doShadowRound(); + // as we've completed the shadow round of gossip, we should be able to find the node we're replacing + if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress) == null) + throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress)); - UUID hostId = null; - // now that we've gossiped at least once, we should be able to find the node we're replacing - if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null) - throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip"); - hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()); try { - VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS); + VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(replaceAddress).getApplicationState(ApplicationState.TOKENS); if (tokensVersionedValue == null) - throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); - Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes()))); + throw new RuntimeException(String.format("Could not find tokens for %s to replace", replaceAddress)); - SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc - Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need - return tokens; + bootstrapTokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes()))); } catch (IOException e) { throw new RuntimeException(e); } + + // we'll use the replacee's host Id as our own so we receive hints, etc + UUID localHostId = Gossiper.instance.getHostId(replaceAddress); + SystemKeyspace.setLocalHostId(localHostId); + Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need + return localHostId; } - public synchronized void checkForEndpointCollision() throws ConfigurationException + private synchronized void checkForEndpointCollision(UUID localHostId) throws ConfigurationException { + if (Boolean.getBoolean("cassandra.allow_unsafe_join")) + { + logger.warn("Skipping endpoint collision check as cassandra.allow_unsafe_join=true"); + return; + } + logger.debug("Starting shadow gossip round to check for endpoint collision"); - if (!MessagingService.instance().isListening()) - MessagingService.instance().listen(); Gossiper.instance.doShadowRound(); - if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress())) + // If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so. + // If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local + // one, which was either read from system.local or generated at startup. If a learned id is present & + // doesn't match the local, then the node needs replacing + if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap())) { throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " + "Use cassandra.replace_address if you want to replace this node.", FBUtilities.getBroadcastAddress())); } - if (useStrictConsistency && !allowSimultaneousMoves()) + + if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves()) { for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates()) { @@ -571,6 +583,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true"); } } + logger.debug("Resetting gossip state after shadow round"); Gossiper.instance.resetEndpointStateMap(); } @@ -583,6 +596,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void unsafeInitialize() throws ConfigurationException { initialized = true; + gossipActive = true; Gossiper.instance.register(this); Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); // needed for node-ring gathering. Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion()); @@ -617,8 +631,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info("CQL supported versions: {} (default: {})", StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION); - initialized = true; - try { // Ensure StorageProxy is initialized on start-up; see CASSANDRA-3797. @@ -631,27 +643,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new AssertionError(e); } - if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true"))) - { - logger.info("Loading persisted ring state"); - Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens(); - Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds(); - for (InetAddress ep : loadedTokens.keySet()) - { - if (ep.equals(FBUtilities.getBroadcastAddress())) - { - // entry has been mistakenly added, delete it - SystemKeyspace.removeEndpoint(ep); - } - else - { - if (loadedHostIds.containsKey(ep)) - tokenMetadata.updateHostId(loadedHostIds.get(ep), ep); - Gossiper.instance.addSavedEndpoint(ep); - } - } - } - // daemon threads, like our executors', continue to run while shutdown hooks are invoked drainOnShutdown = new Thread(new WrappedRunnable() { @@ -724,6 +715,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (!Boolean.parseBoolean(System.getProperty("cassandra.start_gossip", "true"))) { logger.info("Not starting gossip as requested."); + // load ring state in preparation for starting gossip later + loadRingState(); + initialized = true; return; } @@ -758,6 +752,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining"); } + + initialized = true; + } + + private void loadRingState() + { + if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true"))) + { + logger.info("Loading persisted ring state"); + Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens(); + Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds(); + for (InetAddress ep : loadedTokens.keySet()) + { + if (ep.equals(FBUtilities.getBroadcastAddress())) + { + // entry has been mistakenly added, delete it + SystemKeyspace.removeEndpoint(ep); + } + else + { + if (loadedHostIds.containsKey(ep)) + tokenMetadata.updateHostId(loadedHostIds.get(ep), ep); + Gossiper.instance.addSavedEndpoint(ep); + } + } + } } /** @@ -793,47 +813,71 @@ public class StorageService extends NotificationBroadcasterSupport implements IE else throw new ConfigurationException("This node was decommissioned and will not rejoin the ring unless cassandra.override_decommission=true has been set, or all existing data is removed and the node is bootstrapped again"); } - if (replacing && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))) - throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node"); + if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null) throw new RuntimeException("Replace method removed; use cassandra.replace_address instead"); + + if (!MessagingService.instance().isListening()) + MessagingService.instance().listen(); + + UUID localHostId = SystemKeyspace.getLocalHostId(); + if (replacing) { if (SystemKeyspace.bootstrapComplete()) throw new RuntimeException("Cannot replace address with a node that is already bootstrapped"); - if (!DatabaseDescriptor.isAutoBootstrap()) - throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration"); - bootstrapTokens = prepareReplacementInfo(); + + if (!(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))) + throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node"); + + if (!DatabaseDescriptor.isAutoBootstrap() && !Boolean.getBoolean("cassandra.allow_unsafe_replace")) + throw new RuntimeException("Replacing a node without bootstrapping risks invalidating consistency " + + "guarantees as the expected data may not be present until repair is run. " + + "To perform this operation, please restart with " + + "-Dcassandra.allow_unsafe_replace=true"); + + InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress(); + localHostId = prepareReplacementInfo(replaceAddress); appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens)); - appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true)); + + // if want to bootstrap the ranges of the node we're replacing, + // go into hibernate mode while that happens. Otherwise, persist + // the tokens we're taking over locally so that they don't get + // clobbered with auto generated ones in joinTokenRing + if (DatabaseDescriptor.isAutoBootstrap()) + appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true)); + else + SystemKeyspace.updateTokens(bootstrapTokens); } - else if (shouldBootstrap()) + else { - checkForEndpointCollision(); + checkForEndpointCollision(localHostId); } // have to start the gossip service before we can see any info on other nodes. this is necessary // for bootstrap to get the load info it needs. // (we won't be part of the storage ring though until we add a counterId to our state, below.) // Seed the host ID-to-endpoint map with our own ID. - UUID localHostId = SystemKeyspace.getLocalHostId(); getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress()); appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion()); appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId)); appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getBroadcastRpcAddress())); appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion()); + + // load the persisted ring state. This used to be done earlier in the init process, + // but now we always perform a shadow round when preparing to join and we have to + // clear endpoint states after doing that. + loadRingState(); + logger.info("Starting up server gossip"); Gossiper.instance.register(this); Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering. + gossipActive = true; // gossip snitch infos (local DC and rack) gossipSnitchInfo(); // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull) Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates - - if (!MessagingService.instance().isListening()) - MessagingService.instance().listen(); LoadBroadcaster.instance.startBroadcasting(); - HintsService.instance.startDispatch(); BatchlogManager.instance.start(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 8978472..277fbe9 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -478,7 +478,7 @@ public interface StorageServiceMBean extends NotificationEmitter // allows a user to forcibly completely stop cassandra public void stopDaemon(); - // to determine if gossip is disabled + // to determine if initialization has completed public boolean isInitialized(); // allows a user to disable thrift http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/tools/nodetool/Info.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Info.java b/src/java/org/apache/cassandra/tools/nodetool/Info.java index 0d9bd73..268d9df 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Info.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Info.java @@ -43,7 +43,7 @@ public class Info extends NodeToolCmd @Override public void execute(NodeProbe probe) { - boolean gossipInitialized = probe.isInitialized(); + boolean gossipInitialized = probe.isGossipRunning(); System.out.printf("%-23s: %s%n", "ID", probe.getLocalHostId()); System.out.printf("%-23s: %s%n", "Gossip active", gossipInitialized); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index ab3471c..bda7997 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -31,9 +31,7 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.service.CassandraDaemon; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.thrift.Cassandra; /** * Responsible for deciding whether to kill the JVM if it gets in an "unstable" state (think OOM). @@ -76,7 +74,7 @@ public final class JVMStabilityInspector public static void inspectCommitLogThrowable(Throwable t) { - if (!StorageService.instance.isSetupCompleted()) + if (!StorageService.instance.isDaemonSetupCompleted()) { logger.error("Exiting due to error while processing commit log during initialization.", t); killer.killCurrentJVM(t, true); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index 992c4d6..6aea343 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -71,6 +71,8 @@ public class SchemaLoader public static void startGossiper() { + // skip shadow round and endpoint collision check in tests + System.setProperty("cassandra.allow_unsafe_join", "true"); if (!Gossiper.instance.isEnabled()) Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); }
