Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec9ce3df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec9ce3df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec9ce3df

Branch: refs/heads/trunk
Commit: ec9ce3dfba0030015c5dd846b8b5b526614cf5f7
Parents: 5484bd1 2836a64
Author: Joel Knighton <j...@apache.org>
Authored: Wed Mar 22 13:20:24 2017 -0500
Committer: Joel Knighton <j...@apache.org>
Committed: Wed Mar 22 13:22:43 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../gms/GossipDigestAckVerbHandler.java         |  27 +++--
 src/java/org/apache/cassandra/gms/Gossiper.java |  65 +++++++----
 .../apache/cassandra/service/MigrationTask.java |  12 +-
 .../cassandra/service/StorageService.java       |  17 ++-
 test/conf/cassandra-seeds.yaml                  |  43 +++++++
 .../apache/cassandra/gms/ShadowRoundTest.java   | 116 +++++++++++++++++++
 .../apache/cassandra/net/MatcherResponse.java   |  24 ++--
 8 files changed, 252 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ce8535d,9140c73..8386c20
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -37,143 -49,6 +37,144 @@@ Merged from 3.0
     live rows in sstabledump (CASSANDRA-13177)
   * Provide user workaround when system_schema.columns does not contain entries
     for a table that's in system_schema.tables (CASSANDRA-13180)
 +Merged from 2.2:
++ * Discard in-flight shadow round responses (CASSANDRA-12653)
 + * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
 + * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
 + * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
 + * Fix queries updating multiple time the same list (CASSANDRA-13130)
 + * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 + * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
 + * Fix failing COPY TO STDOUT (CASSANDRA-12497)
 + * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
 + * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
 + * Fix negative mean latency metric (CASSANDRA-12876)
 + * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
 +Merged from 2.1:
 + * Remove unused repositories (CASSANDRA-13278)
 + * Log stacktrace of uncaught exceptions (CASSANDRA-13108)
 + * Use portable stderr for java error in startup (CASSANDRA-13211)
 + * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
 + * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
 +
 +
 +3.10
 + * Fix secondary index queries regression (CASSANDRA-13013)
 + * Add duration type to the protocol V5 (CASSANDRA-12850)
 + * Fix duration type validation (CASSANDRA-13143)
 + * Fix flaky GcCompactionTest (CASSANDRA-12664)
 + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
 + * Fixed query monitoring for range queries (CASSANDRA-13050)
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
   * Dump threads when unit tests time out (CASSANDRA-13117)
   * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
   * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 15662b1,59060f8..d6d9dfb
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@@ -54,16 -54,8 +54,10 @@@ public class GossipDigestAckVerbHandle
          if (Gossiper.instance.isInShadowRound())
          {
              if (logger.isDebugEnabled())
 -                logger.debug("Finishing shadow round with {}", from);
 -            Gossiper.instance.finishShadowRound(epStateMap);
 +                logger.debug("Received an ack from {}, which may trigger exit from shadow round", from);
++
 +            // if the ack is completely empty, then we can infer that the respondent is also in a shadow round
-             Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty());
++            Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
              return; // don't bother doing anything else, we have what we came for
          }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index ebfd66d,802ff9c..177d7dc
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -30,9 -30,9 +30,10 @@@ import javax.management.ObjectName
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.ImmutableMap;
  import com.google.common.util.concurrent.Uninterruptibles;
  
 +import org.apache.cassandra.utils.CassandraVersion;
  import org.apache.cassandra.utils.Pair;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -112,7 -114,7 +116,8 @@@ public class Gossiper implements IFailu
      private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>();
  
      /* initial seeds for joining the cluster */
--    private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
++    @VisibleForTesting
++    final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
  
      /* map where key is the endpoint and value is the state associated with the endpoint */
      final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();
@@@ -126,7 -128,8 +131,10 @@@
      private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
  
      private volatile boolean inShadowRound = false;
++    // seeds gathered during shadow round that indicated to be in the shadow round phase as well
 +    private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
+     // endpoint states as gathered during shadow round
+     private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
  
      private volatile long lastProcessedMessageAt = System.currentTimeMillis();
  
@@@ -715,22 -720,16 +725,24 @@@
      }
  
      /**
 -     * Check if this endpoint can safely bootstrap into the cluster.
 +     * Check if this node can safely be started and join the ring.
 +     * If the node is bootstrapping, examines gossip state for any previous status to decide whether
 +     * it's safe to allow this node to start and bootstrap. If not bootstrapping, compares the host ID
 +     * that the node itself has (obtained by reading from system.local or generated if not present)
 +     * with the host ID obtained from gossip for the endpoint address (if any). This latter case
 +     * prevents a non-bootstrapping, new node from being started with the same address of a
 +     * previously started, but currently down predecessor.
       *
       * @param endpoint - the endpoint to check
 +     * @param localHostUUID - the host id to check
 +     * @param isBootstrapping - whether the node intends to bootstrap when joining
+      * @param epStates - endpoint states in the cluster
 -     * @return true if the endpoint can join the cluster
 +     * @return true if it is safe to start the node, false otherwise
       */
-     public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping)
 -    public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
++    public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping,
++                                    Map<InetAddress, EndpointState> epStates)
      {
-         EndpointState epState = endpointStateMap.get(endpoint);
+         EndpointState epState = epStates.get(endpoint);
 -
          // if there's no previous state, or the node was previously removed from the cluster, we're good
          if (epState == null || isDeadState(epState))
              return true;
@@@ -1343,20 -1327,27 +1352,32 @@@
      }
  
      /**
-      *  Do a single 'shadow' round of gossip, where we do not modify any state
-      *  Used when preparing to join the ring:
-      *      * when replacing a node, to get and assume its tokens
-      *      * when joining, to check that the local host id matches any 
previous id for the endpoint address
+      * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+      * map return value, instead of endpointStateMap.
+      *
 -     * Used when preparing to join the ring:
+      * <ul>
+      *     <li>when replacing a node, to get and assume its tokens</li>
+      *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+      * </ul>
+      *
+      * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
 -     * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
++     * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, boolean, Map)}. This will update
+      * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+      * caller of {@link Gossiper#doShadowRound()}. Therefor only a single shadow round execution is permitted at
+      * the same time.
+      *
+      * @return endpoint states gathered during shadow round or empty map
       */
-     public void doShadowRound()
+     public synchronized Map<InetAddress, EndpointState> doShadowRound()
      {
          buildSeedsList();
 +        // it may be that the local address is the only entry in the seed
 +        // list in which case, attempting a shadow round is pointless
 +        if (seeds.isEmpty())
-             return;
++            return endpointShadowStateMap;
 +
 +        seedsInShadowRound.clear();
+         endpointShadowStateMap.clear();
          // send a completely empty syn
          List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
          GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@@ -1401,9 -1383,11 +1422,12 @@@
          {
              throw new RuntimeException(wtf);
          }
+ 
+         return ImmutableMap.copyOf(endpointShadowStateMap);
      }
  
--    private void buildSeedsList()
++    @VisibleForTesting
++    void buildSeedsList()
      {
          for (InetAddress seed : DatabaseDescriptor.getSeeds())
          {
@@@ -1521,32 -1505,12 +1545,33 @@@
          return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
      }
  
-     protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound)
 -    protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
++    protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
      {
          if (inShadowRound)
          {
 -            endpointShadowStateMap.putAll(epStateMap);
 -            inShadowRound = false;
 +            if (!isInShadowRound)
 +            {
 +                logger.debug("Received a regular ack from {}, can now exit shadow round", respondent);
 +                // respondent sent back a full ack, so we can exit our shadow round
++                endpointShadowStateMap.putAll(epStateMap);
 +                inShadowRound = false;
 +                seedsInShadowRound.clear();
 +            }
 +            else
 +            {
 +                // respondent indicates it too is in a shadow round, if all seeds
 +                // are in this state then we can exit our shadow round. Otherwise,
 +                // we keep retrying the SR until one responds with a full ACK or
 +                // we learn that all seeds are in SR.
 +                logger.debug("Received an ack from {} indicating it is also in shadow round", respondent);
 +                seedsInShadowRound.add(respondent);
 +                if (seedsInShadowRound.containsAll(seeds))
 +                {
 +                    logger.debug("All seeds are in a shadow round, clearing this node to exit its own");
 +                    inShadowRound = false;
 +                    seedsInShadowRound.clear();
 +                }
 +            }
          }
      }
  

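The exit rule introduced in `maybeFinishShadowRound` can be restated as follows. A non-empty ACK ends the round at once, and its endpoint states go into the separate shadow map. An empty ACK only records that the responding seed is itself in a shadow round, and the round ends once every seed has answered that way. The sketch below models that rule in Java. It is a simplified model, not Cassandra's code: `ShadowRoundModel` and its methods are hypothetical, plain `String` addresses and opaque `String` states stand in for `InetAddress`/`EndpointState`, and "empty ACK" is reduced to "empty state map" rather than `gDigestList.isEmpty() && epStateMap.isEmpty()`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of the shadow-round exit rule; not Cassandra's implementation.
 * Addresses are plain strings and endpoint state is an opaque string.
 */
final class ShadowRoundModel
{
    private final Set<String> seeds;
    private final Set<String> seedsInShadowRound = new HashSet<>();
    private final Map<String, String> shadowStates = new HashMap<>();
    private boolean inShadowRound = true;

    ShadowRoundModel(Set<String> seeds)
    {
        this.seeds = new HashSet<>(seeds);
    }

    /** Handles one ACK; an empty state map means the respondent is itself in a shadow round. */
    synchronized void onAck(String respondent, Map<String, String> states)
    {
        if (!inShadowRound)
            return; // in this model, responses arriving after the round has ended are dropped

        if (!states.isEmpty())
        {
            // full ACK: keep its states in the shadow map and finish the round
            shadowStates.putAll(states);
            inShadowRound = false;
            seedsInShadowRound.clear();
        }
        else
        {
            // empty ACK: remember this seed; finish only once every seed has answered this way
            seedsInShadowRound.add(respondent);
            if (seedsInShadowRound.containsAll(seeds))
            {
                inShadowRound = false;
                seedsInShadowRound.clear();
            }
        }
    }

    synchronized boolean isInShadowRound()
    {
        return inShadowRound;
    }

    /** Read-only snapshot, analogous to the ImmutableMap copy returned by doShadowRound(). */
    synchronized Map<String, String> result()
    {
        return Collections.unmodifiableMap(new HashMap<>(shadowStates));
    }

    public static void main(String[] args)
    {
        Set<String> seeds = new HashSet<>(Arrays.asList("127.0.0.10", "127.0.1.10", "127.0.2.10"));

        // One seed is already up and answers with a full ACK; a later ACK is ignored
        ShadowRoundModel a = new ShadowRoundModel(seeds);
        a.onAck("127.0.0.10", Collections.singletonMap("127.0.0.10", "NORMAL"));
        a.onAck("127.0.1.10", Collections.singletonMap("127.0.1.10", "late"));
        System.out.println(a.isInShadowRound() + " " + a.result()); // prints "false {127.0.0.10=NORMAL}"

        // Every seed is itself in a shadow round, so nobody has state to hand over
        ShadowRoundModel b = new ShadowRoundModel(seeds);
        for (String seed : seeds)
            b.onAck(seed, Collections.<String, String>emptyMap());
        System.out.println(b.isInShadowRound() + " " + b.result()); // prints "false {}"
    }
}
```

Gathering results into a dedicated map and handing back a copy, rather than writing into the live `endpointStateMap`, gives callers such as `prepareForReplacement` and `checkForEndpointCollision` a fixed snapshot. In this model, a straggling response after the round has finished cannot change that snapshot.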
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index b64cf13,6760040..3c0bc1a
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -492,76 -474,52 +492,75 @@@ public class StorageService extends Not
          daemon.deactivate();
      }
  
 -    public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
 +    private synchronized UUID prepareForReplacement() throws ConfigurationException
      {
 -        logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
 -        if (!MessagingService.instance().isListening())
 -            MessagingService.instance().listen();
 +        if (SystemKeyspace.bootstrapComplete())
 +            throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
 +
 +        if (!joinRing)
 +            throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
  
 -        // make magic happen
 +        if (!DatabaseDescriptor.isAutoBootstrap() && !Boolean.getBoolean("cassandra.allow_unsafe_replace"))
 +            throw new RuntimeException("Replacing a node without bootstrapping risks invalidating consistency " +
 +                                       "guarantees as the expected data may not be present until repair is run. " +
 +                                       "To perform this operation, please restart with " +
 +                                       "-Dcassandra.allow_unsafe_replace=true");
 +
 +        InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
 +        logger.info("Gathering node replacement information for {}", replaceAddress);
-         Gossiper.instance.doShadowRound();
+         Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 -        // now that we've gossiped at least once, we should be able to find the node we're replacing
 -        if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
 -            throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
 -        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
 +        // as we've completed the shadow round of gossip, we should be able to find the node we're replacing
-         if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress) == null)
++        if (epStates.get(replaceAddress) == null)
 +            throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress));
 +
          try
          {
-             VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(replaceAddress).getApplicationState(ApplicationState.TOKENS);
 -            VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
++            VersionedValue tokensVersionedValue = epStates.get(replaceAddress).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
 -                throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +                throw new RuntimeException(String.format("Could not find tokens for %s to replace", replaceAddress));
  
 -            if (isReplacingSameAddress())
 -            {
 -                SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
 -            }
 -            return tokens;
 +            bootstrapTokens = 
TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new 
ByteArrayInputStream(tokensVersionedValue.toBytes())));
          }
          catch (IOException e)
          {
              throw new RuntimeException(e);
          }
 +
 +        UUID localHostId = SystemKeyspace.getLocalHostId();
 +
 +        if (isReplacingSameAddress())
 +        {
-             localHostId = Gossiper.instance.getHostId(replaceAddress);
++            localHostId = Gossiper.instance.getHostId(replaceAddress, epStates);
 +            SystemKeyspace.setLocalHostId(localHostId); // use the replacee's host Id as our own so we receive hints, etc
 +        }
 +
-         Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
 +        return localHostId;
      }
  
 -    public synchronized void checkForEndpointCollision() throws ConfigurationException
 +    private synchronized void checkForEndpointCollision(UUID localHostId) throws ConfigurationException
      {
 +        if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
 +        {
 +            logger.warn("Skipping endpoint collision check as cassandra.allow_unsafe_join=true");
 +            return;
 +        }
 +
          logger.debug("Starting shadow gossip round to check for endpoint collision");
-         Gossiper.instance.doShadowRound();
 -        if (!MessagingService.instance().isListening())
 -            MessagingService.instance().listen();
+         Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 -        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
 +        // If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so.
 +        // If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local
 +        // one, which was either read from system.local or generated at startup. If a learned id is present &
 +        // doesn't match the local, then the node needs replacing
-         if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap()))
++        if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap(), epStates))
          {
              throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
                                                       "Use cassandra.replace_address if you want to replace this node.",
                                                       FBUtilities.getBroadcastAddress()));
          }
 -        if (useStrictConsistency && !allowSimultaneousMoves())
 +
 +        if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves())
          {
-             for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+             for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
              {
                  // ignore local node or empty status
                  if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)

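`checkForEndpointCollision` now passes the shadow-round map into `isSafeForStartup`. The javadoc and comments above describe two branches. A bootstrapping node is checked against any previously known status for its address. A non-bootstrapping node compares its local host ID with the one learned from gossip, and only a learned ID that differs blocks startup. The sketch below expresses that decision as a pure function. Treat it as a model under assumptions, not the real method: `StartupSafetyModel` and `PeerState` are hypothetical stand-ins for `Gossiper`/`EndpointState`, and the status strings in the two sets are assumed values that do not appear in this diff.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical, simplified model of the startup safety decision; not Cassandra's code. */
final class StartupSafetyModel
{
    /** Stand-in for the parts of an endpoint's gossip state this check reads. */
    static final class PeerState
    {
        final String status; // gossiped status; "" if none was published
        final UUID hostId;   // gossiped host id, or null if absent

        PeerState(String status, UUID hostId)
        {
            this.status = status;
            this.hostId = hostId;
        }
    }

    // Assumed: statuses meaning the previous node was removed or has left, so the address is free
    private static final Set<String> DEAD_STATES =
            new HashSet<>(Arrays.asList("removing", "removed", "LEFT", "hibernate"));
    // Assumed: statuses meaning a previous owner of the address may still be part of the ring
    private static final Set<String> UNSAFE_FOR_BOOTSTRAP =
            new HashSet<>(Arrays.asList("", "NORMAL", "shutdown"));

    /** epStates plays the role of the map returned by the shadow round. */
    static boolean isSafeForStartup(String endpoint, UUID localHostId, boolean isBootstrapping,
                                    Map<String, PeerState> epStates)
    {
        PeerState state = epStates.get(endpoint);
        if (state == null || DEAD_STATES.contains(state.status))
            return true; // nothing known about this address, or its previous owner is gone

        if (isBootstrapping)
            return !UNSAFE_FOR_BOOTSTRAP.contains(state.status);

        // Not bootstrapping: a learned host id that differs from ours means the address belongs
        // to another, currently down node, which must be replaced rather than impersonated
        return state.hostId == null || state.hostId.equals(localHostId);
    }

    public static void main(String[] args)
    {
        UUID mine = UUID.randomUUID();
        Map<String, PeerState> epStates = new HashMap<>();
        epStates.put("127.0.0.1", new PeerState("NORMAL", UUID.randomUUID()));

        System.out.println(isSafeForStartup("127.0.0.1", mine, false, epStates)); // host id mismatch
        System.out.println(isSafeForStartup("127.0.0.2", mine, true, epStates));  // unknown endpoint
    }
}
```

Because the function reads only its `epStates` argument, it behaves the same however much live gossip state has accumulated. That is the property the commit gains by threading the shadow-round result through `StorageService` instead of consulting `Gossiper.instance` directly.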
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/conf/cassandra-seeds.yaml
----------------------------------------------------------------------
diff --cc test/conf/cassandra-seeds.yaml
index 0000000,0000000..02d25d2
new file mode 100644
--- /dev/null
+++ b/test/conf/cassandra-seeds.yaml
@@@ -1,0 -1,0 +1,43 @@@
++#
++# Warning!
++# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
++#
++cluster_name: Test Cluster
++# memtable_allocation_type: heap_buffers
++memtable_allocation_type: offheap_objects
++commitlog_sync: batch
++commitlog_sync_batch_window_in_ms: 1.0
++commitlog_segment_size_in_mb: 5
++commitlog_directory: build/test/cassandra/commitlog
++cdc_raw_directory: build/test/cassandra/cdc_raw
++cdc_enabled: false
++hints_directory: build/test/cassandra/hints
++partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
++listen_address: 127.0.0.1
++storage_port: 7010
++start_native_transport: true
++native_transport_port: 9042
++column_index_size_in_kb: 4
++saved_caches_directory: build/test/cassandra/saved_caches
++data_file_directories:
++    - build/test/cassandra/data
++disk_access_mode: mmap
++seed_provider:
++    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
++      parameters:
++          - seeds: "127.0.0.10,127.0.1.10,127.0.2.10"
++endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
++dynamic_snitch: true
++server_encryption_options:
++    internode_encryption: none
++    keystore: conf/.keystore
++    keystore_password: cassandra
++    truststore: conf/.truststore
++    truststore_password: cassandra
++incremental_backups: true
++concurrent_compactors: 4
++compaction_throughput_mb_per_sec: 0
++row_cache_class_name: org.apache.cassandra.cache.OHCProvider
++row_cache_size_in_mb: 16
++enable_user_defined_functions: true
++enable_scripted_user_defined_functions: true

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
index 0000000,0000000..f8cc49c
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
@@@ -1,0 -1,0 +1,116 @@@
++/*
++* Licensed to the Apache Software Foundation (ASF) under one
++* or more contributor license agreements.  See the NOTICE file
++* distributed with this work for additional information
++* regarding copyright ownership.  The ASF licenses this file
++* to you under the Apache License, Version 2.0 (the
++* "License"); you may not use this file except in compliance
++* with the License.  You may obtain a copy of the License at
++*
++*    http://www.apache.org/licenses/LICENSE-2.0
++*
++* Unless required by applicable law or agreed to in writing,
++* software distributed under the License is distributed on an
++* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++* KIND, either express or implied.  See the License for the
++* specific language governing permissions and limitations
++* under the License.
++*/
++
++package org.apache.cassandra.gms;
++
++import java.util.Collections;
++import java.util.concurrent.atomic.AtomicBoolean;
++
++import org.junit.After;
++import org.junit.BeforeClass;
++import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.locator.IEndpointSnitch;
++import org.apache.cassandra.locator.PropertyFileSnitch;
++import org.apache.cassandra.net.MessageIn;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.MockMessagingService;
++import org.apache.cassandra.net.MockMessagingSpy;
++import org.apache.cassandra.service.StorageService;
++
++import static org.apache.cassandra.net.MockMessagingService.verb;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++public class ShadowRoundTest
++{
++    private static final Logger logger = LoggerFactory.getLogger(ShadowRoundTest.class);
++
++    @BeforeClass
++    public static void setUp() throws ConfigurationException
++    {
++        System.setProperty("cassandra.config", "cassandra-seeds.yaml");
++
++        DatabaseDescriptor.daemonInitialization();
++        IEndpointSnitch snitch = new PropertyFileSnitch();
++        DatabaseDescriptor.setEndpointSnitch(snitch);
++        Keyspace.setInitialized();
++    }
++
++    @After
++    public void cleanup()
++    {
++        MockMessagingService.cleanup();
++    }
++
++    @Test
++    public void testDelayedResponse()
++    {
++        Gossiper.instance.buildSeedsList();
++        int noOfSeeds = Gossiper.instance.seeds.size();
++
++        final AtomicBoolean ackSend = new AtomicBoolean(false);
++        MockMessagingSpy spySyn = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
++                .respondN((msgOut, to) ->
++                {
++                    // ACK once to finish shadow round, then busy-spin until gossiper has been enabled
++                    // and then reply with remaining ACKs from other seeds
++                    if (!ackSend.compareAndSet(false, true))
++                    {
++                        while (!Gossiper.instance.isEnabled()) ;
++                    }
++
++                    HeartBeatState hb = new HeartBeatState(123, 456);
++                    EndpointState state = new EndpointState(hb);
++                    GossipDigestAck payload = new GossipDigestAck(
++                            Collections.singletonList(new GossipDigest(to, hb.getGeneration(), hb.getHeartBeatVersion())),
++                            Collections.singletonMap(to, state));
++
++                    logger.debug("Simulating digest ACK reply");
++                    return MessageIn.create(to, payload, Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.current_version);
++                }, noOfSeeds);
++
++        // GossipDigestAckVerbHandler will send ack2 for each ack received (after the shadow round)
++        MockMessagingSpy spyAck2 = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_ACK2)).dontReply();
++
++        // Migration request messages should not be emitted during shadow round
++        MockMessagingSpy spyMigrationReq = MockMessagingService.when(verb(MessagingService.Verb.MIGRATION_REQUEST)).dontReply();
++
++        try
++        {
++            StorageService.instance.initServer();
++        }
++        catch (Exception e)
++        {
++            assertEquals("Unable to contact any seeds!", e.getMessage());
++        }
++
++        // we expect one SYN for each seed during shadow round + additional SYNs after gossiper has been enabled
++        assertTrue(spySyn.messagesIntercepted > noOfSeeds);
++
++        // we don't expect to emit any GOSSIP_DIGEST_ACK2 or MIGRATION_REQUEST messages
++        assertEquals(0, spyAck2.messagesIntercepted);
++        assertEquals(0, spyMigrationReq.messagesIntercepted);
++    }
++}
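
The test above answers the first shadow-round SYN immediately and holds back the remaining ACKs until gossip is enabled, so those replies arrive as leftovers from the shadow round. A minimal sketch of one way a handler could classify such in-flight ACKs, assuming a shadow-round flag plus a timestamp recorded when the first regular SYN is sent. `ShadowRoundAckFilter`, `Decision` and the method names are hypothetical and do not reproduce the `Gossiper` or `GossipDigestAckVerbHandler` API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

final class ShadowRoundAckFilter
{
    enum Decision { FINISH_SHADOW_ROUND, DISCARD, PROCESS }

    private final AtomicBoolean inShadowRound = new AtomicBoolean(false);
    // 0 means no regular (post-shadow-round) SYN has been sent yet.
    private final AtomicLong firstSynSentAt = new AtomicLong(0);

    void startShadowRound()
    {
        inShadowRound.set(true);
    }

    void regularSynSent(long nanoTime)
    {
        // Only the first regular SYN matters; later calls keep the original timestamp.
        firstSynSentAt.compareAndSet(0, nanoTime);
    }

    Decision onAck(long receivedAtNanos)
    {
        // The first ACK received while in the shadow round ends it.
        if (inShadowRound.compareAndSet(true, false))
            return Decision.FINISH_SHADOW_ROUND;

        long syn = firstSynSentAt.get();
        // An ACK handled before any regular SYN went out cannot answer regular
        // gossip, so it is treated as a late shadow-round reply and dropped.
        if (syn == 0 || receivedAtNanos - syn < 0)
            return Decision.DISCARD;
        return Decision.PROCESS;
    }

    public static void main(String[] args)
    {
        ShadowRoundAckFilter filter = new ShadowRoundAckFilter();
        filter.startShadowRound();
        System.out.println(filter.onAck(10)); // FINISH_SHADOW_ROUND
        System.out.println(filter.onAck(20)); // DISCARD (no regular SYN yet)
        filter.regularSynSent(30);
        System.out.println(filter.onAck(40)); // PROCESS
    }
}
```

Comparing with `receivedAtNanos - syn < 0` rather than `receivedAtNanos < syn` follows the usual advice for `System.nanoTime()` values, which may overflow.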

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/net/MatcherResponse.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/MatcherResponse.java
index 21a75c9,0000000..6cd8085
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@@ -1,208 -1,0 +1,214 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.net.InetAddress;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Queue;
 +import java.util.Set;
 +import java.util.concurrent.BlockingQueue;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.BiFunction;
 +import java.util.function.Function;
 +
 +/**
 + * Sends a response for an incoming message with a matching {@link Matcher}.
 + * The actual behavior by any instance of this class can be inspected by
 + * interacting with the returned {@link MockMessagingSpy}.
 + */
 +public class MatcherResponse
 +{
 +    private final Matcher<?> matcher;
 +    private final Set<Integer> sendResponses = new HashSet<>();
 +    private final MockMessagingSpy spy = new MockMessagingSpy();
 +    private final AtomicInteger limitCounter = new AtomicInteger(Integer.MAX_VALUE);
 +    private IMessageSink sink;
 +
 +    MatcherResponse(Matcher<?> matcher)
 +    {
 +        this.matcher = matcher;
 +    }
 +
 +    /**
 +     * Do not create any responses for intercepted outbound messages.
 +     */
 +    public MockMessagingSpy dontReply()
 +    {
 +        return respond((MessageIn<?>)null);
 +    }
 +
 +    /**
 +     * Respond with provided message in reply to each intercepted outbound message.
 +     * @param message   the message to use as mock reply from the cluster
 +     */
 +    public MockMessagingSpy respond(MessageIn<?> message)
 +    {
 +        return respondN(message, Integer.MAX_VALUE);
 +    }
 +
 +    /**
 +     * Respond a limited number of times with the provided message in reply to each intercepted outbound message.
 +     * @param response  the message to use as mock reply from the cluster
 +     * @param limit     number of times to respond with message
 +     */
 +    public MockMessagingSpy respondN(final MessageIn<?> response, int limit)
 +    {
 +        return respondN((in, to) -> response, limit);
 +    }
 +
 +    /**
 +     * Respond with the message created by the provided function that will be called with each intercepted outbound message.
 +     * @param fnResponse    function to call for creating reply based on intercepted message and target address
 +     */
 +    public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse)
 +    {
 +        return respondN(fnResponse, Integer.MAX_VALUE);
 +    }
 +
 +    /**
 +     * Respond with message wrapping the payload object created by provided function called for each intercepted outbound message.
 +     * The target address from the intercepted message will automatically be used as the created message's sender address.
 +     * @param fnResponse    function to call for creating payload object based on intercepted message and target address
 +     * @param verb          verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb)
 +    {
 +        return respondNWithPayloadForEachReceiver(fnResponse, verb, Integer.MAX_VALUE);
 +    }
 +
 +    /**
 +     * Respond a limited number of times with message wrapping the payload object created by provided function called for
 +     * each intercepted outbound message. The target address from the intercepted message will automatically be used as the
 +     * created message's sender address.
 +     * @param fnResponse    function to call for creating payload object based on intercepted message and target address
 +     * @param verb          verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit)
 +    {
 +        return respondN((MessageOut<T> msg, InetAddress to) -> {
 +                    S payload = fnResponse.apply(msg);
 +                    if (payload == null)
 +                        return null;
 +                    else
 +                        return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version);
 +                },
 +                limit);
 +    }
 +
 +    /**
 +     * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
 +     * from the provided queue. No reply will be sent when the queue has been exhausted.
 +     * @param cannedResponses   prepared payload messages to use for responses
 +     * @param verb              verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Queue<S> cannedResponses, MessagingService.Verb verb)
 +    {
 +        return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> cannedResponses.poll(), verb);
 +    }
 +
 +    /**
 +     * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
 +     * from the provided queue. This method will block until queue elements are available.
 +     * @param cannedResponses   prepared payload messages to use for responses
 +     * @param verb              verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(BlockingQueue<S> cannedResponses, MessagingService.Verb verb)
 +    {
 +        return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> {
 +            try
 +            {
 +                return cannedResponses.take();
 +            }
 +            catch (InterruptedException e)
 +            {
 +                return null;
 +            }
 +        }, verb);
 +    }
 +
 +    /**
 +     * Respond a limited number of times with the message created by the provided function that will be called with
 +     * each intercepted outbound message.
 +     * @param fnResponse    function to call for creating reply based on intercepted message and target address
 +     */
 +    public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit)
 +    {
 +        limitCounter.set(limit);
 +
 +        assert sink == null: "destroy() must be called first to register new response";
 +
 +        sink = new IMessageSink()
 +        {
 +            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
 +            {
 +                // prevent outgoing message from being sent in case matcher indicates a match
 +                // and instead send the mocked response
 +                if (matcher.matches(message, to))
 +                {
 +                    spy.matchingMessage(message);
 +
 +                    if (limitCounter.decrementAndGet() < 0)
 +                        return false;
 +
 +                    synchronized (sendResponses)
 +                    {
 +                        // I'm not sure about retry semantics regarding message/ID relationships, but I assume
 +                        // sending a message multiple times using the same ID shouldn't happen.
 +                        assert !sendResponses.contains(id) : "ID re-use for outgoing message";
 +                        sendResponses.add(id);
 +                    }
-                     MessageIn<?> response = fnResponse.apply(message, to);
-                     if (response != null)
++
++                    // create response asynchronously to match request/response communication execution behavior
++                    new Thread(() ->
 +                    {
-                         CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
-                         if (cb != null)
-                             cb.callback.response(response);
-                         else
-                             MessagingService.instance().receive(response, id);
-                         spy.matchingResponse(response);
-                     }
++                        MessageIn<?> response = fnResponse.apply(message, to);
++                        if (response != null)
++                        {
++                            CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
++                            if (cb != null)
++                                cb.callback.response(response);
++                            else
++                                MessagingService.instance().receive(response, id);
++                            spy.matchingResponse(response);
++                        }
++                    }).start();
++
 +                    return false;
 +                }
 +                return true;
 +            }
 +
 +            public boolean allowIncomingMessage(MessageIn message, int id)
 +            {
 +                return true;
 +            }
 +        };
 +        MessagingService.instance().addMessageSink(sink);
 +
 +        return spy;
 +    }
 +
 +    /**
 +     * Stops currently registered response from being sent.
 +     */
 +    public void destroy()
 +    {
 +        MessagingService.instance().removeMessageSink(sink);
 +    }
 +}
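
The change to `respondN` builds and delivers the mocked reply on a separate thread instead of inside `allowOutgoingMessage`, so the sender's call returns before the response is handled, as it would in a real request/response exchange. A stripped-down sketch of the same interception pattern follows; `MockResponder` is hypothetical, and a `BiPredicate` matcher and a `Consumer` callback stand in for Cassandra's `Matcher`, `IMessageSink` and registered `CallbackInfo`.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

final class MockResponder<Req, Resp>
{
    private final BiPredicate<Req, String> matcher;
    private final BiFunction<Req, String, Resp> fnResponse;
    private final Consumer<Resp> callback;
    private final AtomicInteger remaining;
    final AtomicInteger intercepted = new AtomicInteger();

    MockResponder(BiPredicate<Req, String> matcher, BiFunction<Req, String, Resp> fnResponse,
                  Consumer<Resp> callback, int limit)
    {
        this.matcher = matcher;
        this.fnResponse = fnResponse;
        this.callback = callback;
        this.remaining = new AtomicInteger(limit);
    }

    /** Returns true if the message may really be sent, false if it was intercepted. */
    boolean allowOutgoing(Req message, String to)
    {
        if (!matcher.test(message, to))
            return true;
        intercepted.incrementAndGet();
        // Past the limit: swallow the message without replying.
        if (remaining.decrementAndGet() < 0)
            return false;
        // Create and deliver the reply asynchronously so the sender is not blocked.
        new Thread(() -> {
            Resp response = fnResponse.apply(message, to);
            if (response != null)
                callback.accept(response);
        }).start();
        return false;
    }

    public static void main(String[] args) throws InterruptedException
    {
        ConcurrentLinkedQueue<String> replies = new ConcurrentLinkedQueue<>();
        CountDownLatch latch = new CountDownLatch(2);
        MockResponder<String, String> responder = new MockResponder<>(
                (msg, to) -> msg.equals("SYN"),
                (msg, to) -> "ACK from " + to,
                reply -> { replies.add(reply); latch.countDown(); },
                2);
        for (String seed : new String[]{ "seed1", "seed2", "seed3" })
            responder.allowOutgoing("SYN", seed);
        latch.await(5, TimeUnit.SECONDS);
        System.out.println(replies.size() + " replies, " + responder.intercepted.get() + " intercepted"); // 2 replies, 3 intercepted
    }
}
```

As in the diff, the limit is checked before a thread is started, so intercepted messages beyond the limit are dropped silently rather than answered.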
