This is an automated email from the ASF dual-hosted git repository.

jzhuang pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 71cb0616b7710366a8cd364348c864d656dc5542
Merge: 670dde9 3f70e7c
Author: Jay Zhuang <[email protected]>
AuthorDate: Thu Jul 18 21:23:17 2019 -0700

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |  21 +++-
 .../org/apache/cassandra/gms/GossiperTest.java     | 123 ++++++++++++++++++---
 3 files changed, 124 insertions(+), 21 deletions(-)

diff --cc CHANGES.txt
index f055068,f04b489..0233c0f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
 -3.0.19
 +3.11.5
 + * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config 
(CASSANDRA-14305)
 + * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903)
 + * Add flag to disable SASI indexes, and warnings on creation 
(CASSANDRA-14866)
 +Merged from 3.0:
+  * Avoid updating unchanged gossip states (CASSANDRA-15097)
   * Prevent recreation of previously dropped columns with a different kind 
(CASSANDRA-14948)
   * Prevent client requests from blocking on executor task queue 
(CASSANDRA-15013)
   * Toughen up column drop/recreate type validations (CASSANDRA-15204)
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 5d2e997,c39f45a..6a862e5
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -24,7 -23,7 +24,8 @@@ import java.util.*
  import java.util.Map.Entry;
  import java.util.concurrent.*;
  import java.util.concurrent.locks.ReentrantLock;
 +import javax.annotation.Nullable;
+ import java.util.stream.Collectors;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Throwables;
@@@ -1288,12 -1253,26 +1289,26 @@@ public class Gossiper implements IFailu
  
          Set<Entry<ApplicationState, VersionedValue>> remoteStates = 
remoteState.states();
          assert remoteState.getHeartBeatState().getGeneration() == 
localState.getHeartBeatState().getGeneration();
-         localState.addApplicationStates(remoteStates);
  
-         for (Entry<ApplicationState, VersionedValue> remoteEntry : 
remoteStates)
-             doOnChangeNotifications(addr, remoteEntry.getKey(), 
remoteEntry.getValue());
+         // filter out the states that are already up to date (has the same or 
higher version)
+         Set<Entry<ApplicationState, VersionedValue>> updatedStates = 
remoteStates.stream().filter(entry -> {
+             VersionedValue local = 
localState.getApplicationState(entry.getKey());
+             return (local == null || local.version < 
entry.getValue().version);
+             }).collect(Collectors.toSet());
+ 
+         if (logger.isTraceEnabled() && updatedStates.size() > 0)
+         {
+             for (Entry<ApplicationState, VersionedValue> entry : 
updatedStates)
+             {
+                 logger.trace("Updating {} state version to {} for {}", 
entry.getKey().toString(), entry.getValue().version, addr);
+             }
+         }
+         localState.addApplicationStates(updatedStates);
+ 
+         for (Entry<ApplicationState, VersionedValue> updatedEntry : 
updatedStates)
+             doOnChangeNotifications(addr, updatedEntry.getKey(), 
updatedEntry.getValue());
      }
 -    
 +
      // notify that a local application state is going to change (doesn't get 
triggered for remote changes)
      private void doBeforeChangeNotifications(InetAddress addr, EndpointState 
epState, ApplicationState apState, VersionedValue newValue)
      {
diff --cc test/unit/org/apache/cassandra/gms/GossiperTest.java
index 448620a,42e4483..b6b3ffb
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@@ -70,88 -58,124 +71,174 @@@ public class GossiperTes
      public void setup()
      {
          tmd.clearUnsafe();
 -    };
 +    }
  
      @Test
 -    public void testLargeGenerationJump() throws UnknownHostException, 
InterruptedException
 +    public void testLargeGenerationJump() throws UnknownHostException
      {
          Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 2);
-         InetAddress remoteHostAddress = hosts.get(1);
+         try
+         {
+             InetAddress remoteHostAddress = hosts.get(1);
  
-         EndpointState initialRemoteState = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
-         HeartBeatState initialRemoteHeartBeat = 
initialRemoteState.getHeartBeatState();
+             EndpointState initialRemoteState = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+             HeartBeatState initialRemoteHeartBeat = 
initialRemoteState.getHeartBeatState();
  
-         //Util.createInitialRing should have initialized remoteHost's 
HeartBeatState's generation to 1
-         assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+             //Util.createInitialRing should have initialized remoteHost's 
HeartBeatState's generation to 1
+             assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
  
-         HeartBeatState proposedRemoteHeartBeat = new 
HeartBeatState(initialRemoteHeartBeat.getGeneration() + 
Gossiper.MAX_GENERATION_DIFFERENCE + 1);
-         EndpointState proposedRemoteState = new 
EndpointState(proposedRemoteHeartBeat);
+             HeartBeatState proposedRemoteHeartBeat = new 
HeartBeatState(initialRemoteHeartBeat.getGeneration() + 
Gossiper.MAX_GENERATION_DIFFERENCE + 1);
+             EndpointState proposedRemoteState = new 
EndpointState(proposedRemoteHeartBeat);
  
-         
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
proposedRemoteState));
+             
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
proposedRemoteState));
  
-         //The generation should have been updated because it isn't over 
Gossiper.MAX_GENERATION_DIFFERENCE in the future
-         HeartBeatState actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
-         assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
+             //The generation should have been updated because it isn't over 
Gossiper.MAX_GENERATION_DIFFERENCE in the future
+             HeartBeatState actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
  
-         //Propose a generation 10 years in the future - this should be 
rejected.
-         HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) 
(System.currentTimeMillis()/1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
-         EndpointState badProposedRemoteState = new 
EndpointState(badProposedRemoteHeartBeat);
+             //Propose a generation 10 years in the future - this should be 
rejected.
+             HeartBeatState badProposedRemoteHeartBeat = new 
HeartBeatState((int) (System.currentTimeMillis() / 1000) + 
Gossiper.MAX_GENERATION_DIFFERENCE * 10);
+             EndpointState badProposedRemoteState = new 
EndpointState(badProposedRemoteHeartBeat);
  
-         
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
badProposedRemoteState));
+             
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
badProposedRemoteState));
  
-         actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
  
-         //The generation should not have been updated because it is over 
Gossiper.MAX_GENERATION_DIFFERENCE in the future
-         assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
+             //The generation should not have been updated because it is over 
Gossiper.MAX_GENERATION_DIFFERENCE in the future
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
+         }
+         finally
+         {
+             // clean up the gossip states
+             Gossiper.instance.endpointStateMap.clear();
+         }
+     }
+ 
+     int stateChangedNum = 0;
+ 
+     @Test
+     public void testDuplicatedStateUpdate() throws Exception
+     {
+         VersionedValue.VersionedValueFactory valueFactory =
+             new 
VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+ 
+         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 2);
+         try
+         {
+             InetAddress remoteHostAddress = hosts.get(1);
+ 
+             EndpointState initialRemoteState = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+             HeartBeatState initialRemoteHeartBeat = 
initialRemoteState.getHeartBeatState();
+ 
+             //Util.createInitialRing should have initialized remoteHost's 
HeartBeatState's generation to 1
+             assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+ 
+             HeartBeatState proposedRemoteHeartBeat = new 
HeartBeatState(initialRemoteHeartBeat.getGeneration());
+             EndpointState proposedRemoteState = new 
EndpointState(proposedRemoteHeartBeat);
+ 
+             final Token token = 
DatabaseDescriptor.getPartitioner().getRandomToken();
+             VersionedValue tokensValue = 
valueFactory.tokens(Collections.singletonList(token));
+             proposedRemoteState.addApplicationState(ApplicationState.TOKENS, 
tokensValue);
+ 
+             Gossiper.instance.register(
+             new IEndpointStateChangeSubscriber()
+             {
+                 public void onJoin(InetAddress endpoint, EndpointState 
epState) { }
+ 
+                 public void beforeChange(InetAddress endpoint, EndpointState 
currentState, ApplicationState newStateKey, VersionedValue newValue) { }
+ 
+                 public void onChange(InetAddress endpoint, ApplicationState 
state, VersionedValue value)
+                 {
+                     assertEquals(ApplicationState.TOKENS, state);
+                     stateChangedNum++;
+                 }
+ 
+                 public void onAlive(InetAddress endpoint, EndpointState 
state) { }
+ 
+                 public void onDead(InetAddress endpoint, EndpointState state) 
{ }
+ 
+                 public void onRemove(InetAddress endpoint) { }
+ 
+                 public void onRestart(InetAddress endpoint, EndpointState 
state) { }
+             }
+             );
+ 
+             stateChangedNum = 0;
+             
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
proposedRemoteState));
+             assertEquals(1, stateChangedNum);
+ 
+             HeartBeatState actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
+ 
+             // Clone a new HeartBeatState
+             proposedRemoteHeartBeat = new 
HeartBeatState(initialRemoteHeartBeat.getGeneration(), 
proposedRemoteHeartBeat.getHeartBeatVersion());
+             proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+ 
+             // Bump the heartbeat version and use the same TOKENS state
+             proposedRemoteHeartBeat.updateHeartBeat();
+             proposedRemoteState.addApplicationState(ApplicationState.TOKENS, 
tokensValue);
+ 
+             // The following state change should only update heartbeat 
without updating the TOKENS state
+             
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
proposedRemoteState));
+             assertEquals(1, stateChangedNum);
+ 
+             actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
+         }
+         finally
+         {
+             // clean up the gossip states
+             Gossiper.instance.endpointStateMap.clear();
+         }
      }
 +
 +    @Test
 +    public void testSchemaVersionUpdate() throws UnknownHostException, 
InterruptedException
 +    {
 +        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 2);
 +        MessagingService.instance().listen();
 +        Gossiper.instance.start(1);
 +        InetAddress remoteHostAddress = hosts.get(1);
 +
 +        EndpointState initialRemoteState = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
 +        // Set to any 3.0 version
 +        Gossiper.instance.injectApplicationState(remoteHostAddress, 
ApplicationState.RELEASE_VERSION, 
StorageService.instance.valueFactory.releaseVersion("3.0.14"));
 +
 +        
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
initialRemoteState));
 +
 +        // wait until the schema is set
 +        VersionedValue schema = null;
 +        for (int i = 0; i < 10; i++)
 +        {
 +            EndpointState localState = 
Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
 +            schema = localState.getApplicationState(ApplicationState.SCHEMA);
 +            if (schema != null)
 +                break;
 +            Thread.sleep(1000);
 +        }
 +
 +        // schema is set and equals to "alternative" version
 +        assertTrue(schema != null);
 +        assertEquals(schema.value, 
Schema.instance.getAltVersion().toString());
 +
 +        // Upgrade remote host version to the latest one (3.11)
 +        Gossiper.instance.injectApplicationState(remoteHostAddress, 
ApplicationState.RELEASE_VERSION, 
StorageService.instance.valueFactory.releaseVersion());
 +
 +        
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
initialRemoteState));
 +
 +        // wait until the schema change
 +        VersionedValue newSchema = null;
 +        for (int i = 0; i < 10; i++)
 +        {
 +            EndpointState localState = 
Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
 +            newSchema = 
localState.getApplicationState(ApplicationState.SCHEMA);
 +            if (!schema.value.equals(newSchema.value))
 +                break;
 +            Thread.sleep(1000);
 +        }
 +
 +        // schema is changed and equals to real version
 +        assertFalse(schema.value.equals(newSchema.value));
 +        assertEquals(newSchema.value, 
Schema.instance.getRealVersion().toString());
 +    }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to