Author: jbellis
Date: Thu Jun 24 15:58:16 2010
New Revision: 957593
URL: http://svn.apache.org/viewvc?rev=957593&view=rev
Log:
remove obsolete gossip size limit. patch by Anthony Molinaro and jbellis for
CASSANDRA-1138
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=957593&r1=957592&r2=957593&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jun 24 15:58:16 2010
@@ -34,6 +34,7 @@ dev
mutations (CASSANDRA-1179)
* avoid allocating a new byte[] for each mutation on replay (CASSANDRA-1219)
* revise HH schema to be per-endpoint (CASSANDRA-1142)
+ * remove gossip message size limit (CASSANDRA-1138)
0.6.3
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java?rev=957593&r1=957592&r2=957593&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java Thu Jun 24 15:58:16 2010
@@ -145,9 +145,6 @@ class EndpointStateSerializer implements
public void serialize(EndpointState epState, DataOutputStream dos) throws IOException
{
- /* These are for estimating whether we overshoot the MTU limit */
- int estimate = 0;
-
/* serialize the HeartBeatState */
HeartBeatState hbState = epState.getHeartBeatState();
HeartBeatState.serializer().serialize(hbState, dos);
@@ -155,26 +152,13 @@ class EndpointStateSerializer implements
/* serialize the map of ApplicationState objects */
int size = epState.applicationState_.size();
dos.writeInt(size);
- if ( size > 0 )
- {
- Set<String> keys = epState.applicationState_.keySet();
- for( String key : keys )
+ for (String key : epState.applicationState_.keySet())
+ {
+ ApplicationState appState = epState.applicationState_.get(key);
+ if (appState != null)
{
- if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
- {
- logger_.info("@@@@ Breaking out to respect the MTU size in EndpointState serializer. Estimate is {} @@@@", estimate);;
- break;
- }
-
- ApplicationState appState = epState.applicationState_.get(key);
- if ( appState != null )
- {
- int pre = dos.size();
- dos.writeUTF(key);
- ApplicationState.serializer().serialize(appState, dos);
- int post = dos.size();
- estimate = post - pre;
- }
+ dos.writeUTF(key);
+ ApplicationState.serializer().serialize(appState, dos);
}
}
}
@@ -191,8 +175,8 @@ class EndpointStateSerializer implements
{
break;
}
-
- String key = dis.readUTF();
+
+ String key = dis.readUTF();
ApplicationState appState = ApplicationState.serializer().deserialize(dis);
epState.addApplicationState(key, appState);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=957593&r1=957592&r2=957593&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Thu Jun 24 15:58:16 2010
@@ -70,26 +70,16 @@ class GossipDigestAckMessageSerializer i
{
public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
{
- /* Use the helper to serialize the GossipDigestList */
- boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
- dos.writeBoolean(bContinue);
- /* Use the EndpointState */
- if ( bContinue )
- {
- EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
- }
+ GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
+ dos.writeBoolean(true); // 0.6 compatibility
+ EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
}
public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
{
- Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>();
- List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
- boolean bContinue = dis.readBoolean();
-
- if ( bContinue )
- {
- epStateMap = EndpointStatesSerializationHelper.deserialize(dis);
- }
+ List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
+ dis.readBoolean(); // 0.6 compatibility
+ Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis);
return new GossipDigestAckMessage(gDigestList, epStateMap);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=957593&r1=957592&r2=957593&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Thu Jun 24 15:58:16 2010
@@ -69,44 +69,24 @@ class GossipDigestSerializationHelper
{
private static Logger logger_ = LoggerFactory.getLogger(GossipDigestSerializationHelper.class);
- static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
+ static void serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
{
- boolean bVal = true;
- int size = gDigestList.size();
- dos.writeInt(size);
-
- int estimate = 0;
+ dos.writeInt(gDigestList.size());
for ( GossipDigest gDigest : gDigestList )
{
- if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
- {
- logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@");
- bVal = false;
- break;
- }
- int pre = dos.size();
GossipDigest.serializer().serialize( gDigest, dos );
- int post = dos.size();
- estimate = post - pre;
}
- return bVal;
}
static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
{
int size = dis.readInt();
- List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+ List<GossipDigest> gDigests = new ArrayList<GossipDigest>(size);
for ( int i = 0; i < size; ++i )
{
- if ( dis.available() == 0 )
- {
- logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests.");
- break;
- }
-
- GossipDigest gDigest = GossipDigest.serializer().deserialize(dis);
- gDigests.add( gDigest );
+ assert dis.available() > 0;
+ gDigests.add(GossipDigest.serializer().deserialize(dis));
}
return gDigests;
}
@@ -116,49 +96,29 @@ class EndpointStatesSerializationHelper
{
private static final Logger logger_ = LoggerFactory.getLogger(EndpointStatesSerializationHelper.class);
- static boolean serialize(Map<InetAddress, EndpointState> epStateMap, DataOutputStream dos) throws IOException
+ static void serialize(Map<InetAddress, EndpointState> epStateMap, DataOutputStream dos) throws IOException
{
- boolean bVal = true;
- int estimate = 0;
- int size = epStateMap.size();
- dos.writeInt(size);
-
+ dos.writeInt(epStateMap.size());
for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
{
InetAddress ep = entry.getKey();
- if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
- {
- logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@");
- bVal = false;
- break;
- }
-
- int pre = dos.size();
CompactEndpointSerializationHelper.serialize(ep, dos);
EndpointState.serializer().serialize(entry.getValue(), dos);
- int post = dos.size();
- estimate = post - pre;
}
- return bVal;
}
static Map<InetAddress, EndpointState> deserialize(DataInputStream dis) throws IOException
{
int size = dis.readInt();
- Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>();
-
+ Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>(size);
+
for ( int i = 0; i < size; ++i )
{
- if ( dis.available() == 0 )
- {
- logger_.info("Remaining bytes zero. Stopping deserialization in EndpointState.");
- break;
- }
- // int length = dis.readInt();
+ assert dis.available() > 0;
InetAddress ep = CompactEndpointSerializationHelper.deserialize(dis);
EndpointState epState = EndpointState.serializer().deserialize(dis);
epStateMap.put(ep, epState);
- }
+ }
return epStateMap;
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=957593&r1=957592&r2=957593&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jun 24 15:58:16 2010
@@ -102,7 +102,6 @@ public class Gossiper implements IFailur
}
}
- final static int MAX_GOSSIP_PACKET_SIZE = 1428;
public final static int intervalInMillis_ = 1000;
private static Logger logger_ = LoggerFactory.getLogger(Gossiper.class);
public static final Gossiper instance = new Gossiper();
@@ -287,7 +286,7 @@ public class Gossiper implements IFailur
Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOException
{
GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
return new Message(localEndpoint_, StageManager.GOSSIP_STAGE, StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
@@ -295,7 +294,7 @@ public class Gossiper implements IFailur
Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
if (logger_.isTraceEnabled())
@@ -305,7 +304,7 @@ public class Gossiper implements IFailur
Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
return new Message(localEndpoint_, StageManager.GOSSIP_STAGE, StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());