This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0e43dae383c469a20a6b4a86331935c9f205e8e3
Author: Marcus Eriksson <[email protected]>
AuthorDate: Wed Mar 22 09:06:05 2023 +0100

    [CEP-21] Always populate local gossip state at startup
    
    patch by Marcus Eriksson; reviewed by Alex Petrov and Sam Tunnicliffe
    for CASSANDRA-18403
---
 .../org/apache/cassandra/gms/FailureDetector.java  |  7 +----
 src/java/org/apache/cassandra/gms/Gossiper.java    | 31 +++++++++++-----------
 .../apache/cassandra/service/StorageService.java   |  5 ++--
 .../cassandra/tcm/compatibility/GossipHelper.java  | 15 +++++------
 .../tcm/listeners/LegacyStateListener.java         | 15 ++++++++---
 .../org/apache/cassandra/tcm/log/LocalLog.java     |  2 +-
 6 files changed, 39 insertions(+), 36 deletions(-)

diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 51d73cc067..03612c9bd8 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -250,14 +250,9 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
         NodeId nodeId = 
metadata.directory.peerId(FBUtilities.getBroadcastAddressAndPort());
         List<Token> tokens = metadata.tokenMap.tokens(nodeId);
         if (tokens != null)
-        {
-            // todo, used to only append tokens.version
-            sb.append("  TOKENS:").append(metadata.epoch.toString()).append("\n");
-        }
+            sb.append("  TOKENS:").append(metadata.epoch.getEpoch()).append(":<hidden>\n");
         else
-        {
             sb.append("  TOKENS: not present\n");
-        }
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index afd7c751c9..0d470d7b68 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -55,32 +55,34 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.apache.cassandra.concurrent.FutureTask;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.tcm.compatibility.GossipHelper;
+import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.MBeanWrapper;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.FutureTask;
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.NoPayload;
 import org.apache.cassandra.net.RequestCallback;
-import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
-import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.MBeanWrapper;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.RecomputingSupplier;
 import org.apache.cassandra.utils.concurrent.NotScheduledFuture;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
@@ -1684,25 +1686,24 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public void start(int generationNumber)
     {
-        start(generationNumber, new EnumMap<>(ApplicationState.class));
+        start(generationNumber, false);
     }
 
     /**
      * Start the gossiper with the generation number, preloading the map of 
application states before starting
      */
-    public void start(int generationNbr, Map<ApplicationState, VersionedValue> 
preloadLocalStates)
+    public void start(int generationNbr, boolean mergeLocalStates)
     {
         buildSeedsList();
         /* initialize the heartbeat state for this localEndpoint */
         maybeInitializeLocalState(generationNbr);
-        EndpointState localState = 
endpointStateMap.get(getBroadcastAddressAndPort());
-        localState.addApplicationStates(preloadLocalStates);
+        ClusterMetadata metadata = ClusterMetadata.current();
+        if (mergeLocalStates && metadata.myNodeId() != null)
+            GossipHelper.mergeNodeToGossip(metadata.myNodeId(), metadata);
         minVersionSupplier.recompute();
 
         //notify snitches that Gossiper is about to start
         DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
-        if (logger.isTraceEnabled())
-            logger.trace("gossip started with generation {}", 
localState.getHeartBeatState().getGeneration());
 
         scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(),
                                                               
Gossiper.intervalInMillis,
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 791a8b564b..b1e4e3093d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -680,7 +680,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                                        
valueFactory.sstableVersions(versions));
         });
 
-        Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration());
+        Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(),
+                                ClusterMetadataService.state() != 
ClusterMetadataService.State.GOSSIP); // only populate local state if not 
running in gossip mode
         Gossiper.instance.register(this);
         
Gossiper.instance.addLocalApplicationState(ApplicationState.SSTABLE_VERSIONS,
                                                    
valueFactory.sstableVersions(sstablesTracker.versionsInUse()));
@@ -2106,7 +2107,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void updateTopology()
     {
-        throw new IllegalStateException();
+        logger.error("Caller should be updated, updateTopology is no longer supported", new RuntimeException());
     }
 
     private void notifyRpcChange(InetAddressAndPort endpoint, boolean ready)
diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
index 1a6fc17c84..9aa629fd7f 100644
--- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
@@ -33,6 +33,9 @@ import java.util.UUID;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.IPartitioner;
@@ -48,7 +51,6 @@ import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Period;
@@ -82,12 +84,7 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 
 public class GossipHelper
 {
-    public static void updateSchemaVersionInGossip(UUID version)
-    {
-        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA,
-                                                   
StorageService.instance.valueFactory.schema(version));
-    }
-
+    private static final Logger logger = 
LoggerFactory.getLogger(GossipHelper.class);
     public static void removeFromGossip(InetAddressAndPort addr)
     {
         Gossiper.runInGossipStageBlocking(() -> 
Gossiper.instance.removeEndpoint(addr));
@@ -175,7 +172,9 @@ public class GossipHelper
                 }
             }
             HeartBeatState heartBeatState = new 
HeartBeatState(epstate.getHeartBeatState().getGeneration(), isLocal ? 
VersionGenerator.getNextVersion() : 0);
-            Gossiper.instance.unsafeUpdateEpStates(endpoint, new 
EndpointState(heartBeatState, newStates));
+            EndpointState newepstate = new EndpointState(heartBeatState, 
newStates);
+            Gossiper.instance.unsafeUpdateEpStates(endpoint, newepstate);
+            logger.debug("Updated epstates for {}: {}", endpoint, newepstate);
         });
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
index ccad52f14a..3f03664a8d 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.tcm.listeners;
 
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.StreamSupport;
 
@@ -31,8 +32,8 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.compatibility.GossipHelper;
+import org.apache.cassandra.tcm.membership.Directory;
 import org.apache.cassandra.tcm.membership.NodeId;
-import org.apache.cassandra.tcm.membership.NodeState;
 
 import static org.apache.cassandra.tcm.membership.NodeState.LEFT;
 
@@ -48,9 +49,7 @@ public class LegacyStateListener implements ChangeListener
             Set<NodeId> changed = new HashSet<>();
             for (NodeId node : next.directory.peerIds())
             {
-                NodeState oldState = prev.directory.peerState(node);
-                NodeState newState = next.directory.peerState(node);
-                if (oldState == null || oldState != newState || 
!prev.tokenMap.tokens(node).equals(next.tokenMap.tokens(node)))
+                if (directoryEntryChangedFor(node, prev.directory, 
next.directory) || 
!prev.tokenMap.tokens(node).equals(next.tokenMap.tokens(node)))
                     changed.add(node);
             }
 
@@ -71,6 +70,7 @@ public class LegacyStateListener implements ChangeListener
                             
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
                             break;
                         case JOINED:
+                            
SystemKeyspace.updateTokens(next.tokenMap.tokens());
                             // needed if we miss the REGISTERED above; Does 
nothing if we are already in epStateMap:
                             
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
                             
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
@@ -88,4 +88,11 @@ public class LegacyStateListener implements ChangeListener
             }
         }
     }
+
+    private boolean directoryEntryChangedFor(NodeId nodeId, Directory prev, 
Directory next)
+    {
+        return prev.peerState(nodeId) != next.peerState(nodeId) ||
+               !Objects.equals(prev.getNodeAddresses(nodeId), 
next.getNodeAddresses(nodeId)) ||
+               !Objects.equals(prev.version(nodeId), next.version(nodeId));
+    }
 }
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 0478500e97..25942368c8 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -284,7 +284,7 @@ public abstract class LocalLog implements Closeable
 
                     if (committed.compareAndSet(prev, next))
                     {
-                        logger.debug("Enacted {}. New tail is {}", 
pendingEntry.transform, next.epoch);
+                        logger.info("Enacted {}. New tail is {}", 
pendingEntry.transform, next.epoch);
                         maybeNotifyListeners(pendingEntry, transformed);
                     }
                     else


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to