Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 d4e6f08d4 -> f3b7599e3 refs/heads/trunk db49d3b89 -> a800ca898
Avoid potential AssertionError in mixed version cluster. Patch by slebresne; reviewed by Stefania for CASSANDRA-11128. The patch attempts to make sure the version of a given node is set correctly as soon as possible by using the version passed through gossip, since that version could previously be used before it had been properly set, thus defaulting to the current version (which might be incorrect) and leading to the AssertionError. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3b7599e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3b7599e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3b7599e Branch: refs/heads/cassandra-3.0 Commit: f3b7599e3b615f26cc81affa97569f6a7395cccc Parents: d4e6f08 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Feb 9 15:08:34 2016 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Feb 12 12:04:09 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/net/MessagingService.java | 3 +++ .../cassandra/net/OutboundTcpConnection.java | 11 +++++++++- .../cassandra/service/StorageService.java | 21 ++++++++++++++++++++ 4 files changed, 35 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b7599e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5156b0c..15012b1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.4 + * Avoid potential AssertionError in mixed version cluster (CASSANDRA-11128) * Properly handle hinted handoff after topology changes (CASSANDRA-5902) * AssertionError when listing sstable files on inconsistent disk state (CASSANDRA-11156) * Fix wrong rack counting and invalid conditions check for TokenAllocation 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b7599e/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index d416dca..835beed 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -877,6 +877,9 @@ public final class MessagingService implements MessagingServiceMBean */ public int setVersion(InetAddress endpoint, int version) { + // We can't talk to someone from the future + version = Math.min(version, current_version); + logger.trace("Setting version {} for {}", version, endpoint); if (version < VERSION_22) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b7599e/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index adf90da..7b6e26e 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -134,13 +134,22 @@ public class OutboundTcpConnection extends Thread private volatile long completed; private final AtomicLong dropped = new AtomicLong(); private volatile int currentMsgBufferCount = 0; - private int targetVersion = MessagingService.current_version; + private volatile int targetVersion; public OutboundTcpConnection(OutboundTcpConnectionPool pool) { super("MessagingService-Outgoing-" + pool.endPoint()); this.poolReference = pool; cs = newCoalescingStrategy(pool.endPoint().getHostAddress()); + + // We want to use the most precise version we know because while there is version detection on connect(), + // the target version might be accessed by the pool (in 
getConnection()) before we actually connect (as we + // connect when the first message is submitted). Note however that the only case where we'll connect + // without knowing the true version of a node is if that node is a seed (otherwise, we can't know a node + // unless it has been gossiped to us or it has connected to us and in both case this sets the version) and + // in that case we won't rely on that targetVersion before we're actually connected and so the version + // detection in connect() will do its job. + targetVersion = MessagingService.instance().getVersion(pool.endPoint()); } private static boolean isLocalDC(InetAddress targetHost) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b7599e/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 4cdeeb0..7cca516 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1774,11 +1774,26 @@ public class StorageService extends NotificationBroadcasterSupport implements IE case RPC_READY: notifyRpcChange(endpoint, epState.isRpcReady()); break; + case NET_VERSION: + updateNetVersion(endpoint, value); + break; } } } } + private void updateNetVersion(InetAddress endpoint, VersionedValue value) + { + try + { + MessagingService.instance().setVersion(endpoint, Integer.valueOf(value.value)); + } + catch (NumberFormatException e) + { + throw new AssertionError("Got invalid value for NET_VERSION application state: " + value.value); + } + } + public void updateTopology(InetAddress endpoint) { if (getTokenMetadata().isMember(endpoint)) @@ -2442,6 +2457,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // If we have restarted before the node was even marked down, we need to reset the 
connection pool if (state.isAlive()) onDead(endpoint, state); + + // Then, the node may have been upgraded and changed its messaging protocol version. If so, we + // want to update that before we mark the node live again to avoid problems like CASSANDRA-11128. + VersionedValue netVersion = state.getApplicationState(ApplicationState.NET_VERSION); + if (netVersion != null) + updateNetVersion(endpoint, netVersion); }