Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2836a644
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2836a644
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2836a644

Branch: refs/heads/trunk
Commit: 2836a644a357c0992ba89622f04668422ce2761a
Parents: f4ba908 bf0906b
Author: Joel Knighton <j...@apache.org>
Authored: Wed Mar 22 13:13:44 2017 -0500
Committer: Joel Knighton <j...@apache.org>
Committed: Wed Mar 22 13:18:59 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../gms/GossipDigestAckVerbHandler.java         | 26 ++++++---
 src/java/org/apache/cassandra/gms/Gossiper.java | 56 ++++++++++++++------
 .../apache/cassandra/service/MigrationTask.java | 12 ++---
 .../cassandra/service/StorageService.java       | 17 +++---
 5 files changed, 73 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6021315,df2421d..9140c73
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,9 +1,28 @@@
 -2.2.10
 +3.0.13
 + * Fix CONTAINS filtering for null collections (CASSANDRA-13246)
 + * Applying: Use a unique metric reservoir per test run when using 
Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
 + * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
 + * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
 + * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
 +Merged from 2.2:
+  * Discard in-flight shadow round responses (CASSANDRA-12653)
   * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
   * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
 + * Commitlog replay may fail if last mutation is within 4 bytes of end of 
segment (CASSANDRA-13282)
   * Fix queries updating multiple time the same list (CASSANDRA-13130)
   * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 +
 +
 +3.0.12
 + * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to 
LogRecord absolute path (CASSANDRA-13294)
 + * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
 + * Cqlsh copy-from should error out when csv contains invalid data for 
collections (CASSANDRA-13071)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones 
(CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2:
   * Avoid race on receiver by starting streaming sender thread after sending 
init message (CASSANDRA-12886)
   * Fix "multiple versions of ant detected..." when running ant test 
(CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index cbfa750,c2eccba..802ff9c
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -124,6 -128,9 +128,8 @@@ public class Gossiper implements IFailu
      private final Map<InetAddress, Long> expireTimeEndpointMap = new 
ConcurrentHashMap<InetAddress, Long>();
  
      private volatile boolean inShadowRound = false;
 -
+     // endpoint states as gathered during shadow round
+     private final Map<InetAddress, EndpointState> endpointShadowStateMap = 
new ConcurrentHashMap<>();
  
      private volatile long lastProcessedMessageAt = System.currentTimeMillis();
  
@@@ -818,28 -826,6 +827,20 @@@
          return endpointStateMap.get(ep);
      }
  
 +    public boolean valuesEqual(InetAddress ep1, InetAddress ep2, 
ApplicationState as)
 +    {
 +        EndpointState state1 = getEndpointStateForEndpoint(ep1);
 +        EndpointState state2 = getEndpointStateForEndpoint(ep2);
 +
 +        if (state1 == null || state2 == null)
 +            return false;
 +
 +        VersionedValue value1 = state1.getApplicationState(as);
 +        VersionedValue value2 = state2.getApplicationState(as);
 +
 +        return !(value1 == null || value2 == null) && 
value1.value.equals(value2.value);
 +    }
 +
-     // removes ALL endpoint states; should only be called after shadow gossip
-     public void resetEndpointStateMap()
-     {
-         endpointStateMap.clear();
-         unreachableEndpoints.clear();
-         liveEndpoints.clear();
-     }
- 
      public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
      {
          return endpointStateMap.entrySet();
@@@ -1321,12 -1312,32 +1327,27 @@@
      }
  
      /**
-      *  Do a single 'shadow' round of gossip, where we do not modify any state
-      *  Only used when replacing a node, to get and assume its states
+      * Do a single 'shadow' round of gossip by retrieving endpoint states 
that will be stored exclusively in the
+      * map return value, instead of endpointStateMap.
+      *
+      * Used when preparing to join the ring:
+      * <ul>
+      *     <li>when replacing a node, to get and assume its tokens</li>
+      *     <li>when joining, to check that the local host id matches any 
previous id for the endpoint address</li>
+      * </ul>
+      *
+      * Method is synchronized, as we use an in-progress flag to indicate that 
shadow round must be cleared
+      * again by calling {@link Gossiper#finishShadowRound(Map)}. This will 
update
+      * {@link Gossiper#endpointShadowStateMap} with received values, in order 
to return an immutable copy to the
+      * caller of {@link Gossiper#doShadowRound()}. Therefore only a single 
shadow round execution is permitted at
+      * the same time.
+      *
+      * @return endpoint states gathered during shadow round or empty map
       */
-     public void doShadowRound()
+     public synchronized Map<InetAddress, EndpointState> doShadowRound()
      {
          buildSeedsList();
 -        // it may be that the local address is the only entry in the seed
 -        // list in which case, attempting a shadow round is pointless
 -        if (seeds.isEmpty())
 -            return endpointShadowStateMap;
 -
+         endpointShadowStateMap.clear();
          // send a completely empty syn
          List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
          GossipDigestSyn digestSynMessage = new 
GossipDigestSyn(DatabaseDescriptor.getClusterName(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationTask.java
index 39a5a11,b065d90..6b04756
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@@ -56,13 -46,14 +56,19 @@@ class MigrationTask extends WrappedRunn
          this.endpoint = endpoint;
      }
  
 +    public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
 +    {
 +        return inflightTasks;
 +    }
 +
      public void runMayThrow() throws Exception
      {
+         if (!FailureDetector.instance.isAlive(endpoint))
+         {
+             logger.warn("Can't send schema pull request: node {} is down.", 
endpoint);
+             return;
+         }
+ 
          // There is a chance that quite some time could have passed between 
now and the MM#maybeScheduleSchemaPull(),
          // potentially enough for the endpoint node to restart - which is an 
issue if it does restart upgraded, with
          // a higher major.
@@@ -72,16 -63,8 +78,10 @@@
              return;
          }
  
-         if (!FailureDetector.instance.isAlive(endpoint))
-         {
-             logger.debug("Can't send schema pull request: node {} is down.", 
endpoint);
-             return;
-         }
- 
          MessageOut message = new 
MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, 
MigrationManager.MigrationsSerializer.instance);
  
 +        final CountDownLatch completionLatch = new CountDownLatch(1);
 +
          IAsyncCallback<Collection<Mutation>> cb = new 
IAsyncCallback<Collection<Mutation>>()
          {
              @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 35b2423,65f536b..6760040
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -481,18 -443,18 +481,17 @@@ public class StorageService extends Not
              MessagingService.instance().listen();
  
          // make magic happen
-         Gossiper.instance.doShadowRound();
- 
+         Map<InetAddress, EndpointState> epStates = 
Gossiper.instance.doShadowRound();
 -
          // now that we've gossiped at least once, we should be able to find 
the node we're replacing
-         if 
(Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())==
 null)
+         if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
              throw new RuntimeException("Cannot replace_address " + 
DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-         replacingId = 
Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+         replacingId = 
Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
          try
          {
-             VersionedValue tokensVersionedValue = 
Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+             VersionedValue tokensVersionedValue = 
epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
                  throw new RuntimeException("Could not find tokens for " + 
DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = 
TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new 
ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +            Collection<Token> tokens = 
TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new 
ByteArrayInputStream(tokensVersionedValue.toBytes())));
  
              if (isReplacingSameAddress())
              {

Reply via email to