Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 d4e6f08d4 -> f3b7599e3
  refs/heads/trunk db49d3b89 -> a800ca898


Avoid potential AssertionError in mixed version cluster

patch by slebresne; reviewed by Stefania for CASSANDRA-11128

This patch ensures that a node's messaging version is set correctly as
early as possible, by using the version advertised through gossip.
Previously, that version could be read before it had been set, in which
case it defaulted to the current version (which might be incorrect),
leading to the AssertionError.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3b7599e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3b7599e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3b7599e

Branch: refs/heads/cassandra-3.0
Commit: f3b7599e3b615f26cc81affa97569f6a7395cccc
Parents: d4e6f08
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Tue Feb 9 15:08:34 2016 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Feb 12 12:04:09 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  |  3 +++
 .../cassandra/net/OutboundTcpConnection.java    | 11 +++++++++-
 .../cassandra/service/StorageService.java       | 21 ++++++++++++++++++++
 4 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b7599e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5156b0c..15012b1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.4
+ * Avoid potential AssertionError in mixed version cluster (CASSANDRA-11128)
  * Properly handle hinted handoff after topology changes (CASSANDRA-5902)
  * AssertionError when listing sstable files on inconsistent disk state 
(CASSANDRA-11156)
  * Fix wrong rack counting and invalid conditions check for TokenAllocation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b7599e/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index d416dca..835beed 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -877,6 +877,9 @@ public final class MessagingService implements 
MessagingServiceMBean
      */
     public int setVersion(InetAddress endpoint, int version)
     {
+        // We can't talk to someone from the future
+        version = Math.min(version, current_version);
+
         logger.trace("Setting version {} for {}", version, endpoint);
 
         if (version < VERSION_22)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b7599e/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index adf90da..7b6e26e 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -134,13 +134,22 @@ public class OutboundTcpConnection extends Thread
     private volatile long completed;
     private final AtomicLong dropped = new AtomicLong();
     private volatile int currentMsgBufferCount = 0;
-    private int targetVersion = MessagingService.current_version;
+    private volatile int targetVersion;
 
     public OutboundTcpConnection(OutboundTcpConnectionPool pool)
     {
         super("MessagingService-Outgoing-" + pool.endPoint());
         this.poolReference = pool;
         cs = newCoalescingStrategy(pool.endPoint().getHostAddress());
+
+        // We want to use the most precise version we know because while there 
is version detection on connect(),
+        // the target version might be accessed by the pool (in 
getConnection()) before we actually connect (as we
+        // connect when the first message is submitted). Note however that the 
only case where we'll connect
+        // without knowing the true version of a node is if that node is a 
seed (otherwise, we can't know a node
+        // unless it has been gossiped to us or it has connected to us and in 
both case this sets the version) and
+        // in that case we won't rely on that targetVersion before we're 
actually connected and so the version
+        // detection in connect() will do its job.
+        targetVersion = 
MessagingService.instance().getVersion(pool.endPoint());
     }
 
     private static boolean isLocalDC(InetAddress targetHost)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b7599e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 4cdeeb0..7cca516 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1774,11 +1774,26 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                     case RPC_READY:
                         notifyRpcChange(endpoint, epState.isRpcReady());
                         break;
+                    case NET_VERSION:
+                        updateNetVersion(endpoint, value);
+                        break;
                 }
             }
         }
     }
 
+    private void updateNetVersion(InetAddress endpoint, VersionedValue value)
+    {
+        try
+        {
+            MessagingService.instance().setVersion(endpoint, 
Integer.valueOf(value.value));
+        }
+        catch (NumberFormatException e)
+        {
+            throw new AssertionError("Got invalid value for NET_VERSION 
application state: " + value.value);
+        }
+    }
+
     public void updateTopology(InetAddress endpoint)
     {
         if (getTokenMetadata().isMember(endpoint))
@@ -2442,6 +2457,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         // If we have restarted before the node was even marked down, we need 
to reset the connection pool
         if (state.isAlive())
             onDead(endpoint, state);
+
+        // Then, the node may have been upgraded and changed its messaging 
protocol version. If so, we
+        // want to update that before we mark the node live again to avoid 
problems like CASSANDRA-11128.
+        VersionedValue netVersion = 
state.getApplicationState(ApplicationState.NET_VERSION);
+        if (netVersion != null)
+            updateNetVersion(endpoint, netVersion);
     }
 
 

Reply via email to