This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 12e38feec94e59d516ff751af992d631aac5c685
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Apr 17 09:07:46 2023 +0200

    [CEP-21] Reinstate client notifications for joining/leaving/moving nodes
    
    patch by Marcus Eriksson; reviewed by Alex Petrov and Sam Tunnicliffe
    for CASSANDRA-18463
---
 .../apache/cassandra/service/StorageService.java   |  18 +++
 .../tcm/listeners/ClientNotificationListener.java  | 126 +++++++++++++++++++++
 .../org/apache/cassandra/tcm/log/LocalLog.java     |   2 +
 .../listeners/ClientNotificationListenerTest.java  |  63 +++++++++++
 4 files changed, 209 insertions(+)

diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index b1e4e3093d..b6bd7edc52 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2133,6 +2133,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             subscriber.onDown(endpoint);
     }
 
+    public void notifyJoined(InetAddressAndPort endpoint)
+    {
+        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+            subscriber.onJoinCluster(endpoint);
+    }
+
+    public void notifyMoved(InetAddressAndPort endpoint)
+    {
+        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+            subscriber.onMove(endpoint);
+    }
+
+    public void notifyLeft(InetAddressAndPort endpoint)
+    {
+        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+            subscriber.onLeaveCluster(endpoint);
+    }
+
     public boolean isRpcReady(InetAddressAndPort endpoint)
     {
         EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
diff --git a/src/java/org/apache/cassandra/tcm/listeners/ClientNotificationListener.java b/src/java/org/apache/cassandra/tcm/listeners/ClientNotificationListener.java
new file mode 100644
index 0000000000..58dbb4409c
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/listeners/ClientNotificationListener.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.listeners;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.tcm.listeners.ClientNotificationListener.ChangeType.LEAVE;
+import static org.apache.cassandra.tcm.membership.NodeState.BOOTSTRAPPING;
+import static org.apache.cassandra.tcm.membership.NodeState.MOVING;
+import static org.apache.cassandra.tcm.membership.NodeState.REGISTERED;
+
+public class ClientNotificationListener implements ChangeListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(ClientNotificationListener.class);
+
+    /**
+     * notify clients when node JOIN/MOVE/LEAVE
+     *
+     * note that we don't register any listeners in StorageService until starting the native protocol
+     * so we won't send any notifications during startup replay (todo: should we start native before doing the background catchup?)
+     */
+    public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next)
+    {
+        List<Pair<NodeId, ChangeType>> diff = diff(prev.directory, next.directory);
+        logger.debug("Maybe notify listeners about {}", diff);
+
+        for (Pair<NodeId, ChangeType> change : diff)
+        {
+            InetAddressAndPort endpoint = next.directory.endpoint(change.left);
+            switch (change.right)
+            {
+                case JOIN:
+                    StorageService.instance.notifyJoined(endpoint);
+                    break;
+                case MOVE:
+                    StorageService.instance.notifyMoved(endpoint);
+                    break;
+                case LEAVE:
+                    StorageService.instance.notifyLeft(endpoint);
+                    break;
+            }
+        }
+    }
+
+    enum ChangeType
+    {
+        MOVE,
+        JOIN,
+        LEAVE
+    }
+
+    private static List<Pair<NodeId, ChangeType>> diff(Directory prev, Directory next)
+    {
+        if (!prev.lastModified().equals(next.lastModified()))
+        {
+            List<Pair<NodeId, ChangeType>> changes = new ArrayList<>();
+            for (NodeId node : next.peerIds())
+            {
+                NodeState prevState = prev.peerState(node);
+                NodeState nextState = next.peerState(node);
+                ChangeType ct = fromNodeStateTransition(prevState, nextState);
+                if (ct != null)
+                    changes.add(Pair.create(node, ct));
+            }
+            return changes;
+        }
+        return Collections.emptyList();
+    }
+
+    static ChangeType fromNodeStateTransition(NodeState prev, NodeState next)
+    {
+        if (next == null)
+            return LEAVE;
+        if (prev == next)
+            return null;
+        switch (next)
+        {
+            case MOVING:
+            case LEAVING:
+                // if we see this NodeState but the previous state was null, it means we must have missed the JOINED state
+                if (prev == null || prev == BOOTSTRAPPING || prev == REGISTERED)
+                    return ChangeType.JOIN;
+                return null;
+            case JOINED:
+                if (prev == MOVING)
+                    return ChangeType.MOVE;
+                return ChangeType.JOIN;
+            case LEFT:
+                return LEAVE;
+            case BOOTSTRAPPING:
+            case REGISTERED:
+                return null;
+            default:
+                throw new IllegalStateException("Unknown NodeState: " + next);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 931e581c63..1259d55008 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.listeners.ChangeListener;
+import org.apache.cassandra.tcm.listeners.ClientNotificationListener;
 import org.apache.cassandra.tcm.listeners.InitializationListener;
 import org.apache.cassandra.tcm.listeners.LegacyStateListener;
 import org.apache.cassandra.tcm.listeners.LogListener;
@@ -572,6 +573,7 @@ public abstract class LocalLog implements Closeable
         addListener(new PlacementsChangeListener());
         addListener(new MetadataSnapshotListener());
         addListener(new PaxosRepairListener());
+        addListener(new ClientNotificationListener());
     }
 
     private LogListener snapshotListener()
diff --git a/test/unit/org/apache/cassandra/tcm/listeners/ClientNotificationListenerTest.java b/test/unit/org/apache/cassandra/tcm/listeners/ClientNotificationListenerTest.java
new file mode 100644
index 0000000000..fde53deca9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/listeners/ClientNotificationListenerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.listeners;
+
+import org.junit.Test;
+
+import org.apache.cassandra.tcm.membership.NodeState;
+
+import static org.apache.cassandra.tcm.listeners.ClientNotificationListener.ChangeType.*;
+import static org.apache.cassandra.tcm.listeners.ClientNotificationListener.fromNodeStateTransition;
+import static org.apache.cassandra.tcm.membership.NodeState.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ClientNotificationListenerTest
+{
+    @Test
+    public void testTransitions()
+    {
+        assertEquals(fromNodeStateTransition(null, JOINED), JOIN);
+        assertEquals(fromNodeStateTransition(REGISTERED, JOINED), JOIN);
+        assertEquals(fromNodeStateTransition(BOOTSTRAPPING, JOINED), JOIN);
+
+        for (NodeState ns : NodeState.values())
+        {
+            if (ns == LEFT)
+                continue;
+            assertEquals(ns.toString(), fromNodeStateTransition(ns, LEFT), LEAVE);
+        }
+
+        assertEquals(fromNodeStateTransition(null, LEFT), LEAVE);
+
+        // no client notifications when leaving/moving start:
+        assertNull(fromNodeStateTransition(JOINED, LEAVING));
+        assertNull(fromNodeStateTransition(JOINED, MOVING));
+
+        // no client notifications until JOINED
+        assertNull(fromNodeStateTransition(null, BOOTSTRAPPING));
+        assertNull(fromNodeStateTransition(null, REGISTERED));
+
+        // if LEAVING/MOVING is the first state we see for a node, assume it was JOINED before;
+        assertEquals(fromNodeStateTransition(null, LEAVING), JOIN);
+        assertEquals(fromNodeStateTransition(null, MOVING), JOIN);
+
+        assertEquals(fromNodeStateTransition(MOVING, JOINED), MOVE);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to