Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=799331&r1=799330&r2=799331&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Thu Jul 30 15:30:21 2009 @@ -1,319 +1,319 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.gms; - -import java.io.FileOutputStream; -import java.lang.management.ManagementFactory; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import org.apache.commons.lang.StringUtils; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.net.EndPoint; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.LogUtil; -import org.apache.cassandra.utils.BoundedStatsDeque; -import org.apache.log4j.Logger; - -/** - * This FailureDetector is an implementation of the paper titled - * "The Phi Accrual Failure Detector" by Hayashibara. - * Check the paper and the <i>IFailureDetector</i> interface for details. - * - * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] ) - */ -public class FailureDetector implements IFailureDetector, FailureDetectorMBean -{ - private static Logger logger_ = Logger.getLogger(FailureDetector.class); - private static final int sampleSize_ = 1000; - private static final int phiSuspectThreshold_ = 5; - private static final int phiConvictThreshold_ = 8; - /* The Failure Detector has to have been up for at least 1 min. */ - private static final long uptimeThreshold_ = 60000; - private static IFailureDetector failureDetector_; - /* Used to lock the factory for creation of FailureDetector instance */ - private static Lock createLock_ = new ReentrantLock(); - /* The time when the module was instantiated. */ - private static long creationTime_; - - public static IFailureDetector instance() - { - if ( failureDetector_ == null ) - { - FailureDetector.createLock_.lock(); - try - { - if ( failureDetector_ == null ) - { - failureDetector_ = new FailureDetector(); - } - } - finally - { - createLock_.unlock(); - } - } - return failureDetector_; - } - - private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>(); - private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>(); - - public FailureDetector() - { - creationTime_ = System.currentTimeMillis(); - // Register this instance with JMX - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(this, new ObjectName("org.apache.cassandra.gms:type=FailureDetector")); - } - catch (Exception e) - { - logger_.error(LogUtil.throwableToString(e)); - } - } - - /** - * Dump the inter arrival times for examination if necessary. - */ - public void dumpInterArrivalTimes() - { - try - { - FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + ".dat", true); - fos.write(toString().getBytes()); - fos.close(); - } - catch(Throwable th) - { - logger_.warn(LogUtil.throwableToString(th)); - } - } - - /** - * We dump the arrival window for any endpoint only if the - * local Failure Detector module has been up for more than a - * minute. - * - * @param ep for which the arrival window needs to be dumped. - */ - private void dumpInterArrivalTimes(EndPoint ep) - { - long now = System.currentTimeMillis(); - if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ ) - return; - try - { - FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + "-" + ep + ".dat", true); - ArrivalWindow hWnd = arrivalSamples_.get(ep); - fos.write(hWnd.toString().getBytes()); - fos.close(); - } - catch(Throwable th) - { - logger_.warn(LogUtil.throwableToString(th)); - } - } - - public boolean isAlive(EndPoint ep) - { - try - { - /* If the endpoint in question is the local endpoint return true. */ - String localHost = FBUtilities.getHostAddress(); - if ( localHost.equals( ep.getHost() ) ) - return true; - } - catch( UnknownHostException ex ) - { - logger_.info( LogUtil.throwableToString(ex) ); - } - /* Incoming port is assumed to be the Storage port. We need to change it to the control port */ - EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort()); - EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2); - return epState.isAlive(); - } - - public void report(EndPoint ep) - { - if (logger_.isTraceEnabled()) - logger_.trace("reporting " + ep); - long now = System.currentTimeMillis(); - ArrivalWindow heartbeatWindow = arrivalSamples_.get(ep); - if ( heartbeatWindow == null ) - { - heartbeatWindow = new ArrivalWindow(sampleSize_); - arrivalSamples_.put(ep, heartbeatWindow); - } - heartbeatWindow.add(now); - } - - public void interpret(EndPoint ep) - { - ArrivalWindow hbWnd = arrivalSamples_.get(ep); - if ( hbWnd == null ) - { - return; - } - long now = System.currentTimeMillis(); - /* We need this so that we do not suspect a convict. */ - boolean isConvicted = false; - double phi = hbWnd.phi(now); - if (logger_.isTraceEnabled()) - logger_.trace("PHI for " + ep + " : " + phi); - - /* - if ( phi > phiConvictThreshold_ ) - { - isConvicted = true; - for ( IFailureDetectionEventListener listener : fdEvntListeners_ ) - { - listener.convict(ep); - } - } - */ - if ( !isConvicted && phi > phiSuspectThreshold_ ) - { - for ( IFailureDetectionEventListener listener : fdEvntListeners_ ) - { - listener.suspect(ep); - } - } - } - - public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) - { - fdEvntListeners_.add(listener); - } - - public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) - { - fdEvntListeners_.remove(listener); - } - - public String toString() - { - StringBuilder sb = new StringBuilder(); - Set<EndPoint> eps = arrivalSamples_.keySet(); - - sb.append("-----------------------------------------------------------------------"); - for ( EndPoint ep : eps ) - { - ArrivalWindow hWnd = arrivalSamples_.get(ep); - sb.append(ep + " : "); - sb.append(hWnd.toString()); - sb.append( System.getProperty("line.separator") ); - } - sb.append("-----------------------------------------------------------------------"); - return sb.toString(); - } - - public static void main(String[] args) throws Throwable - { - } -} - -class ArrivalWindow -{ - private static Logger logger_ = Logger.getLogger(ArrivalWindow.class); - private double tLast_ = 0L; - private BoundedStatsDeque arrivalIntervals_; - - ArrivalWindow(int size) - { - arrivalIntervals_ = new BoundedStatsDeque(size); - } - - synchronized void add(double value) - { - double interArrivalTime; - if ( tLast_ > 0L ) - { - interArrivalTime = (value - tLast_); - } - else - { - interArrivalTime = Gossiper.intervalInMillis_ / 2; - } - tLast_ = value; - arrivalIntervals_.add(interArrivalTime); - } - - synchronized double sum() - { - return arrivalIntervals_.sum(); - } - - synchronized double sumOfDeviations() - { - return arrivalIntervals_.sumOfDeviations(); - } - - synchronized double mean() - { - return arrivalIntervals_.mean(); - } - - synchronized double variance() - { - return arrivalIntervals_.variance(); - } - - double stdev() - { - return arrivalIntervals_.stdev(); - } - - void clear() - { - arrivalIntervals_.clear(); - } - - double p(double t) - { - double mean = mean(); - double exponent = (-1)*(t)/mean; - return 1 - ( 1 - Math.pow(Math.E, exponent) ); - } - - double phi(long tnow) - { - int size = arrivalIntervals_.size(); - double log = 0d; - if ( size > 0 ) - { - double t = tnow - tLast_; - double probability = p(t); - log = (-1) * Math.log10( probability ); - } - return log; - } - - public String toString() - { - return StringUtils.join(arrivalIntervals_.iterator(), " "); - } -} - +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.io.FileOutputStream; +import java.lang.management.ManagementFactory; +import java.net.UnknownHostException; +import java.util.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.commons.lang.StringUtils; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.EndPoint; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.LogUtil; +import org.apache.cassandra.utils.BoundedStatsDeque; +import org.apache.log4j.Logger; + +/** + * This FailureDetector is an implementation of the paper titled + * "The Phi Accrual Failure Detector" by Hayashibara. + * Check the paper and the <i>IFailureDetector</i> interface for details. + * + * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] ) + */ +public class FailureDetector implements IFailureDetector, FailureDetectorMBean +{ + private static Logger logger_ = Logger.getLogger(FailureDetector.class); + private static final int sampleSize_ = 1000; + private static final int phiSuspectThreshold_ = 5; + private static final int phiConvictThreshold_ = 8; + /* The Failure Detector has to have been up for at least 1 min. */ + private static final long uptimeThreshold_ = 60000; + private static IFailureDetector failureDetector_; + /* Used to lock the factory for creation of FailureDetector instance */ + private static Lock createLock_ = new ReentrantLock(); + /* The time when the module was instantiated. */ + private static long creationTime_; + + public static IFailureDetector instance() + { + if ( failureDetector_ == null ) + { + FailureDetector.createLock_.lock(); + try + { + if ( failureDetector_ == null ) + { + failureDetector_ = new FailureDetector(); + } + } + finally + { + createLock_.unlock(); + } + } + return failureDetector_; + } + + private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>(); + private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>(); + + public FailureDetector() + { + creationTime_ = System.currentTimeMillis(); + // Register this instance with JMX + try + { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + mbs.registerMBean(this, new ObjectName("org.apache.cassandra.gms:type=FailureDetector")); + } + catch (Exception e) + { + logger_.error(LogUtil.throwableToString(e)); + } + } + + /** + * Dump the inter arrival times for examination if necessary. + */ + public void dumpInterArrivalTimes() + { + try + { + FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + ".dat", true); + fos.write(toString().getBytes()); + fos.close(); + } + catch(Throwable th) + { + logger_.warn(LogUtil.throwableToString(th)); + } + } + + /** + * We dump the arrival window for any endpoint only if the + * local Failure Detector module has been up for more than a + * minute. + * + * @param ep for which the arrival window needs to be dumped. + */ + private void dumpInterArrivalTimes(EndPoint ep) + { + long now = System.currentTimeMillis(); + if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ ) + return; + try + { + FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + "-" + ep + ".dat", true); + ArrivalWindow hWnd = arrivalSamples_.get(ep); + fos.write(hWnd.toString().getBytes()); + fos.close(); + } + catch(Throwable th) + { + logger_.warn(LogUtil.throwableToString(th)); + } + } + + public boolean isAlive(EndPoint ep) + { + try + { + /* If the endpoint in question is the local endpoint return true. */ + String localHost = FBUtilities.getHostAddress(); + if ( localHost.equals( ep.getHost() ) ) + return true; + } + catch( UnknownHostException ex ) + { + logger_.info( LogUtil.throwableToString(ex) ); + } + /* Incoming port is assumed to be the Storage port. We need to change it to the control port */ + EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort()); + EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2); + return epState.isAlive(); + } + + public void report(EndPoint ep) + { + if (logger_.isTraceEnabled()) + logger_.trace("reporting " + ep); + long now = System.currentTimeMillis(); + ArrivalWindow heartbeatWindow = arrivalSamples_.get(ep); + if ( heartbeatWindow == null ) + { + heartbeatWindow = new ArrivalWindow(sampleSize_); + arrivalSamples_.put(ep, heartbeatWindow); + } + heartbeatWindow.add(now); + } + + public void interpret(EndPoint ep) + { + ArrivalWindow hbWnd = arrivalSamples_.get(ep); + if ( hbWnd == null ) + { + return; + } + long now = System.currentTimeMillis(); + /* We need this so that we do not suspect a convict. */ + boolean isConvicted = false; + double phi = hbWnd.phi(now); + if (logger_.isTraceEnabled()) + logger_.trace("PHI for " + ep + " : " + phi); + + /* + if ( phi > phiConvictThreshold_ ) + { + isConvicted = true; + for ( IFailureDetectionEventListener listener : fdEvntListeners_ ) + { + listener.convict(ep); + } + } + */ + if ( !isConvicted && phi > phiSuspectThreshold_ ) + { + for ( IFailureDetectionEventListener listener : fdEvntListeners_ ) + { + listener.suspect(ep); + } + } + } + + public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) + { + fdEvntListeners_.add(listener); + } + + public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) + { + fdEvntListeners_.remove(listener); + } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + Set<EndPoint> eps = arrivalSamples_.keySet(); + + sb.append("-----------------------------------------------------------------------"); + for ( EndPoint ep : eps ) + { + ArrivalWindow hWnd = arrivalSamples_.get(ep); + sb.append(ep + " : "); + sb.append(hWnd.toString()); + sb.append( System.getProperty("line.separator") ); + } + sb.append("-----------------------------------------------------------------------"); + return sb.toString(); + } + + public static void main(String[] args) throws Throwable + { + } +} + +class ArrivalWindow +{ + private static Logger logger_ = Logger.getLogger(ArrivalWindow.class); + private double tLast_ = 0L; + private BoundedStatsDeque arrivalIntervals_; + + ArrivalWindow(int size) + { + arrivalIntervals_ = new BoundedStatsDeque(size); + } + + synchronized void add(double value) + { + double interArrivalTime; + if ( tLast_ > 0L ) + { + interArrivalTime = (value - tLast_); + } + else + { + interArrivalTime = Gossiper.intervalInMillis_ / 2; + } + tLast_ = value; + arrivalIntervals_.add(interArrivalTime); + } + + synchronized double sum() + { + return arrivalIntervals_.sum(); + } + + synchronized double sumOfDeviations() + { + return arrivalIntervals_.sumOfDeviations(); + } + + synchronized double mean() + { + return arrivalIntervals_.mean(); + } + + synchronized double variance() + { + return arrivalIntervals_.variance(); + } + + double stdev() + { + return arrivalIntervals_.stdev(); + } + + void clear() + { + arrivalIntervals_.clear(); + } + + double p(double t) + { + double mean = mean(); + double exponent = (-1)*(t)/mean; + return 1 - ( 1 - Math.pow(Math.E, exponent) ); + } + + double phi(long tnow) + { + int size = arrivalIntervals_.size(); + double log = 0d; + if ( size > 0 ) + { + double t = tnow - tLast_; + double probability = p(t); + log = (-1) * Math.log10( probability ); + } + return log; + } + + public String toString() + { + return StringUtils.join(arrivalIntervals_.iterator(), " "); + } +} +
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java?rev=799331&r1=799330&r2=799331&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java Thu Jul 30 15:30:21 2009 @@ -1,24 +1,24 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.gms; - -public interface FailureDetectorMBean -{ - public void dumpInterArrivalTimes(); -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +public interface FailureDetectorMBean +{ + public void dumpInterArrivalTimes(); +} Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java?rev=799331&r1=799330&r2=799331&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java Thu Jul 30 15:30:21 2009 @@ -1,110 +1,110 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.gms; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.cassandra.io.ICompactSerializer; -import org.apache.cassandra.net.CompactEndPointSerializationHelper; -import org.apache.cassandra.net.EndPoint; -import org.apache.cassandra.net.*; - -/** - * Contains information about a specified list of EndPoints and the largest version - * of the state they have generated as known by the local endpoint. - * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] ) - */ - -public class GossipDigest implements Comparable<GossipDigest> -{ - private static ICompactSerializer<GossipDigest> serializer_; - static - { - serializer_ = new GossipDigestSerializer(); - } - - EndPoint endPoint_; - int generation_; - int maxVersion_; - - public static ICompactSerializer<GossipDigest> serializer() - { - return serializer_; - } - - GossipDigest(EndPoint endPoint, int generation, int maxVersion) - { - endPoint_ = endPoint; - generation_ = generation; - maxVersion_ = maxVersion; - } - - EndPoint getEndPoint() - { - return endPoint_; - } - - int getGeneration() - { - return generation_; - } - - int getMaxVersion() - { - return maxVersion_; - } - - public int compareTo(GossipDigest gDigest) - { - if ( generation_ != gDigest.generation_ ) - return ( generation_ - gDigest.generation_ ); - return (maxVersion_ - gDigest.maxVersion_); - } - - public String toString() - { - StringBuilder sb = new StringBuilder(); - sb.append(endPoint_); - sb.append(":"); - sb.append(generation_); - sb.append(":"); - sb.append(maxVersion_); - return sb.toString(); - } -} - -class GossipDigestSerializer implements ICompactSerializer<GossipDigest> -{ - public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException - { - CompactEndPointSerializationHelper.serialize(gDigest.endPoint_, dos); - dos.writeInt(gDigest.generation_); - dos.writeInt(gDigest.maxVersion_); - } - - public GossipDigest deserialize(DataInputStream dis) throws IOException - { - EndPoint endPoint = CompactEndPointSerializationHelper.deserialize(dis); - int generation = dis.readInt(); - int version = dis.readInt(); - return new GossipDigest(endPoint, generation, version); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.net.CompactEndPointSerializationHelper; +import org.apache.cassandra.net.EndPoint; +import org.apache.cassandra.net.*; + +/** + * Contains information about a specified list of EndPoints and the largest version + * of the state they have generated as known by the local endpoint. + * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] ) + */ + +public class GossipDigest implements Comparable<GossipDigest> +{ + private static ICompactSerializer<GossipDigest> serializer_; + static + { + serializer_ = new GossipDigestSerializer(); + } + + EndPoint endPoint_; + int generation_; + int maxVersion_; + + public static ICompactSerializer<GossipDigest> serializer() + { + return serializer_; + } + + GossipDigest(EndPoint endPoint, int generation, int maxVersion) + { + endPoint_ = endPoint; + generation_ = generation; + maxVersion_ = maxVersion; + } + + EndPoint getEndPoint() + { + return endPoint_; + } + + int getGeneration() + { + return generation_; + } + + int getMaxVersion() + { + return maxVersion_; + } + + public int compareTo(GossipDigest gDigest) + { + if ( generation_ != gDigest.generation_ ) + return ( generation_ - gDigest.generation_ ); + return (maxVersion_ - gDigest.maxVersion_); + } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append(endPoint_); + sb.append(":"); + sb.append(generation_); + sb.append(":"); + sb.append(maxVersion_); + return sb.toString(); + } +} + +class GossipDigestSerializer implements ICompactSerializer<GossipDigest> +{ + public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException + { + CompactEndPointSerializationHelper.serialize(gDigest.endPoint_, dos); + dos.writeInt(gDigest.generation_); + dos.writeInt(gDigest.maxVersion_); + } + + public GossipDigest deserialize(DataInputStream dis) throws IOException + { + EndPoint endPoint = CompactEndPointSerializationHelper.deserialize(dis); + int generation = dis.readInt(); + int version = dis.readInt(); + return new GossipDigest(endPoint, generation, version); + } +} Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java?rev=799331&r1=799330&r2=799331&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java Thu Jul 30 15:30:21 2009 @@ -1,77 +1,77 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.gms; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.*; -import org.apache.cassandra.io.ICompactSerializer; -import org.apache.cassandra.net.EndPoint; -import org.apache.cassandra.net.*; - - -/** - * This message gets sent out as a result of the receipt of a GossipDigestAckMessage. This the - * last stage of the 3 way messaging of the Gossip protocol. - * - * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] ) - */ - -class GossipDigestAck2Message -{ - private static ICompactSerializer<GossipDigestAck2Message> serializer_; - static - { - serializer_ = new GossipDigestAck2MessageSerializer(); - } - - Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>(); - - public static ICompactSerializer<GossipDigestAck2Message> serializer() - { - return serializer_; - } - - GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap) - { - epStateMap_ = epStateMap; - } - - Map<EndPoint, EndPointState> getEndPointStateMap() - { - return epStateMap_; - } -} - -class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message> -{ - public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos) throws IOException - { - /* Use the EndPointState */ - EndPointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos); - } - - public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException - { - Map<EndPoint, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis); - return new GossipDigestAck2Message(epStateMap); - } -} - +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.*; +import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.net.EndPoint; +import org.apache.cassandra.net.*; + + +/** + * This message gets sent out as a result of the receipt of a GossipDigestAckMessage. This the + * last stage of the 3 way messaging of the Gossip protocol. + * + * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] ) + */ + +class GossipDigestAck2Message +{ + private static ICompactSerializer<GossipDigestAck2Message> serializer_; + static + { + serializer_ = new GossipDigestAck2MessageSerializer(); + } + + Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>(); + + public static ICompactSerializer<GossipDigestAck2Message> serializer() + { + return serializer_; + } + + GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap) + { + epStateMap_ = epStateMap; + } + + Map<EndPoint, EndPointState> getEndPointStateMap() + { + return epStateMap_; + } +} + +class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message> +{ + public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos) throws IOException + { + /* Use the EndPointState */ + EndPointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos); + } + + public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException + { + Map<EndPoint, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis); + return new GossipDigestAck2Message(epStateMap); + } +} + Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=799331&r1=799330&r2=799331&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Thu Jul 30 15:30:21 2009 @@ -1,102 +1,102 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.gms; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.*; - -import org.apache.cassandra.io.ICompactSerializer; -import org.apache.cassandra.net.EndPoint; - - - -/** - * This message gets sent out as a result of the receipt of a GossipDigestSynMessage by an - * endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol. - * - * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] ) - */ - -class GossipDigestAckMessage -{ - private static ICompactSerializer<GossipDigestAckMessage> serializer_; - static - { - serializer_ = new GossipDigestAckMessageSerializer(); - } - - List<GossipDigest> gDigestList_ = new ArrayList<GossipDigest>(); - Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>(); - - static ICompactSerializer<GossipDigestAckMessage> serializer() - { - return serializer_; - } - - GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap) - { - gDigestList_ = gDigestList; - epStateMap_ = epStateMap; - } - - void addGossipDigest(EndPoint ep, int generation, int version) - { - gDigestList_.add( new GossipDigest(ep, generation, version) ); - } - - List<GossipDigest> getGossipDigestList() - { - return gDigestList_; - } - - Map<EndPoint, EndPointState> getEndPointStateMap() - { - return epStateMap_; - } -} - -class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage> -{ - public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException - { - /* Use the helper to serialize the GossipDigestList */ - boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos); - dos.writeBoolean(bContinue); - /* Use the EndPointState */ - if ( bContinue ) - { - EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos); - } - } - - public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException - { - Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>(); - List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis); - boolean bContinue = dis.readBoolean(); - - if ( bContinue ) - { - epStateMap = EndPointStatesSerializationHelper.deserialize(dis); - } - return new GossipDigestAckMessage(gDigestList, epStateMap); - } +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.*; + +import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.net.EndPoint; + + + +/** + * This message gets sent out as a result of the receipt of a GossipDigestSynMessage by an + * endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol. + * + * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] ) + */ + +class GossipDigestAckMessage +{ + private static ICompactSerializer<GossipDigestAckMessage> serializer_; + static + { + serializer_ = new GossipDigestAckMessageSerializer(); + } + + List<GossipDigest> gDigestList_ = new ArrayList<GossipDigest>(); + Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>(); + + static ICompactSerializer<GossipDigestAckMessage> serializer() + { + return serializer_; + } + + GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap) + { + gDigestList_ = gDigestList; + epStateMap_ = epStateMap; + } + + void addGossipDigest(EndPoint ep, int generation, int version) + { + gDigestList_.add( new GossipDigest(ep, generation, version) ); + } + + List<GossipDigest> getGossipDigestList() + { + return gDigestList_; + } + + Map<EndPoint, EndPointState> getEndPointStateMap() + { + return epStateMap_; + } +} + +class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage> +{ + public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException + { + /* Use the helper to serialize the GossipDigestList */ + boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos); + dos.writeBoolean(bContinue); + /* Use the EndPointState */ + if ( bContinue ) + { + EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos); + } + } + + public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException + { + Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>(); + List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis); + boolean bContinue = dis.readBoolean(); + + if ( bContinue ) + { + epStateMap = EndPointStatesSerializationHelper.deserialize(dis); + } + return new GossipDigestAckMessage(gDigestList, epStateMap); + } } \ No newline at end of file Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=799331&r1=799330&r2=799331&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Thu Jul 30 15:30:21 2009 @@ -1,184 +1,184 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.gms; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.*; - -import org.apache.cassandra.io.ICompactSerializer; -import org.apache.cassandra.net.CompactEndPointSerializationHelper; -import org.apache.cassandra.net.EndPoint; -import org.apache.cassandra.utils.Log4jLogger; -import org.apache.log4j.Logger; -import org.apache.cassandra.utils.*; - - -/** - * This is the first message that gets sent out as a start of the Gossip protocol in a - * round. - * - * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] ) - */ - -class GossipDigestSynMessage -{ - private static ICompactSerializer<GossipDigestSynMessage> serializer_; - static - { - serializer_ = new GossipDigestSynMessageSerializer(); - } - - String clusterId_; - List<GossipDigest> gDigests_ = new ArrayList<GossipDigest>(); - - public static ICompactSerializer<GossipDigestSynMessage> serializer() - { - return serializer_; - } - - public GossipDigestSynMessage(String clusterId, List<GossipDigest> gDigests) - { - clusterId_ = clusterId; - gDigests_ = gDigests; - } - - List<GossipDigest> getGossipDigests() - { - return gDigests_; - } -} - -class GossipDigestSerializationHelper -{ - private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class); - - static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException - { - boolean bVal = true; - int size = gDigestList.size(); - dos.writeInt(size); - - int estimate = 0; - for ( GossipDigest gDigest : gDigestList ) - { - if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate ) - { - logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@"); - bVal = false; - break; - } - int pre = dos.size(); - GossipDigest.serializer().serialize( gDigest, dos ); - int post = dos.size(); - estimate = post - pre; - } - return bVal; - } - - static List<GossipDigest> deserialize(DataInputStream dis) throws IOException - { - int size = dis.readInt(); - List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); - - for ( int i = 0; i < size; ++i ) - { - if ( dis.available() == 0 ) - { - logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests."); - break; - } - - GossipDigest gDigest = GossipDigest.serializer().deserialize(dis); - gDigests.add( gDigest ); - } - return gDigests; - } -} - -class EndPointStatesSerializationHelper -{ - private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName()); - - static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException - { - boolean bVal = true; - int estimate = 0; - int size = epStateMap.size(); - dos.writeInt(size); - - Set<EndPoint> eps = epStateMap.keySet(); - for( EndPoint ep : eps ) - { - if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate ) - { - logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@"); - bVal = false; - break; - } - - int pre = dos.size(); - CompactEndPointSerializationHelper.serialize(ep, dos); - EndPointState epState = epStateMap.get(ep); - EndPointState.serializer().serialize(epState, dos); - int post = dos.size(); - estimate = post - pre; - } - return bVal; - } - - static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException - { - int size = dis.readInt(); - Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>(); - - for ( int i = 0; i < size; ++i ) - { - if ( dis.available() == 0 ) - { - logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState."); - break; - } - // int length = dis.readInt(); - EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis); - EndPointState epState = EndPointState.serializer().deserialize(dis); - epStateMap.put(ep, epState); - } - return epStateMap; - } -} - -class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage> -{ - public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos) throws IOException - { - dos.writeUTF(gDigestSynMessage.clusterId_); - GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos); - } - - public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException - { - String clusterId = dis.readUTF(); - List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis); - return new GossipDigestSynMessage(clusterId, gDigests); - } - -} - +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.*; + +import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.net.CompactEndPointSerializationHelper; +import org.apache.cassandra.net.EndPoint; +import org.apache.cassandra.utils.Log4jLogger; +import org.apache.log4j.Logger; +import org.apache.cassandra.utils.*; + + +/** + * This is the first message that gets sent out as a start of the Gossip protocol in a + * round. + * + * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] ) + */ + +class GossipDigestSynMessage +{ + private static ICompactSerializer<GossipDigestSynMessage> serializer_; + static + { + serializer_ = new GossipDigestSynMessageSerializer(); + } + + String clusterId_; + List<GossipDigest> gDigests_ = new ArrayList<GossipDigest>(); + + public static ICompactSerializer<GossipDigestSynMessage> serializer() + { + return serializer_; + } + + public GossipDigestSynMessage(String clusterId, List<GossipDigest> gDigests) + { + clusterId_ = clusterId; + gDigests_ = gDigests; + } + + List<GossipDigest> getGossipDigests() + { + return gDigests_; + } +} + +class GossipDigestSerializationHelper +{ + private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class); + + static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException + { + boolean bVal = true; + int size = gDigestList.size(); + dos.writeInt(size); + + int estimate = 0; + for ( GossipDigest gDigest : gDigestList ) + { + if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate ) + { + logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@"); + bVal = false; + break; + } + int pre = dos.size(); + GossipDigest.serializer().serialize( gDigest, dos ); + int post = dos.size(); + estimate = post - pre; + } + return bVal; + } + + static List<GossipDigest> deserialize(DataInputStream dis) throws IOException + { + int size = dis.readInt(); + List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); + + for ( int i = 0; i < size; ++i ) + { + if ( dis.available() == 0 ) + { + logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests."); + break; + } + + GossipDigest gDigest = GossipDigest.serializer().deserialize(dis); + gDigests.add( gDigest ); + } + return gDigests; + } +} + +class EndPointStatesSerializationHelper +{ + private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName()); + + static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException + { + boolean bVal = true; + int estimate = 0; + int size = epStateMap.size(); + dos.writeInt(size); + + Set<EndPoint> eps = epStateMap.keySet(); + for( EndPoint ep : eps ) + { + if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate ) + { + logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@"); + bVal = false; + break; + } + + int pre = dos.size(); + CompactEndPointSerializationHelper.serialize(ep, dos); + EndPointState epState = epStateMap.get(ep); + EndPointState.serializer().serialize(epState, dos); + int post = dos.size(); + estimate = post - pre; + } + return bVal; + } + + static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException + { + int size = dis.readInt(); + Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>(); + + for ( int i = 0; i < size; ++i ) + { + if ( dis.available() == 0 ) + { + logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState."); + break; + } + // int length = dis.readInt(); + EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis); + EndPointState epState = EndPointState.serializer().deserialize(dis); + epStateMap.put(ep, epState); + } + return epStateMap; + } +} + +class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage> +{ + public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos) throws IOException + { + dos.writeUTF(gDigestSynMessage.clusterId_); + GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos); + } + + public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException + { + String clusterId = dis.readUTF(); + List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis); + return new GossipDigestSynMessage(clusterId, gDigests); + } + +} +
