More fixes to the TokenAllocation patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12990
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b6e83fc2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b6e83fc2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b6e83fc2 Branch: refs/heads/trunk Commit: b6e83fc200fa9e4c0e4f26491597188305cddd21 Parents: 67cda76 Author: Dikang Gu <[email protected]> Authored: Sat Dec 3 17:59:01 2016 -0800 Committer: Aleksey Yeschenko <[email protected]> Committed: Sun Feb 5 17:00:23 2017 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dht/tokenallocator/TokenAllocation.java | 30 ++++++++++++ .../tokenallocator/TokenAllocatorFactory.java | 2 +- src/java/org/apache/cassandra/gms/Gossiper.java | 45 ++++++++++++++++++ .../cassandra/service/CassandraDaemon.java | 48 +------------------- 5 files changed, 78 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1851e62..65efebc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ * nodetool stopdaemon errors out (CASSANDRA-13030) * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954) * Fix primary index calculation for SASI (CASSANDRA-12910) + * More fixes to the TokenAllocator (CASSANDRA-12990) * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983) Merged from 3.0: * Fix handling of partition with partition-level deletion plus http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java 
b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java index 36824a1..15d7868 100644 --- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java +++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java @@ -35,12 +35,14 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.locator.TokenMetadata.Topology; +import org.apache.cassandra.utils.FBUtilities; public class TokenAllocation { @@ -51,6 +53,9 @@ public class TokenAllocation final InetAddress endpoint, int numTokens) { + if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress())) + Gossiper.waitToSettle(); + TokenMetadata tokenMetadataCopy = tokenMetadata.cloneOnlyTokenMap(); StrategyAdapter strategy = getStrategy(tokenMetadataCopy, rs, endpoint); Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens); @@ -198,6 +203,31 @@ public class TokenAllocation final String dc = snitch.getDatacenter(endpoint); final int replicas = rs.getReplicationFactor(dc); + if (replicas == 0 || replicas == 1) + { + // No replication, each node is treated as separate. 
+ return new StrategyAdapter() + { + @Override + public int replicas() + { + return 1; + } + + @Override + public Object getGroup(InetAddress unit) + { + return unit; + } + + @Override + public boolean inAllocationRing(InetAddress other) + { + return dc.equals(snitch.getDatacenter(other)); + } + }; + } + Topology topology = tokenMetadata.getTopology(); int racks = topology.getDatacenterRacks().get(dc).asMap().size(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java index d20de8f..58acb56 100644 --- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java +++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java @@ -34,7 +34,7 @@ public class TokenAllocatorFactory ReplicationStrategy<InetAddress> strategy, IPartitioner partitioner) { - if(strategy.replicas() == 1 || strategy.replicas() == 0) + if(strategy.replicas() == 1) { logger.info("Using NoReplicationTokenAllocator."); return new NoReplicationTokenAllocator<>(sortedTokens, strategy, partitioner); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 7f0f85b..ebfd66d 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -1614,4 +1614,49 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return null; } + public static void waitToSettle() + { + int forceAfter = 
Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1); + if (forceAfter == 0) + { + return; + } + final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000; + final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000; + final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3; + + logger.info("Waiting for gossip to settle..."); + Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS); + int totalPolls = 0; + int numOkay = 0; + int epSize = Gossiper.instance.getEndpointStates().size(); + while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) + { + Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); + int currentSize = Gossiper.instance.getEndpointStates().size(); + totalPolls++; + if (currentSize == epSize) + { + logger.debug("Gossip looks settled."); + numOkay++; + } + else + { + logger.info("Gossip not settled after {} polls.", totalPolls); + numOkay = 0; + } + epSize = currentSize; + if (forceAfter > 0 && totalPolls > forceAfter) + { + logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. 
Gossip total polls: {}", + totalPolls); + break; + } + } + if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) + logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED); + else + logger.info("No gossip backlog; proceeding"); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 5a97dfe..851330b 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -42,7 +42,6 @@ import com.codahale.metrics.jvm.MemoryUsageGaugeSet; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -412,7 +411,7 @@ public class CassandraDaemon ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS); if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress())) - waitForGossipToSettle(); + Gossiper.waitToSettle(); // schedule periodic background compaction task submission. 
this is simply a backstop against compactions stalling // due to scheduling errors or race conditions @@ -680,51 +679,6 @@ public class CassandraDaemon } } - private void waitForGossipToSettle() - { - int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1); - if (forceAfter == 0) - { - return; - } - final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000; - final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000; - final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3; - - logger.info("Waiting for gossip to settle before accepting client requests..."); - Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS); - int totalPolls = 0; - int numOkay = 0; - int epSize = Gossiper.instance.getEndpointStates().size(); - while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) - { - Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); - int currentSize = Gossiper.instance.getEndpointStates().size(); - totalPolls++; - if (currentSize == epSize) - { - logger.debug("Gossip looks settled."); - numOkay++; - } - else - { - logger.info("Gossip not settled after {} polls.", totalPolls); - numOkay = 0; - } - epSize = currentSize; - if (forceAfter > 0 && totalPolls > forceAfter) - { - logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}", - totalPolls); - break; - } - } - if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) - logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED); - else - logger.info("No gossip backlog; proceeding"); - } - public static void stop(String[] args) { instance.deactivate();
