This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 12e38feec94e59d516ff751af992d631aac5c685 Author: Marcus Eriksson <[email protected]> AuthorDate: Mon Apr 17 09:07:46 2023 +0200 [CEP-21] Reinstate client notifications for joining/leaving/moving nodes patch by Marcus Eriksson; reviewed by Alex Petrov and Sam Tunnicliffe for CASSANDRA-18463 --- .../apache/cassandra/service/StorageService.java | 18 +++ .../tcm/listeners/ClientNotificationListener.java | 126 +++++++++++++++++++++ .../org/apache/cassandra/tcm/log/LocalLog.java | 2 + .../listeners/ClientNotificationListenerTest.java | 63 +++++++++++ 4 files changed, 209 insertions(+) diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index b1e4e3093d..b6bd7edc52 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2133,6 +2133,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE subscriber.onDown(endpoint); } + public void notifyJoined(InetAddressAndPort endpoint) + { + for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + subscriber.onJoinCluster(endpoint); + } + + public void notifyMoved(InetAddressAndPort endpoint) + { + for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + subscriber.onMove(endpoint); + } + + public void notifyLeft(InetAddressAndPort endpoint) + { + for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + subscriber.onLeaveCluster(endpoint); + } + public boolean isRpcReady(InetAddressAndPort endpoint) { EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); diff --git a/src/java/org/apache/cassandra/tcm/listeners/ClientNotificationListener.java b/src/java/org/apache/cassandra/tcm/listeners/ClientNotificationListener.java new file mode 100644 index 0000000000..58dbb4409c --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/listeners/ClientNotificationListener.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.listeners; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.tcm.listeners.ClientNotificationListener.ChangeType.LEAVE; +import static org.apache.cassandra.tcm.membership.NodeState.BOOTSTRAPPING; +import static org.apache.cassandra.tcm.membership.NodeState.MOVING; +import static org.apache.cassandra.tcm.membership.NodeState.REGISTERED; + +public class ClientNotificationListener implements ChangeListener +{ + private static final Logger logger = LoggerFactory.getLogger(ClientNotificationListener.class); + + /** + * notify clients when node JOIN/MOVE/LEAVE + * + * note that we don't register any listeners in StorageService until starting the native protocol + * so we won't send any notifications during startup replay (todo: should we start native before doing the background catchup?) + */ + public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next) + { + List<Pair<NodeId, ChangeType>> diff = diff(prev.directory, next.directory); + logger.debug("Maybe notify listeners about {}", diff); + + for (Pair<NodeId, ChangeType> change : diff) + { + InetAddressAndPort endpoint = next.directory.endpoint(change.left); + switch (change.right) + { + case JOIN: + StorageService.instance.notifyJoined(endpoint); + break; + case MOVE: + StorageService.instance.notifyMoved(endpoint); + break; + case LEAVE: + StorageService.instance.notifyLeft(endpoint); + break; + } + } + } + + enum ChangeType + { + MOVE, + JOIN, + LEAVE + } + + private static List<Pair<NodeId, ChangeType>> diff(Directory prev, Directory next) + { + if (!prev.lastModified().equals(next.lastModified())) + { + List<Pair<NodeId, ChangeType>> changes = new ArrayList<>(); + for (NodeId node : next.peerIds()) + { + NodeState prevState = prev.peerState(node); + NodeState nextState = next.peerState(node); + ChangeType ct = fromNodeStateTransition(prevState, nextState); + if (ct != null) + changes.add(Pair.create(node, ct)); + } + return changes; + } + return Collections.emptyList(); + } + + static ChangeType fromNodeStateTransition(NodeState prev, NodeState next) + { + if (next == null) + return LEAVE; + if (prev == next) + return null; + switch (next) + { + case MOVING: + case LEAVING: + // if we see this NodeState but the previous state was null, it means we must have missed the JOINED state + if (prev == null || prev == BOOTSTRAPPING || prev == REGISTERED) + return ChangeType.JOIN; + return null; + case JOINED: + if (prev == MOVING) + return ChangeType.MOVE; + return ChangeType.JOIN; + case LEFT: + return LEAVE; + case BOOTSTRAPPING: + case REGISTERED: + return null; + default: + throw new IllegalStateException("Unknown NodeState: " + next); + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 931e581c63..1259d55008 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -44,6 +44,7 @@ import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.listeners.ChangeListener; +import org.apache.cassandra.tcm.listeners.ClientNotificationListener; import org.apache.cassandra.tcm.listeners.InitializationListener; import org.apache.cassandra.tcm.listeners.LegacyStateListener; import org.apache.cassandra.tcm.listeners.LogListener; @@ -572,6 +573,7 @@ public abstract class LocalLog implements Closeable addListener(new PlacementsChangeListener()); addListener(new MetadataSnapshotListener()); addListener(new PaxosRepairListener()); + addListener(new ClientNotificationListener()); } private LogListener snapshotListener() diff --git a/test/unit/org/apache/cassandra/tcm/listeners/ClientNotificationListenerTest.java b/test/unit/org/apache/cassandra/tcm/listeners/ClientNotificationListenerTest.java new file mode 100644 index 0000000000..fde53deca9 --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/listeners/ClientNotificationListenerTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.listeners; + +import org.junit.Test; + +import org.apache.cassandra.tcm.membership.NodeState; + +import static org.apache.cassandra.tcm.listeners.ClientNotificationListener.ChangeType.*; +import static org.apache.cassandra.tcm.listeners.ClientNotificationListener.fromNodeStateTransition; +import static org.apache.cassandra.tcm.membership.NodeState.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class ClientNotificationListenerTest +{ + @Test + public void testTransitions() + { + assertEquals(fromNodeStateTransition(null, JOINED), JOIN); + assertEquals(fromNodeStateTransition(REGISTERED, JOINED), JOIN); + assertEquals(fromNodeStateTransition(BOOTSTRAPPING, JOINED), JOIN); + + for (NodeState ns : NodeState.values()) + { + if (ns == LEFT) + continue; + assertEquals(ns.toString(), fromNodeStateTransition(ns, LEFT), LEAVE); + } + + assertEquals(fromNodeStateTransition(null, LEFT), LEAVE); + + // no client notifications when leaving/moving start: + assertNull(fromNodeStateTransition(JOINED, LEAVING)); + assertNull(fromNodeStateTransition(JOINED, MOVING)); + + // no client notifications until JOINED + assertNull(fromNodeStateTransition(null, BOOTSTRAPPING)); + assertNull(fromNodeStateTransition(null, REGISTERED)); + + // if LEAVING/MOVING is the first state we see for a node, assume it was JOINED before; + assertEquals(fromNodeStateTransition(null, LEAVING), JOIN); + assertEquals(fromNodeStateTransition(null, MOVING), JOIN); + + assertEquals(fromNodeStateTransition(MOVING, JOINED), MOVE); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
