Fix rare race condition in getExpireTimeForEndpoint; document the behavior of a few other apparently racy Map calls. Patch by Yu Lin and jbellis; reviewed by slebresne for CASSANDRA-4402
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f5e3ae67 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f5e3ae67 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f5e3ae67 Branch: refs/heads/cassandra-1.2 Commit: f5e3ae67c4a4fef6a541a8a06a9871810639a7ec Parents: f09a89f Author: Jonathan Ellis <[email protected]> Authored: Fri Nov 9 15:53:52 2012 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Fri Nov 9 16:04:45 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Table.java | 27 ++++++---- src/java/org/apache/cassandra/gms/Gossiper.java | 50 ++++++----------- 3 files changed, 36 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e3ae67/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3b80885..f6769fe 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-rc1 + * Fix rare race condition in getExpireTimeForEndpoint (CASSANDRA-4402) * acquire references to overlapping sstables during compaction so bloom filter doesn't get free'd prematurely (CASSANDRA-4934) * Don't share slice query filter in CQL3 SelectStatement (CASSANDRA-4928) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e3ae67/src/java/org/apache/cassandra/db/Table.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java index 85611de..18b7e4b 100644 --- a/src/java/org/apache/cassandra/db/Table.java +++ b/src/java/org/apache/cassandra/db/Table.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import 
java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -71,7 +72,7 @@ public class Table /* Table name. */ public final String name; /* ColumnFamilyStore per column family */ - private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>(); + private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>(); private final Object[] indexLocks; private volatile AbstractReplicationStrategy replicationStrategy; @@ -319,19 +320,25 @@ public class Table */ public void initCf(UUID cfId, String cfName, boolean loadSSTables) { - if (columnFamilyStores.containsKey(cfId)) - { - // this is the case when you reset local schema - // just reload metadata - ColumnFamilyStore cfs = columnFamilyStores.get(cfId); - assert cfs.getColumnFamilyName().equals(cfName); + ColumnFamilyStore cfs = columnFamilyStores.get(cfId); - cfs.metadata.reload(); - cfs.reload(); + if (cfs == null) + { + // CFS being created for the first time, either on server startup or new CF being added. + // We don't worry about races here; startup is safe, and adding multiple idential CFs + // simultaneously is a "don't do that" scenario. + ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables)); + // CFS mbean instantiation will error out before we hit this, but in case that changes... + if (oldCfs != null) + throw new IllegalStateException("added multiple mappings for cf id " + cfId); } else { - columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables)); + // re-initializing an existing CF. This will happen if you cleared the schema + // on this node and it's getting repopulated from the rest of the cluster. 
+ assert cfs.getColumnFamilyName().equals(cfName); + cfs.metadata.reload(); + cfs.reload(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e3ae67/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index e49a6b3..5880210 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -26,6 +26,7 @@ import java.util.concurrent.*; import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +91,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator); /* map where key is the endpoint and value is the state associated with the endpoint */ - final Map<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>(); + final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>(); /* map where key is endpoint and value is timestamp when this endpoint was removed from * gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time @@ -601,12 +602,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean protected long getExpireTimeForEndpoint(InetAddress endpoint) { /* default expireTime is aVeryLongTime */ - long expireTime = computeExpireTime(); - if (expireTimeEndpointMap.containsKey(endpoint)) - { - expireTime = expireTimeEndpointMap.get(endpoint); - } - return expireTime; + Long storedTime = expireTimeEndpointMap.get(endpoint); + return storedTime == null ? 
computeExpireTime() : storedTime; } public EndpointState getEndpointStateForEndpoint(InetAddress ep) @@ -1031,17 +1028,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean TimeUnit.MILLISECONDS); } - // initialize local HB state if needed. + // initialize local HB state if needed, i.e., if gossiper has never been started before. public void maybeInitializeLocalState(int generationNbr) { - EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress()); - if ( localState == null ) - { - HeartBeatState hbState = new HeartBeatState(generationNbr); - localState = new EndpointState(hbState); - localState.markAlive(); - endpointStateMap.put(FBUtilities.getBroadcastAddress(), localState); - } + HeartBeatState hbState = new HeartBeatState(generationNbr); + EndpointState localState = new EndpointState(hbState); + localState.markAlive(); + endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddress(), localState); } @@ -1093,27 +1086,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled()); } - /** - * This should *only* be used for testing purposes. 
- */ - public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr) { - /* initialize the heartbeat state for this localEndpoint */ - EndpointState localState = endpointStateMap.get(addr); - if ( localState == null ) - { - HeartBeatState hbState = new HeartBeatState(generationNbr); - localState = new EndpointState(hbState); - localState.markAlive(); - endpointStateMap.put(addr, localState); - } + @VisibleForTesting + public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr) + { + HeartBeatState hbState = new HeartBeatState(generationNbr); + EndpointState localState = new EndpointState(hbState); + localState.markAlive(); + endpointStateMap.putIfAbsent(addr, localState); + // always add the version state localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion()); localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid)); } - /** - * This should *only* be used for testing purposes - */ + @VisibleForTesting public void injectApplicationState(InetAddress endpoint, ApplicationState state, VersionedValue value) { EndpointState localState = endpointStateMap.get(endpoint);
