Author: gdusbabek
Date: Mon Jan 31 16:12:57 2011
New Revision: 1065664
URL: http://svn.apache.org/viewvc?rev=1065664&view=rev
Log:
ignore messages from the future (i.e. from nodes running a newer protocol version). keep track of nodes in gossip regardless of version.
patch by gdusbabek, reviewed by jbellis. CASSANDRA-1970
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1065664&r1=1065663&r2=1065664&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Jan 31 16:12:57 2011
@@ -49,7 +49,8 @@
* fix math in RandomPartitioner.describeOwnership (CASSANDRA-2071)
* fix deletion of sstable non-data components (CASSANDRA-2059)
* avoid blocking gossip while deleting handoff hints (CASSANDRA-2073)
-
+ * ignore messages from newer versions, keep track of nodes in gossip
+ regardless of version (CASSANDRA-1970)
0.7.0-final
* fix offsets to ByteBuffer.get (CASSANDRA-1939)
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1065664&r1=1065663&r2=1065664&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
Mon Jan 31 16:12:57 2011
@@ -26,6 +26,7 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,6 +142,10 @@ public class Gossiper implements IFailur
* after removal to prevent nodes from falsely reincarnating during the
time when removal
* gossip gets propagated to all nodes */
Map<InetAddress, Long> justRemovedEndpoints_ = new
ConcurrentHashMap<InetAddress, Long>();
+
+ // protocol versions of the other nodes in the cluster
+ private final ConcurrentMap<InetAddress, Integer> versions = new
NonBlockingHashMap<InetAddress, Integer>();
+
private Gossiper()
{
@@ -169,6 +174,20 @@ public class Gossiper implements IFailur
{
subscribers_.remove(subscriber);
}
+
+ public void setVersion(InetAddress address, int version)
+ {
+ Integer old = versions.put(address, version);
+ EndpointState state = endpointStateMap_.get(address);
+ if (state == null)
+ addSavedEndpoint(address);
+ }
+
+ public Integer getVersion(InetAddress address)
+ {
+ return versions.get(address);
+ }
+
public Set<InetAddress> getLiveMembers()
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1065664&r1=1065663&r2=1065664&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Mon Jan 31 16:12:57 2011
@@ -24,6 +24,7 @@ package org.apache.cassandra.net;
import java.io.*;
import java.net.Socket;
+import org.apache.cassandra.gms.Gossiper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +53,7 @@ public class IncomingTcpConnection exten
{
DataInputStream input;
boolean isStream;
+ int version;
try
{
// determine the connection type to decide whether to buffer
@@ -62,6 +64,8 @@ public class IncomingTcpConnection exten
if (!isStream)
// we should buffer
input = new DataInputStream(new
BufferedInputStream(socket.getInputStream(), 4096));
+ version = MessagingService.getBits(header, 15, 8);
+ Gossiper.instance.setVersion(socket.getInetAddress(), version);
}
catch (IOException e)
{
@@ -74,6 +78,12 @@ public class IncomingTcpConnection exten
{
if (isStream)
{
+ if (version > MessagingService.version_)
+ {
+ logger.error("Received untranslated stream from newer
protcol version. Terminating connection!");
+ close();
+ return;
+ }
int size = input.readInt();
byte[] headerBytes = new byte[size];
input.readFully(headerBytes);
@@ -87,12 +97,18 @@ public class IncomingTcpConnection exten
byte[] contentBytes = new byte[size];
input.readFully(contentBytes);
- Message message = Message.serializer().deserialize(new
DataInputStream(new ByteArrayInputStream(contentBytes)));
- MessagingService.instance().receive(message);
+ if (version > MessagingService.version_)
+ logger.info("Received connection from newer protocol
version. Ignorning message.");
+ else
+ {
+ Message message = Message.serializer().deserialize(new
DataInputStream(new ByteArrayInputStream(contentBytes)));
+ MessagingService.instance().receive(message);
+ }
}
// prepare to read the next message
MessagingService.validateMagic(input.readInt());
int header = input.readInt();
+ version = MessagingService.getBits(header, 15, 8);
assert isStream == (MessagingService.getBits(header, 3, 1) ==
1) : "Connections cannot change type: " + isStream;
}
catch (EOFException e)
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1065664&r1=1065663&r2=1065664&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Mon Jan 31 16:12:57 2011
@@ -61,7 +61,7 @@ import org.cliffc.high_scale_lib.NonBloc
public final class MessagingService implements MessagingServiceMBean
{
- private static final int version_ = 1;
+ public static final int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is
appropriate.
private SerializerType serializerType_ = SerializerType.BINARY;