This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0e43dae383c469a20a6b4a86331935c9f205e8e3 Author: Marcus Eriksson <[email protected]> AuthorDate: Wed Mar 22 09:06:05 2023 +0100 [CEP-21] Always populate local gossip state at startup patch by Marcus Eriksson; reviewed by Alex Petrov and Sam Tunnicliffe for CASSANDRA-18403 --- .../org/apache/cassandra/gms/FailureDetector.java | 7 +---- src/java/org/apache/cassandra/gms/Gossiper.java | 31 +++++++++++----------- .../apache/cassandra/service/StorageService.java | 5 ++-- .../cassandra/tcm/compatibility/GossipHelper.java | 15 +++++------ .../tcm/listeners/LegacyStateListener.java | 15 ++++++++--- .../org/apache/cassandra/tcm/log/LocalLog.java | 2 +- 6 files changed, 39 insertions(+), 36 deletions(-) diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index 51d73cc067..03612c9bd8 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -250,14 +250,9 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean NodeId nodeId = metadata.directory.peerId(FBUtilities.getBroadcastAddressAndPort()); List<Token> tokens = metadata.tokenMap.tokens(nodeId); if (tokens != null) - { - // todo, used to only append tokens.version - sb.append(" TOKENS:").append(metadata.epoch.toString()).append("\n"); - } + sb.append(" TOKENS:").append(metadata.epoch.getEpoch()).append(":<hidden>\n"); else - { sb.append(" TOKENS: not present\n"); - } } /** diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index afd7c751c9..0d470d7b68 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -55,32 +55,34 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; + +import org.apache.cassandra.concurrent.FutureTask; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.tcm.compatibility.GossipHelper; +import org.apache.cassandra.utils.CassandraVersion; +import org.apache.cassandra.utils.ExecutorUtils; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.FutureTask; import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.NoPayload; import org.apache.cassandra.net.RequestCallback; -import org.apache.cassandra.net.Verb; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; -import org.apache.cassandra.utils.CassandraVersion; -import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.MBeanWrapper; -import org.apache.cassandra.utils.NoSpamLogger; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.RecomputingSupplier; import org.apache.cassandra.utils.concurrent.NotScheduledFuture; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; @@ -1684,25 +1686,24 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void start(int generationNumber) { - start(generationNumber, new EnumMap<>(ApplicationState.class)); + start(generationNumber, false); } /** * Start the gossiper with the generation number, preloading the map of application states before starting */ - public void start(int generationNbr, Map<ApplicationState, VersionedValue> preloadLocalStates) + public void start(int generationNbr, boolean mergeLocalStates) { buildSeedsList(); /* initialize the heartbeat state for this localEndpoint */ maybeInitializeLocalState(generationNbr); - EndpointState localState = endpointStateMap.get(getBroadcastAddressAndPort()); - localState.addApplicationStates(preloadLocalStates); + ClusterMetadata metadata = ClusterMetadata.current(); + if (mergeLocalStates && metadata.myNodeId() != null) + GossipHelper.mergeNodeToGossip(metadata.myNodeId(), metadata); minVersionSupplier.recompute(); //notify snitches that Gossiper is about to start DatabaseDescriptor.getEndpointSnitch().gossiperStarting(); - if (logger.isTraceEnabled()) - logger.trace("gossip started with generation {}", localState.getHeartBeatState().getGeneration()); scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(), Gossiper.intervalInMillis, diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 791a8b564b..b1e4e3093d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -680,7 +680,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE valueFactory.sstableVersions(versions)); }); - Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration()); + Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), + ClusterMetadataService.state() != ClusterMetadataService.State.GOSSIP); // only populate local state if not running in gossip mode Gossiper.instance.register(this); Gossiper.instance.addLocalApplicationState(ApplicationState.SSTABLE_VERSIONS, valueFactory.sstableVersions(sstablesTracker.versionsInUse())); @@ -2106,7 +2107,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void updateTopology() { - throw new IllegalStateException(); + logger.error("Caller should be updated, updateTopology is no longer supported", new RuntimeException()); } private void notifyRpcChange(InetAddressAndPort endpoint, boolean ready) diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index 1a6fc17c84..9aa629fd7f 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -33,6 +33,9 @@ import java.util.UUID; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.IPartitioner; @@ -48,7 +51,6 @@ import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.SchemaKeyspace; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Period; @@ -82,12 +84,7 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; public class GossipHelper { - public static void updateSchemaVersionInGossip(UUID version) - { - Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, - StorageService.instance.valueFactory.schema(version)); - } - + private static final Logger logger = LoggerFactory.getLogger(GossipHelper.class); public static void removeFromGossip(InetAddressAndPort addr) { Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.removeEndpoint(addr)); @@ -175,7 +172,9 @@ public class GossipHelper } } HeartBeatState heartBeatState = new HeartBeatState(epstate.getHeartBeatState().getGeneration(), isLocal ? VersionGenerator.getNextVersion() : 0); - Gossiper.instance.unsafeUpdateEpStates(endpoint, new EndpointState(heartBeatState, newStates)); + EndpointState newepstate = new EndpointState(heartBeatState, newStates); + Gossiper.instance.unsafeUpdateEpStates(endpoint, newepstate); + logger.debug("Updated epstates for {}: {}", endpoint, newepstate); }); } diff --git a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java index ccad52f14a..3f03664a8d 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java @@ -19,6 +19,7 @@ package org.apache.cassandra.tcm.listeners; import java.util.HashSet; +import java.util.Objects; import java.util.Set; import java.util.stream.StreamSupport; @@ -31,8 +32,8 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.compatibility.GossipHelper; +import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.NodeId; -import org.apache.cassandra.tcm.membership.NodeState; import static org.apache.cassandra.tcm.membership.NodeState.LEFT; @@ -48,9 +49,7 @@ public class LegacyStateListener implements ChangeListener Set<NodeId> changed = new HashSet<>(); for (NodeId node : next.directory.peerIds()) { - NodeState oldState = prev.directory.peerState(node); - NodeState newState = next.directory.peerState(node); - if (oldState == null || oldState != newState || !prev.tokenMap.tokens(node).equals(next.tokenMap.tokens(node))) + if (directoryEntryChangedFor(node, prev.directory, next.directory) || !prev.tokenMap.tokens(node).equals(next.tokenMap.tokens(node))) changed.add(node); } @@ -71,6 +70,7 @@ public class LegacyStateListener implements ChangeListener Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); break; case JOINED: + SystemKeyspace.updateTokens(next.tokenMap.tokens()); // needed if we miss the REGISTERED above; Does nothing if we are already in epStateMap: Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); @@ -88,4 +88,11 @@ public class LegacyStateListener implements ChangeListener } } } + + private boolean directoryEntryChangedFor(NodeId nodeId, Directory prev, Directory next) + { + return prev.peerState(nodeId) != next.peerState(nodeId) || + !Objects.equals(prev.getNodeAddresses(nodeId), next.getNodeAddresses(nodeId)) || + !Objects.equals(prev.version(nodeId), next.version(nodeId)); + } } diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 0478500e97..25942368c8 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -284,7 +284,7 @@ public abstract class LocalLog implements Closeable if (committed.compareAndSet(prev, next)) { - logger.debug("Enacted {}. New tail is {}", pendingEntry.transform, next.epoch); + logger.info("Enacted {}. New tail is {}", pendingEntry.transform, next.epoch); maybeNotifyListeners(pendingEntry, transformed); } else --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
