Author: brandonwilliams Date: Thu Jul 22 20:32:28 2010 New Revision: 966846
URL: http://svn.apache.org/viewvc?rev=966846&view=rev Log: remove obsolete gossip size limit. patch by Anthony Molinaro and jbellis for CASSANDRA-1138 Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java?rev=966846&r1=966845&r2=966846&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java Thu Jul 22 20:32:28 2010 @@ -160,9 +160,6 @@ class EndPointStateSerializer implements public void serialize(EndPointState epState, DataOutputStream dos) throws IOException { - /* These are for estimating whether we overshoot the MTU limit */ - int estimate = 0; - /* serialize the HeartBeatState */ HeartBeatState hbState = epState.getHeartBeatState(); HeartBeatState.serializer().serialize(hbState, dos); @@ -170,26 +167,13 @@ class EndPointStateSerializer implements /* serialize the map of ApplicationState objects */ int size = epState.applicationState_.size(); dos.writeInt(size); - if ( size > 0 ) - { - Set<String> keys = epState.applicationState_.keySet(); - for( String key : keys ) + for (String key : epState.applicationState_.keySet()) + { + ApplicationState appState = epState.applicationState_.get(key); + if (appState != null) { - if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate ) - { - logger_.info("@@@@ Breaking out to respect the MTU size in EndPointState serializer. Estimate is " + estimate + " @@@@"); - break; - } - - ApplicationState appState = epState.applicationState_.get(key); - if ( appState != null ) - { - int pre = dos.size(); - dos.writeUTF(key); - ApplicationState.serializer().serialize(appState, dos); - int post = dos.size(); - estimate = post - pre; - } + dos.writeUTF(key); + ApplicationState.serializer().serialize(appState, dos); } } } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=966846&r1=966845&r2=966846&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Thu Jul 22 20:32:28 2010 @@ -75,26 +75,16 @@ class GossipDigestAckMessageSerializer i { public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException { - /* Use the helper to serialize the GossipDigestList */ - boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos); - dos.writeBoolean(bContinue); - /* Use the EndPointState */ - if ( bContinue ) - { - EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos); - } + GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos); + dos.writeBoolean(true); // 0.6 compatibility + EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos); } public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException { - Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>(); - List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis); - boolean bContinue = dis.readBoolean(); - - if ( bContinue ) - { - epStateMap = EndPointStatesSerializationHelper.deserialize(dis); - } + List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis); + dis.readBoolean(); // 0.6 compatibility + Map<InetAddress, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis); return new GossipDigestAckMessage(gDigestList, epStateMap); } } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=966846&r1=966845&r2=966846&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Thu Jul 22 20:32:28 2010 @@ -67,44 +67,24 @@ class GossipDigestSerializationHelper { private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class); - static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException + static void serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException { - boolean bVal = true; - int size = gDigestList.size(); - dos.writeInt(size); - - int estimate = 0; + dos.writeInt(gDigestList.size()); for ( GossipDigest gDigest : gDigestList ) { - if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate ) - { - logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@"); - bVal = false; - break; - } - int pre = dos.size(); GossipDigest.serializer().serialize( gDigest, dos ); - int post = dos.size(); - estimate = post - pre; } - return bVal; } static List<GossipDigest> deserialize(DataInputStream dis) throws IOException { int size = dis.readInt(); - List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); + List<GossipDigest> gDigests = new ArrayList<GossipDigest>(size); for ( int i = 0; i < size; ++i ) { - if ( dis.available() == 0 ) - { - logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests."); - break; - } - - GossipDigest gDigest = GossipDigest.serializer().deserialize(dis); - gDigests.add( gDigest ); + assert dis.available() > 0; + gDigests.add(GossipDigest.serializer().deserialize(dis)); } return gDigests; } @@ -114,45 +94,25 @@ class EndPointStatesSerializationHelper { private static final Logger logger_ = Logger.getLogger(EndPointStatesSerializationHelper.class); - static boolean serialize(Map<InetAddress, EndPointState> epStateMap, DataOutputStream dos) throws IOException + static void serialize(Map<InetAddress, EndPointState> epStateMap, DataOutputStream dos) throws IOException { - boolean bVal = true; - int estimate = 0; - int size = epStateMap.size(); - dos.writeInt(size); - + dos.writeInt(epStateMap.size()); for (Entry<InetAddress, EndPointState> entry : epStateMap.entrySet()) { InetAddress ep = entry.getKey(); - if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate ) - { - logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@"); - bVal = false; - break; - } - - int pre = dos.size(); CompactEndPointSerializationHelper.serialize(ep, dos); EndPointState.serializer().serialize(entry.getValue(), dos); - int post = dos.size(); - estimate = post - pre; } - return bVal; } static Map<InetAddress, EndPointState> deserialize(DataInputStream dis) throws IOException { int size = dis.readInt(); - Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>(); + Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>(size); for ( int i = 0; i < size; ++i ) { - if ( dis.available() == 0 ) - { - logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState."); - break; - } - // int length = dis.readInt(); + assert dis.available() > 0; InetAddress ep = CompactEndPointSerializationHelper.deserialize(dis); EndPointState epState = EndPointState.serializer().deserialize(dis); epStateMap.put(ep, epState); Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java?rev=966846&r1=966845&r2=966846&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jul 22 20:32:28 2010 @@ -101,7 +101,6 @@ public class Gossiper implements IFailur } } - final static int MAX_GOSSIP_PACKET_SIZE = 1428; public final static int intervalInMillis_ = 1000; private static Logger logger_ = Logger.getLogger(Gossiper.class); public static final Gossiper instance = new Gossiper(); @@ -280,7 +279,7 @@ public class Gossiper implements IFailur Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOException { GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests); - ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream( bos ); GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos); return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray()); @@ -288,7 +287,7 @@ public class Gossiper implements IFailur Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos); if (logger_.isTraceEnabled()) @@ -298,7 +297,7 @@ public class Gossiper implements IFailur Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos); return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());
