Add beforeChange notification to IEndpointStateChangeSubscriber
Patch by Sergio Bossa, reviewed by brandonwilliams for CASSANDRA-6135


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5eddf185
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5eddf185
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5eddf185

Branch: refs/heads/trunk
Commit: 5eddf185465a0e2dde18df179faefb9a68fba29a
Parents: 97cbf6a
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Tue Oct 29 15:03:29 2013 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Tue Oct 29 15:03:29 2013 -0500

----------------------------------------------------------------------
 NEWS.txt                                        |  7 ++++++
 src/java/org/apache/cassandra/gms/Gossiper.java | 24 ++++++++++++++++----
 .../gms/IEndpointStateChangeSubscriber.java     |  2 ++
 .../apache/cassandra/gms/VersionedValue.java    |  5 ++++
 .../locator/ReconnectableSnitchHelper.java      |  5 ++++
 .../apache/cassandra/repair/RepairSession.java  |  1 +
 .../cassandra/service/LoadBroadcaster.java      |  2 ++
 .../cassandra/service/MigrationManager.java     |  3 +++
 .../cassandra/service/StorageService.java       |  7 +++++-
 .../cassandra/streaming/StreamSession.java      |  1 +
 10 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 2489f26..8a5645d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,13 @@ restore snapshots created with the previous major version 
using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+2.0.3
+=====
+Upgrading
+---------
+    - The IEndpointStateChangeSubscriber has a new method, beforeChange, that
+      any custom implementations of the interface will need to implement.
+
 
 2.0.2
 =====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 07c21bd..8237fa3 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -44,7 +44,6 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 
 /**
  * This module is responsible for Gossiping information for the local 
endpoint. This abstraction
@@ -964,12 +963,21 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
         for (Entry<ApplicationState, VersionedValue> remoteEntry : 
remoteState.getApplicationStateMap().entrySet())
         {
-            doNotifications(addr, remoteEntry.getKey(), 
remoteEntry.getValue());
+            doOnChangeNotifications(addr, remoteEntry.getKey(), 
remoteEntry.getValue());
+        }
+    }
+    
+    // notify that a local application state is going to change (doesn't get 
triggered for remote changes)
+    private void doBeforeChangeNotifications(InetAddress addr, EndpointState 
epState, ApplicationState apState, VersionedValue newValue)
+    {
+        for (IEndpointStateChangeSubscriber subscriber : subscribers)
+        {
+            subscriber.beforeChange(addr, epState, apState, newValue);
         }
     }
 
     // notify that an application state has changed
-    private void doNotifications(InetAddress addr, ApplicationState state, 
VersionedValue value)
+    private void doOnChangeNotifications(InetAddress addr, ApplicationState 
state, VersionedValue value)
     {
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
         {
@@ -1186,9 +1194,17 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     public void addLocalApplicationState(ApplicationState state, 
VersionedValue value)
     {
         EndpointState epState = 
endpointStateMap.get(FBUtilities.getBroadcastAddress());
+        InetAddress epAddr = FBUtilities.getBroadcastAddress();
         assert epState != null;
+        // Fire "before change" notifications:
+        doBeforeChangeNotifications(epAddr, epState, state, value);
+        // Notifications may have taken some time, so preventively raise the 
version
+        // of the new value, otherwise it could be ignored by the remote node
+        // if another value with a newer version was received in the meantime:
+        value = 
StorageService.instance.valueFactory.cloneWithHigherVersion(value);
+        // Add to local application state and fire "on change" notifications:
         epState.addApplicationState(state, value);
-        doNotifications(FBUtilities.getBroadcastAddress(), state, value);
+        doOnChangeNotifications(epAddr, state, value);
     }
 
     public void stop()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java 
b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
index dd7ee2c..1bfd678 100644
--- a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
+++ b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
@@ -37,6 +37,8 @@ public interface IEndpointStateChangeSubscriber
      * @param epState  state that actually changed for the above endpoint.
      */
     public void onJoin(InetAddress endpoint, EndpointState epState);
+    
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, 
ApplicationState newStateKey, VersionedValue newValue);
 
     public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java 
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 2bc3433..7c64a83 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -113,6 +113,11 @@ public class VersionedValue implements 
Comparable<VersionedValue>
         {
             this.partitioner = partitioner;
         }
+        
+        public VersionedValue cloneWithHigherVersion(VersionedValue value)
+        {
+            return new VersionedValue(value.value);
+        }
 
         public VersionedValue bootstrapping(Collection<Token> tokens)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java 
b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index e1353f4..d797393 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -70,6 +70,11 @@ public class ReconnectableSnitchHelper implements 
IEndpointStateChangeSubscriber
             logger.debug(String.format("Intiated reconnect to an Internal IP 
%s for the %s", localAddress, publicAddress));
         }
     }
+    
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, 
ApplicationState newStateKey, VersionedValue newValue)
+    {
+        // no-op
+    }
 
     public void onJoin(InetAddress endpoint, EndpointState epState)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 2e93104..18688f9 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -318,6 +318,7 @@ public class RepairSession extends WrappedRunnable 
implements IEndpointStateChan
     }
 
     public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, 
ApplicationState newStateKey, VersionedValue newValue) {}
     public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value) {}
     public void onAlive(InetAddress endpoint, EndpointState state) {}
     public void onDead(InetAddress endpoint, EndpointState state) {}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/service/LoadBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java 
b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 4a118ba..4996e52 100644
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@ -58,6 +58,8 @@ public class LoadBroadcaster implements 
IEndpointStateChangeSubscriber
             onChange(endpoint, ApplicationState.LOAD, localValue);
         }
     }
+    
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, 
ApplicationState newStateKey, VersionedValue newValue) {}
 
     public void onAlive(InetAddress endpoint, EndpointState state) {}
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java 
b/src/java/org/apache/cassandra/service/MigrationManager.java
index 131163b..0ffc7c4 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -78,6 +78,9 @@ public class MigrationManager implements 
IEndpointStateChangeSubscriber
 
     public void onJoin(InetAddress endpoint, EndpointState epState)
     {}
+    
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, 
ApplicationState newStateKey, VersionedValue newValue) 
+    {}
 
     public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 95db711..c3f32fa 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -137,7 +137,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     /* This abstraction maintains the token/endpoint metadata information */
     private TokenMetadata tokenMetadata = new TokenMetadata();
 
-    public VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(getPartitioner());
+    public volatile VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(getPartitioner());
 
     public static final StorageService instance = new StorageService();
 
@@ -1186,6 +1186,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return rangeToEndpointMap;
     }
 
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, 
ApplicationState newStateKey, VersionedValue newValue)
+    {
+        // no-op
+    }
+
     /*
      * Handle the reception of a new particular ApplicationState for a 
particular endpoint. Note that the value of the
      * ApplicationState has not necessarily "changed" since the last known 
value, if we already received the same update

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eddf185/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5a16d81..98a76fc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -552,6 +552,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
     }
 
     public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, 
ApplicationState newStateKey, VersionedValue newValue) {}
     public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value) {}
     public void onAlive(InetAddress endpoint, EndpointState state) {}
     public void onDead(InetAddress endpoint, EndpointState state) {}

Reply via email to