Author: jbellis
Date: Thu Jun 24 15:58:16 2010
New Revision: 957593

URL: http://svn.apache.org/viewvc?rev=957593&view=rev
Log:
Remove obsolete gossip size limit. Patch by Anthony Molinaro and jbellis for CASSANDRA-1138.

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=957593&r1=957592&r2=957593&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jun 24 15:58:16 2010
@@ -34,6 +34,7 @@ dev
    mutations (CASSANDRA-1179)
  * avoid allocating a new byte[] for each mutation on replay (CASSANDRA-1219)
  * revise HH schema to be per-endpoint (CASSANDRA-1142)
+ * remove gossip message size limit (CASSANDRA-1138)
 
 
 0.6.3

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java?rev=957593&r1=957592&r2=957593&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java Thu 
Jun 24 15:58:16 2010
@@ -145,9 +145,6 @@ class EndpointStateSerializer implements
     
     public void serialize(EndpointState epState, DataOutputStream dos) throws 
IOException
     {
-        /* These are for estimating whether we overshoot the MTU limit */
-        int estimate = 0;
-
         /* serialize the HeartBeatState */
         HeartBeatState hbState = epState.getHeartBeatState();
         HeartBeatState.serializer().serialize(hbState, dos);
@@ -155,26 +152,13 @@ class EndpointStateSerializer implements
         /* serialize the map of ApplicationState objects */
         int size = epState.applicationState_.size();
         dos.writeInt(size);
-        if ( size > 0 )
-        {   
-            Set<String> keys = epState.applicationState_.keySet();
-            for( String key : keys )
+        for (String key : epState.applicationState_.keySet())
+        {
+            ApplicationState appState = epState.applicationState_.get(key);
+            if (appState != null)
             {
-                if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
-                {
-                    logger_.info("@@@@ Breaking out to respect the MTU size in 
EndpointState serializer. Estimate is {} @@@@", estimate);;
-                    break;
-                }
-            
-                ApplicationState appState = epState.applicationState_.get(key);
-                if ( appState != null )
-                {
-                    int pre = dos.size();
-                    dos.writeUTF(key);
-                    ApplicationState.serializer().serialize(appState, dos);    
                
-                    int post = dos.size();
-                    estimate = post - pre;
-                }                
+                dos.writeUTF(key);
+                ApplicationState.serializer().serialize(appState, dos);
             }
         }
     }
@@ -191,8 +175,8 @@ class EndpointStateSerializer implements
             {
                 break;
             }
-            
-            String key = dis.readUTF();    
+
+            String key = dis.readUTF();
             ApplicationState appState = 
ApplicationState.serializer().deserialize(dis);            
             epState.addApplicationState(key, appState);            
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=957593&r1=957592&r2=957593&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java 
Thu Jun 24 15:58:16 2010
@@ -70,26 +70,16 @@ class GossipDigestAckMessageSerializer i
 {
     public void serialize(GossipDigestAckMessage gDigestAckMessage, 
DataOutputStream dos) throws IOException
     {
-        /* Use the helper to serialize the GossipDigestList */
-        boolean bContinue = 
GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
-        dos.writeBoolean(bContinue);
-        /* Use the EndpointState */
-        if ( bContinue )
-        {
-            
EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, 
dos);            
-        }
+        
GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
+        dos.writeBoolean(true); // 0.6 compatibility
+        
EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
     }
 
     public GossipDigestAckMessage deserialize(DataInputStream dis) throws 
IOException
     {
-        Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, 
EndpointState>();
-        List<GossipDigest> gDigestList = 
GossipDigestSerializationHelper.deserialize(dis);                
-        boolean bContinue = dis.readBoolean();
-
-        if ( bContinue )
-        {
-            epStateMap = EndpointStatesSerializationHelper.deserialize(dis);   
                                 
-        }
+        List<GossipDigest> gDigestList = 
GossipDigestSerializationHelper.deserialize(dis);
+        dis.readBoolean(); // 0.6 compatibility
+        Map<InetAddress, EndpointState> epStateMap = 
EndpointStatesSerializationHelper.deserialize(dis);
         return new GossipDigestAckMessage(gDigestList, epStateMap);
     }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=957593&r1=957592&r2=957593&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java 
Thu Jun 24 15:58:16 2010
@@ -69,44 +69,24 @@ class GossipDigestSerializationHelper
 {
     private static Logger logger_ = 
LoggerFactory.getLogger(GossipDigestSerializationHelper.class);
     
-    static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream 
dos) throws IOException
+    static void serialize(List<GossipDigest> gDigestList, DataOutputStream 
dos) throws IOException
     {
-        boolean bVal = true;
-        int size = gDigestList.size();                        
-        dos.writeInt(size);
-        
-        int estimate = 0;            
+        dos.writeInt(gDigestList.size());
         for ( GossipDigest gDigest : gDigestList )
         {
-            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
-            {
-                logger_.info("@@@@ Breaking out to respect the MTU size in GD 
@@@@");
-                bVal = false;
-                break;
-            }
-            int pre = dos.size();               
             GossipDigest.serializer().serialize( gDigest, dos );
-            int post = dos.size();
-            estimate = post - pre;
         }
-        return bVal;
     }
 
     static List<GossipDigest> deserialize(DataInputStream dis) throws 
IOException
     {
         int size = dis.readInt();            
-        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+        List<GossipDigest> gDigests = new ArrayList<GossipDigest>(size);
         
         for ( int i = 0; i < size; ++i )
         {
-            if ( dis.available() == 0 )
-            {
-                logger_.info("Remaining bytes zero. Stopping deserialization 
of GossipDigests.");
-                break;
-            }
-                            
-            GossipDigest gDigest = GossipDigest.serializer().deserialize(dis); 
               
-            gDigests.add( gDigest );                
+            assert dis.available() > 0;
+            gDigests.add(GossipDigest.serializer().deserialize(dis));          
      
         }        
         return gDigests;
     }
@@ -116,49 +96,29 @@ class EndpointStatesSerializationHelper
 {
     private static final Logger logger_ = 
LoggerFactory.getLogger(EndpointStatesSerializationHelper.class);
 
-    static boolean serialize(Map<InetAddress, EndpointState> epStateMap, 
DataOutputStream dos) throws IOException
+    static void serialize(Map<InetAddress, EndpointState> epStateMap, 
DataOutputStream dos) throws IOException
     {
-        boolean bVal = true;
-        int estimate = 0;                
-        int size = epStateMap.size();
-        dos.writeInt(size);
-
+        dos.writeInt(epStateMap.size());
         for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
         {
             InetAddress ep = entry.getKey();
-            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
-            {
-                logger_.info("@@@@ Breaking out to respect the MTU size in 
EPS. Estimate is " + estimate + " @@@@");
-                bVal = false;
-                break;
-            }
-    
-            int pre = dos.size();
             CompactEndpointSerializationHelper.serialize(ep, dos);
             EndpointState.serializer().serialize(entry.getValue(), dos);
-            int post = dos.size();
-            estimate = post - pre;
         }
-        return bVal;
     }
 
     static Map<InetAddress, EndpointState> deserialize(DataInputStream dis) 
throws IOException
     {
         int size = dis.readInt();            
-        Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, 
EndpointState>();
-        
+        Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, 
EndpointState>(size);
+
         for ( int i = 0; i < size; ++i )
         {
-            if ( dis.available() == 0 )
-            {
-                logger_.info("Remaining bytes zero. Stopping deserialization 
in EndpointState.");
-                break;
-            }
-            // int length = dis.readInt();            
+            assert dis.available() > 0;
             InetAddress ep = 
CompactEndpointSerializationHelper.deserialize(dis);
             EndpointState epState = 
EndpointState.serializer().deserialize(dis);
             epStateMap.put(ep, epState);
-        }        
+        }
         return epStateMap;
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=957593&r1=957592&r2=957593&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jun 24 
15:58:16 2010
@@ -102,7 +102,6 @@ public class Gossiper implements IFailur
         }
     }
 
-    final static int MAX_GOSSIP_PACKET_SIZE = 1428;
     public final static int intervalInMillis_ = 1000;
     private static Logger logger_ = LoggerFactory.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
@@ -287,7 +286,7 @@ public class Gossiper implements IFailur
     Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws 
IOException
     {
         GossipDigestSynMessage gDigestMessage = new 
GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
-        ByteArrayOutputStream bos = new 
ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
         return new Message(localEndpoint_, StageManager.GOSSIP_STAGE, 
StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
@@ -295,7 +294,7 @@ public class Gossiper implements IFailur
 
     Message makeGossipDigestAckMessage(GossipDigestAckMessage 
gDigestAckMessage) throws IOException
     {
-        ByteArrayOutputStream bos = new 
ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
         if (logger_.isTraceEnabled())
@@ -305,7 +304,7 @@ public class Gossiper implements IFailur
 
     Message makeGossipDigestAck2Message(GossipDigestAck2Message 
gDigestAck2Message) throws IOException
     {
-        ByteArrayOutputStream bos = new 
ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, 
dos);
         return new Message(localEndpoint_, StageManager.GOSSIP_STAGE, 
StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());


Reply via email to