Add beforeChange notification to IEndpointStateChangeSubscriber Patch by Sergio Bossa, reviewed by brandonwilliams for CASSANDRA-6135
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5eddf185 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5eddf185 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5eddf185 Branch: refs/heads/trunk Commit: 5eddf185465a0e2dde18df179faefb9a68fba29a Parents: 97cbf6a Author: Brandon Williams <brandonwilli...@apache.org> Authored: Tue Oct 29 15:03:29 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Tue Oct 29 15:03:29 2013 -0500 ---------------------------------------------------------------------- NEWS.txt | 7 ++++++ src/java/org/apache/cassandra/gms/Gossiper.java | 24 ++++++++++++++++---- .../gms/IEndpointStateChangeSubscriber.java | 2 ++ .../apache/cassandra/gms/VersionedValue.java | 5 ++++ .../locator/ReconnectableSnitchHelper.java | 5 ++++ .../apache/cassandra/repair/RepairSession.java | 1 + .../cassandra/service/LoadBroadcaster.java | 2 ++ .../cassandra/service/MigrationManager.java | 3 +++ .../cassandra/service/StorageService.java | 7 +++++- .../cassandra/streaming/StreamSession.java | 1 + 10 files changed, 52 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 2489f26..8a5645d 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -13,6 +13,13 @@ restore snapshots created with the previous major version using the 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. +2.0.3 +===== +Upgrading +--------- + - The IEndpointStateChangeSubscriber has a new method, beforeChange, that + any custom implemenations using the class will need to implement. + 2.0.2 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 07c21bd..8237fa3 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -44,7 +44,6 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; /** * This module is responsible for Gossiping information for the local endpoint. This abstraction @@ -964,12 +963,21 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet()) { - doNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); + doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); + } + } + + // notify that a local application state is going to change (doesn't get triggered for remote changes) + private void doBeforeChangeNotifications(InetAddress addr, EndpointState epState, ApplicationState apState, VersionedValue newValue) + { + for (IEndpointStateChangeSubscriber subscriber : subscribers) + { + subscriber.beforeChange(addr, epState, apState, newValue); } } // notify that an application state has changed - private void doNotifications(InetAddress addr, ApplicationState state, VersionedValue value) + private void doOnChangeNotifications(InetAddress addr, ApplicationState state, VersionedValue value) { for (IEndpointStateChangeSubscriber subscriber : subscribers) { @@ -1186,9 +1194,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void addLocalApplicationState(ApplicationState state, VersionedValue value) { EndpointState epState = endpointStateMap.get(FBUtilities.getBroadcastAddress()); + InetAddress epAddr = FBUtilities.getBroadcastAddress(); assert epState != null; + // Fire "before change" notifications: + doBeforeChangeNotifications(epAddr, epState, state, value); + // Notifications may have taken some time, so preventively raise the version + // of the new value, otherwise it could be ignored by the remote node + // if another value with a newer version was received in the meantime: + value = StorageService.instance.valueFactory.cloneWithHigherVersion(value); + // Add to local application state and fire "on change" notifications: epState.addApplicationState(state, value); - doNotifications(FBUtilities.getBroadcastAddress(), state, value); + doOnChangeNotifications(epAddr, state, value); } public void stop() http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java index dd7ee2c..1bfd678 100644 --- a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java +++ b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java @@ -37,6 +37,8 @@ public interface IEndpointStateChangeSubscriber * @param epState state that actually changed for the above endpoint. */ public void onJoin(InetAddress endpoint, EndpointState epState); + + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue); public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 2bc3433..7c64a83 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -113,6 +113,11 @@ public class VersionedValue implements Comparable<VersionedValue> { this.partitioner = partitioner; } + + public VersionedValue cloneWithHigherVersion(VersionedValue value) + { + return new VersionedValue(value.value); + } public VersionedValue bootstrapping(Collection<Token> tokens) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java index e1353f4..d797393 100644 --- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java +++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java @@ -70,6 +70,11 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber logger.debug(String.format("Intiated reconnect to an Internal IP %s for the %s", localAddress, publicAddress)); } } + + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) + { + // no-op + } public void onJoin(InetAddress endpoint, EndpointState epState) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 2e93104..18688f9 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -318,6 +318,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan } public void onJoin(InetAddress endpoint, EndpointState epState) {} + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} public void onAlive(InetAddress endpoint, EndpointState state) {} public void onDead(InetAddress endpoint, EndpointState state) {} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/service/LoadBroadcaster.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java b/src/java/org/apache/cassandra/service/LoadBroadcaster.java index 4a118ba..4996e52 100644 --- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java +++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java @@ -58,6 +58,8 @@ public class LoadBroadcaster implements IEndpointStateChangeSubscriber onChange(endpoint, ApplicationState.LOAD, localValue); } } + + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} public void onAlive(InetAddress endpoint, EndpointState state) {} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 131163b..0ffc7c4 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -78,6 +78,9 @@ public class MigrationManager implements IEndpointStateChangeSubscriber public void onJoin(InetAddress endpoint, EndpointState epState) {} + + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) + {} public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 95db711..c3f32fa 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -137,7 +137,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /* This abstraction maintains the token/endpoint metadata information */ private TokenMetadata tokenMetadata = new TokenMetadata(); - public VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(getPartitioner()); + public volatile VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(getPartitioner()); public static final StorageService instance = new StorageService(); @@ -1186,6 +1186,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return rangeToEndpointMap; } + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) + { + // no-op + } + /* * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 5a16d81..98a76fc 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -552,6 +552,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe } public void onJoin(InetAddress endpoint, EndpointState epState) {} + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} public void onAlive(InetAddress endpoint, EndpointState state) {} public void onDead(InetAddress endpoint, EndpointState state) {}