Author: jbellis
Date: Tue Apr 27 18:35:21 2010
New Revision: 938597
URL: http://svn.apache.org/viewvc?rev=938597&view=rev
Log:
Replace synchronization in Gossiper with concurrent data structures and
volatile fields. Also remove getSortedApplicationStates, since nothing
appears to rely on iterating in sorted order. Patch by Brandon Williams
and jbellis for CASSANDRA-757.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java?rev=938597&r1=938596&r2=938597&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java Tue
Apr 27 18:35:21 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.*;
import org.apache.cassandra.io.ICompactSerializer;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,25 +35,21 @@ import org.slf4j.LoggerFactory;
public class EndpointState
{
- private static ICompactSerializer<EndpointState> serializer_;
- static
- {
- serializer_ = new EndpointStateSerializer();
- }
-
- HeartBeatState hbState_;
- Map<String, ApplicationState> applicationState_ = new Hashtable<String,
ApplicationState>();
+ private final static ICompactSerializer<EndpointState> serializer_ = new
EndpointStateSerializer();
+
+ volatile HeartBeatState hbState_;
+ final Map<String, ApplicationState> applicationState_ = new
NonBlockingHashMap<String, ApplicationState>();
/* fields below do not get serialized */
- long updateTimestamp_;
- boolean isAlive_;
- boolean isAGossiper_;
+ volatile long updateTimestamp_;
+ volatile boolean isAlive_;
+ volatile boolean isAGossiper_;
// whether this endpoint has token associated with it or not. Initially
set false for all
// endpoints. After certain time of inactivity, gossiper will examine if
this node has a
// token or not and will set this true if token is found. If there is no
token, this is a
// fat client and will be removed automatically from gossip.
- boolean hasToken_;
+ volatile boolean hasToken_;
public static ICompactSerializer<EndpointState> serializer()
{
@@ -73,17 +70,21 @@ public class EndpointState
return hbState_;
}
- synchronized void setHeartBeatState(HeartBeatState hbState)
+ void setHeartBeatState(HeartBeatState hbState)
{
updateTimestamp();
hbState_ = hbState;
}
-
+
public ApplicationState getApplicationState(String key)
{
return applicationState_.get(key);
}
-
+
+ /**
+ * TODO replace this with operations that don't expose private state
+ */
+ @Deprecated
public Map<String, ApplicationState> getApplicationStateMap()
{
return applicationState_;
@@ -100,7 +101,7 @@ public class EndpointState
return updateTimestamp_;
}
- synchronized void updateTimestamp()
+ void updateTimestamp()
{
updateTimestamp_ = System.currentTimeMillis();
}
@@ -110,7 +111,7 @@ public class EndpointState
return isAlive_;
}
- synchronized void isAlive(boolean value)
+ void isAlive(boolean value)
{
isAlive_ = value;
}
@@ -121,13 +122,13 @@ public class EndpointState
return isAGossiper_;
}
- synchronized void isAGossiper(boolean value)
+ void isAGossiper(boolean value)
{
//isAlive_ = false;
isAGossiper_ = value;
}
- public synchronized void setHasToken(boolean value)
+ public void setHasToken(boolean value)
{
hasToken_ = value;
}
@@ -136,22 +137,6 @@ public class EndpointState
{
return hasToken_;
}
-
- public List<Map.Entry<String,ApplicationState>>
getSortedApplicationStates()
- {
- ArrayList<Map.Entry<String, ApplicationState>> entries = new
ArrayList<Map.Entry<String, ApplicationState>>();
- entries.addAll(applicationState_.entrySet());
- Collections.sort(entries, new Comparator<Map.Entry<String,
ApplicationState>>()
- {
- public int compare(Map.Entry<String, ApplicationState> lhs,
Map.Entry<String, ApplicationState> rhs)
- {
- return lhs.getValue().compareTo(rhs.getValue());
- }
- });
-
- return entries;
- }
-
}
class EndpointStateSerializer implements ICompactSerializer<EndpointState>
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=938597&r1=938596&r2=938597&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Apr 27
18:35:21 2010
@@ -21,6 +21,9 @@ package org.apache.cassandra.gms;
import java.io.*;
import java.util.*;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.net.InetAddress;
import org.apache.cassandra.concurrent.StageManager;
@@ -52,45 +55,42 @@ public class Gossiper implements IFailur
{
try
{
- synchronized( Gossiper.instance )
- {
- /* Update the local heartbeat counter. */
-
endpointStateMap_.get(localEndpoint_).getHeartBeatState().updateHeartBeat();
- List<GossipDigest> gDigests = new
ArrayList<GossipDigest>();
- Gossiper.instance.makeRandomGossipDigest(gDigests);
+ /* Update the local heartbeat counter. */
+
endpointStateMap_.get(localEndpoint_).getHeartBeatState().updateHeartBeat();
+ List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+ Gossiper.instance.makeRandomGossipDigest(gDigests);
+
+ if ( gDigests.size() > 0 )
+ {
+ Message message = makeGossipDigestSynMessage(gDigests);
+ /* Gossip to some random live member */
+ boolean gossipedToSeed = doGossipToLiveMember(message);
+
+ /* Gossip to some unreachable member with some probability
to check if he is back up */
+ doGossipToUnreachableMember(message);
+
+ /* Gossip to a seed if we did not do so above, or we have
seen less nodes
+ than there are seeds. This prevents partitions where
each group of nodes
+ is only gossiping to a subset of the seeds.
+
+ The most straightforward check would be to check that
all the seeds have been
+ verified either as live or unreachable. To avoid that
computation each round,
+ we reason that:
+
+ either all the live nodes are seeds, in which case
non-seeds that come online
+ will introduce themselves to a member of the ring by
definition,
+
+ or there is at least one non-seed node in the list, in
which case eventually
+ someone will gossip to it, and then do a gossip to a
random seed from the
+ gossipedToSeed check.
+
+ See CASSANDRA-150 for more exposition. */
+ if (!gossipedToSeed || liveEndpoints_.size() <
seeds_.size())
+ doGossipToSeed(message);
- if ( gDigests.size() > 0 )
- {
- Message message = makeGossipDigestSynMessage(gDigests);
- /* Gossip to some random live member */
- boolean gossipedToSeed = doGossipToLiveMember(message);
-
- /* Gossip to some unreachable member with some
probability to check if he is back up */
- doGossipToUnreachableMember(message);
-
- /* Gossip to a seed if we did not do so above, or we
have seen less nodes
- than there are seeds. This prevents partitions
where each group of nodes
- is only gossiping to a subset of the seeds.
-
- The most straightforward check would be to check
that all the seeds have been
- verified either as live or unreachable. To avoid
that computation each round,
- we reason that:
-
- either all the live nodes are seeds, in which case
non-seeds that come online
- will introduce themselves to a member of the ring
by definition,
-
- or there is at least one non-seed node in the list,
in which case eventually
- someone will gossip to it, and then do a gossip to
a random seed from the
- gossipedToSeed check.
-
- See CASSANDRA-150 for more exposition. */
- if (!gossipedToSeed || liveEndpoints_.size() <
seeds_.size())
- doGossipToSeed(message);
-
- if (logger_.isTraceEnabled())
- logger_.trace("Performing status check ...");
- doStatusCheck();
- }
+ if (logger_.isTraceEnabled())
+ logger_.trace("Performing status check ...");
+ doStatusCheck();
}
}
catch (Exception e)
@@ -110,27 +110,34 @@ public class Gossiper implements IFailur
private long aVeryLongTime_;
private long FatClientTimeout_;
private Random random_ = new Random();
+ private Comparator<InetAddress> inetcomparator = new
Comparator<InetAddress>()
+ {
+ public int compare(InetAddress addr1, InetAddress addr2)
+ {
+ return addr1.getHostAddress().compareTo(addr2.getHostAddress());
+ }
+ };
/* subscribers for interest in EndpointState change */
- private List<IEndpointStateChangeSubscriber> subscribers_ = new
ArrayList<IEndpointStateChangeSubscriber>();
+ private List<IEndpointStateChangeSubscriber> subscribers_ = new
CopyOnWriteArrayList<IEndpointStateChangeSubscriber>();
/* live member set */
- private Set<InetAddress> liveEndpoints_ = new HashSet<InetAddress>();
+ private Set<InetAddress> liveEndpoints_ = new
ConcurrentSkipListSet<InetAddress>(inetcomparator);
/* unreachable member set */
- private Set<InetAddress> unreachableEndpoints_ = new
HashSet<InetAddress>();
+ private Set<InetAddress> unreachableEndpoints_ = new
ConcurrentSkipListSet<InetAddress>(inetcomparator);
/* initial seeds for joining the cluster */
- private Set<InetAddress> seeds_ = new HashSet<InetAddress>();
+ private Set<InetAddress> seeds_ = new
ConcurrentSkipListSet<InetAddress>(inetcomparator);
/* map where key is the endpoint and value is the state associated with
the endpoint */
- Map<InetAddress, EndpointState> endpointStateMap_ = new
Hashtable<InetAddress, EndpointState>();
+ Map<InetAddress, EndpointState> endpointStateMap_ = new
ConcurrentHashMap<InetAddress, EndpointState>();
/* map where key is endpoint and value is timestamp when this endpoint was
removed from
* gossip. We will ignore any gossip regarding these endpoints for
Streaming.RING_DELAY time
* after removal to prevent nodes from falsely reincarnating during the
time when removal
* gossip gets propagated to all nodes */
- Map<InetAddress, Long> justRemovedEndpoints_ = new Hashtable<InetAddress,
Long>();
+ Map<InetAddress, Long> justRemovedEndpoints_ = new
ConcurrentHashMap<InetAddress, Long>();
private Gossiper()
{
@@ -144,12 +151,12 @@ public class Gossiper implements IFailur
}
/** Register with the Gossiper for EndpointState notifications */
- public synchronized void register(IEndpointStateChangeSubscriber
subscriber)
+ public void register(IEndpointStateChangeSubscriber subscriber)
{
subscribers_.add(subscriber);
}
- public synchronized void unregister(IEndpointStateChangeSubscriber
subscriber)
+ public void unregister(IEndpointStateChangeSubscriber subscriber)
{
subscribers_.remove(subscriber);
}
@@ -224,8 +231,7 @@ public class Gossiper implements IFailur
}
/**
- * No locking required since it is called from a method that already
- * has acquired a lock. The gossip digest is built based on randomization
+ * The gossip digest is built based on randomization
* rather than just looping through the collection of live endpoints.
*
* @param gDigests list of Gossip Digests.
@@ -431,7 +437,7 @@ public class Gossiper implements IFailur
return endpointStateMap_.get(ep);
}
- synchronized EndpointState getStateForVersionBiggerThan(InetAddress
forEndpoint, int version)
+ EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int
version)
{
if (logger_.isTraceEnabled())
logger_.trace("Scanning for state greater than " + version + " for
" + forEndpoint);
@@ -588,7 +594,7 @@ public class Gossiper implements IFailur
subscriber.onJoin(ep, epState);
}
- synchronized void applyStateLocally(Map<InetAddress, EndpointState>
epStateMap)
+ void applyStateLocally(Map<InetAddress, EndpointState> epStateMap)
{
for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
{
@@ -657,7 +663,7 @@ public class Gossiper implements IFailur
{
Map<String, ApplicationState> localAppStateMap =
localStatePtr.getApplicationStateMap();
- for (Map.Entry<String,ApplicationState> remoteEntry :
remoteStatePtr.getSortedApplicationStates())
+ for (Map.Entry<String,ApplicationState> remoteEntry :
remoteStatePtr.getApplicationStateMap().entrySet())
{
String remoteKey = remoteEntry.getKey();
ApplicationState remoteAppState = remoteEntry.getValue();
@@ -706,7 +712,7 @@ public class Gossiper implements IFailur
}
}
- synchronized void isAlive(InetAddress addr, EndpointState epState, boolean
value)
+ void isAlive(InetAddress addr, EndpointState epState, boolean value)
{
epState.isAlive(value);
if (value)
@@ -747,7 +753,7 @@ public class Gossiper implements IFailur
This method is used to figure the state that the Gossiper has but
Gossipee doesn't. The delta digests
and the delta state are built up.
*/
- synchronized void examineGossiper(List<GossipDigest> gDigestList,
List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState>
deltaEpStateMap)
+ void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest>
deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap)
{
for ( GossipDigest gDigest : gDigestList )
{
@@ -837,7 +843,7 @@ public class Gossiper implements IFailur
gossipTimer_.schedule( new GossipTimerTask(),
Gossiper.intervalInMillis_, Gossiper.intervalInMillis_);
}
- public synchronized void addLocalApplicationState(String key,
ApplicationState appState)
+ public void addLocalApplicationState(String key, ApplicationState appState)
{
assert !StorageService.instance.isClientMode();
EndpointState epState = endpointStateMap_.get(localEndpoint_);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=938597&r1=938596&r2=938597&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Tue Apr 27 18:35:21 2010
@@ -890,7 +890,7 @@ public class StorageService implements I
public void onJoin(InetAddress endpoint, EndpointState epState)
{
- for (Map.Entry<String,ApplicationState> entry :
epState.getSortedApplicationStates())
+ for (Map.Entry<String,ApplicationState> entry :
epState.getApplicationStateMap().entrySet())
{
onChange(endpoint, entry.getKey(), entry.getValue());
}