This is an automated email from the ASF dual-hosted git repository.

jzhuang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit ba6821be4f952781d573cd6ed705250de6aeb5f7
Merge: 26a134a 71cb061
Author: Jay Zhuang <[email protected]>
AuthorDate: Thu Jul 18 21:25:55 2019 -0700

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |  51 ++++++---
 .../org/apache/cassandra/gms/GossiperTest.java     | 123 ++++++++++++++++++---
 3 files changed, 140 insertions(+), 35 deletions(-)

diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index a6c9be7,6a862e5..062abe0
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -22,14 -24,10 +22,15 @@@ import java.util.*
  import java.util.Map.Entry;
  import java.util.concurrent.*;
  import java.util.concurrent.locks.ReentrantLock;
 +import java.util.function.BooleanSupplier;
 +import java.util.function.Supplier;
 +import java.util.stream.Collectors;
 +
  import javax.annotation.Nullable;
+ import java.util.stream.Collectors;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Suppliers;
  import com.google.common.base.Throwables;
  import com.google.common.collect.ImmutableList;
  import com.google.common.collect.ImmutableMap;
@@@ -1377,25 -1289,24 +1378,41 @@@ public class Gossiper implements IFailu
  
          Set<Entry<ApplicationState, VersionedValue>> remoteStates = 
remoteState.states();
          assert remoteState.getHeartBeatState().getGeneration() == 
localState.getHeartBeatState().getGeneration();
-         localState.addApplicationStates(remoteStates);
- 
-         //Filter out pre-4.0 versions of data for more complete 4.0 versions
-         Set<Entry<ApplicationState, VersionedValue>> filtered = 
remoteStates.stream().filter(entry -> {
-            switch (entry.getKey())
-            {
-                case INTERNAL_IP:
-                     return 
remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) == 
null;
-                case STATUS:
-                    return 
remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT) == null;
-                case RPC_ADDRESS:
-                    return 
remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) == 
null;
-                default:
-                    return true;
-            }
+ 
 -        // filter out the states that are already up to date (has the same or 
higher version)
++
+         Set<Entry<ApplicationState, VersionedValue>> updatedStates = 
remoteStates.stream().filter(entry -> {
++            // Filter out pre-4.0 versions of data for more complete 4.0 
versions
++            switch (entry.getKey())
++            {
++                case INTERNAL_IP:
++                    if 
(remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) != 
null) return false;
++                    break;
++                case STATUS:
++                    if 
(remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT) != null) 
return false;
++                    break;
++                case RPC_ADDRESS:
++                    if 
(remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) != 
null) return false;
++                    break;
++                default:
++                    break;
++            }
++
++            // filter out the states that are already up to date (has the 
same or higher version)
+             VersionedValue local = 
localState.getApplicationState(entry.getKey());
+             return (local == null || local.version < 
entry.getValue().version);
 -            }).collect(Collectors.toSet());
 +        }).collect(Collectors.toSet());
  
-         for (Entry<ApplicationState, VersionedValue> remoteEntry : filtered)
-             doOnChangeNotifications(addr, remoteEntry.getKey(), 
remoteEntry.getValue());
+         if (logger.isTraceEnabled() && updatedStates.size() > 0)
+         {
+             for (Entry<ApplicationState, VersionedValue> entry : 
updatedStates)
+             {
+                 logger.trace("Updating {} state version to {} for {}", 
entry.getKey().toString(), entry.getValue().version, addr);
+             }
+         }
+         localState.addApplicationStates(updatedStates);
+ 
+         for (Entry<ApplicationState, VersionedValue> updatedEntry : 
updatedStates)
+             doOnChangeNotifications(addr, updatedEntry.getKey(), 
updatedEntry.getValue());
      }
  
      // notify that a local application state is going to change (doesn't get 
triggered for remote changes)
diff --cc test/unit/org/apache/cassandra/gms/GossiperTest.java
index 9c25b86,b6b3ffb..97c577c
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@@ -114,158 -74,171 +115,244 @@@ public class GossiperTes
      }
  
      @Test
 -    public void testLargeGenerationJump() throws UnknownHostException
 +    public void testLargeGenerationJump() throws UnknownHostException, 
InterruptedException
      {
          Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 2);
-         InetAddressAndPort remoteHostAddress = hosts.get(1);
+         try
+         {
 -            InetAddress remoteHostAddress = hosts.get(1);
++            InetAddressAndPort remoteHostAddress = hosts.get(1);
+ 
+             EndpointState initialRemoteState = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+             HeartBeatState initialRemoteHeartBeat = 
initialRemoteState.getHeartBeatState();
+ 
+             //Util.createInitialRing should have initialized remoteHost's 
HeartBeatState's generation to 1
+             assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+ 
+             HeartBeatState proposedRemoteHeartBeat = new 
HeartBeatState(initialRemoteHeartBeat.getGeneration() + 
Gossiper.MAX_GENERATION_DIFFERENCE + 1);
+             EndpointState proposedRemoteState = new 
EndpointState(proposedRemoteHeartBeat);
+ 
+             
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
proposedRemoteState));
+ 
+             //The generation should have been updated because it isn't over 
Gossiper.MAX_GENERATION_DIFFERENCE in the future
+             HeartBeatState actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
+ 
+             //Propose a generation 10 years in the future - this should be 
rejected.
+             HeartBeatState badProposedRemoteHeartBeat = new 
HeartBeatState((int) (System.currentTimeMillis() / 1000) + 
Gossiper.MAX_GENERATION_DIFFERENCE * 10);
+             EndpointState badProposedRemoteState = new 
EndpointState(badProposedRemoteHeartBeat);
+ 
+             
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
badProposedRemoteState));
+ 
+             actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+ 
+             //The generation should not have been updated because it is over 
Gossiper.MAX_GENERATION_DIFFERENCE in the future
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
+         }
+         finally
+         {
+             // clean up the gossip states
+             Gossiper.instance.endpointStateMap.clear();
+         }
+     }
+ 
+     int stateChangedNum = 0;
+ 
+     @Test
+     public void testDuplicatedStateUpdate() throws Exception
+     {
+         VersionedValue.VersionedValueFactory valueFactory =
+             new 
VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+ 
+         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 2);
+         try
+         {
 -            InetAddress remoteHostAddress = hosts.get(1);
++            InetAddressAndPort remoteHostAddress = hosts.get(1);
+ 
+             EndpointState initialRemoteState = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+             HeartBeatState initialRemoteHeartBeat = 
initialRemoteState.getHeartBeatState();
+ 
+             //Util.createInitialRing should have initialized remoteHost's 
HeartBeatState's generation to 1
+             assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+ 
+             HeartBeatState proposedRemoteHeartBeat = new 
HeartBeatState(initialRemoteHeartBeat.getGeneration());
+             EndpointState proposedRemoteState = new 
EndpointState(proposedRemoteHeartBeat);
  
-         EndpointState initialRemoteState = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
-         HeartBeatState initialRemoteHeartBeat = 
initialRemoteState.getHeartBeatState();
+             final Token token = 
DatabaseDescriptor.getPartitioner().getRandomToken();
+             VersionedValue tokensValue = 
valueFactory.tokens(Collections.singletonList(token));
+             proposedRemoteState.addApplicationState(ApplicationState.TOKENS, 
tokensValue);
  
-         //Util.createInitialRing should have initialized remoteHost's 
HeartBeatState's generation to 1
-         assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+             Gossiper.instance.register(
+             new IEndpointStateChangeSubscriber()
+             {
 -                public void onJoin(InetAddress endpoint, EndpointState 
epState) { }
++                public void onJoin(InetAddressAndPort endpoint, EndpointState 
epState) { }
  
-         HeartBeatState proposedRemoteHeartBeat = new 
HeartBeatState(initialRemoteHeartBeat.getGeneration() + 
Gossiper.MAX_GENERATION_DIFFERENCE + 1);
-         EndpointState proposedRemoteState = new 
EndpointState(proposedRemoteHeartBeat);
 -                public void beforeChange(InetAddress endpoint, EndpointState 
currentState, ApplicationState newStateKey, VersionedValue newValue) { }
++                public void beforeChange(InetAddressAndPort endpoint, 
EndpointState currentState, ApplicationState newStateKey, VersionedValue 
newValue) { }
  
-         
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
proposedRemoteState));
 -                public void onChange(InetAddress endpoint, ApplicationState 
state, VersionedValue value)
++                public void onChange(InetAddressAndPort endpoint, 
ApplicationState state, VersionedValue value)
+                 {
+                     assertEquals(ApplicationState.TOKENS, state);
+                     stateChangedNum++;
+                 }
  
-         //The generation should have been updated because it isn't over 
Gossiper.MAX_GENERATION_DIFFERENCE in the future
-         HeartBeatState actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
-         assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
 -                public void onAlive(InetAddress endpoint, EndpointState 
state) { }
++                public void onAlive(InetAddressAndPort endpoint, 
EndpointState state) { }
  
-         //Propose a generation 10 years in the future - this should be 
rejected.
-         HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) 
(System.currentTimeMillis()/1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
-         EndpointState badProposedRemoteState = new 
EndpointState(badProposedRemoteHeartBeat);
 -                public void onDead(InetAddress endpoint, EndpointState state) 
{ }
++                public void onDead(InetAddressAndPort endpoint, EndpointState 
state) { }
  
-         
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
badProposedRemoteState));
 -                public void onRemove(InetAddress endpoint) { }
++                public void onRemove(InetAddressAndPort endpoint) { }
  
-         actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
 -                public void onRestart(InetAddress endpoint, EndpointState 
state) { }
++                public void onRestart(InetAddressAndPort endpoint, 
EndpointState state) { }
+             }
+             );
  
-         //The generation should not have been updated because it is over 
Gossiper.MAX_GENERATION_DIFFERENCE in the future
-         assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
+             stateChangedNum = 0;
+             
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
proposedRemoteState));
+             assertEquals(1, stateChangedNum);
+ 
+             HeartBeatState actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
+ 
+             // Clone a new HeartBeatState
+             proposedRemoteHeartBeat = new 
HeartBeatState(initialRemoteHeartBeat.getGeneration(), 
proposedRemoteHeartBeat.getHeartBeatVersion());
+             proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+ 
+             // Bump the heartbeat version and use the same TOKENS state
+             proposedRemoteHeartBeat.updateHeartBeat();
+             proposedRemoteState.addApplicationState(ApplicationState.TOKENS, 
tokensValue);
+ 
+             // The following state change should only update heartbeat 
without updating the TOKENS state
+             
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
proposedRemoteState));
+             assertEquals(1, stateChangedNum);
+ 
+             actualRemoteHeartBeat = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), 
actualRemoteHeartBeat.getGeneration());
+         }
+         finally
+         {
+             // clean up the gossip states
+             Gossiper.instance.endpointStateMap.clear();
+         }
      }
  
 +    // Note: This test might fail if for some reason the node broadcast 
address is in 127.99.0.0/16
      @Test
 -    public void testSchemaVersionUpdate() throws UnknownHostException, 
InterruptedException
 +    public void testReloadSeeds() throws UnknownHostException
      {
 -        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 2);
 -        MessagingService.instance().listen();
 -        Gossiper.instance.start(1);
 -        InetAddress remoteHostAddress = hosts.get(1);
 -
 -        EndpointState initialRemoteState = 
Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
 -        // Set to any 3.0 version
 -        Gossiper.instance.injectApplicationState(remoteHostAddress, 
ApplicationState.RELEASE_VERSION, 
StorageService.instance.valueFactory.releaseVersion("3.0.14"));
 +        Gossiper gossiper = new Gossiper(false);
 +        List<String> loadedList;
 +
 +        // Initialize the seed list directly to a known set to start with
 +        gossiper.seeds.clear();
 +        InetAddressAndPort addr = 
InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.1.1"));
 +        int nextSize = 4;
 +        List<InetAddressAndPort> nextSeeds = new ArrayList<>(nextSize);
 +        for (int i = 0; i < nextSize; i ++)
 +        {
 +            gossiper.seeds.add(addr);
 +            nextSeeds.add(addr);
 +            addr = 
InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
 +        }
 +        Assert.assertEquals(nextSize, gossiper.seeds.size());
 +
 +        // Add another unique address to the list
 +        addr = 
InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
 +        nextSeeds.add(addr);
 +        nextSize++;
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds));
 +        loadedList = gossiper.reloadSeeds();
 +
 +        // Check that the new entry was added
 +        Assert.assertEquals(nextSize, loadedList.size());
 +        for (InetAddressAndPort a : nextSeeds)
 +            assertTrue(loadedList.contains(a.toString()));
 +
 +        // Check that the return value of the reloadSeeds matches the content 
of the getSeeds call
 +        // and that they both match the internal contents of the Gossiper 
seeds list
 +        Assert.assertEquals(loadedList.size(), gossiper.getSeeds().size());
 +        for (InetAddressAndPort a : gossiper.seeds)
 +        {
 +            assertTrue(loadedList.contains(a.toString()));
 +            assertTrue(gossiper.getSeeds().contains(a.toString()));
 +        }
  
 -        
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
initialRemoteState));
 +        // Add a duplicate of the last address to the seed provider list
 +        int uniqueSize = nextSize;
 +        nextSeeds.add(addr);
 +        nextSize++;
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds));
 +        loadedList = gossiper.reloadSeeds();
 +
 +        // Check that the number of seed nodes reported hasn't increased
 +        Assert.assertEquals(uniqueSize, loadedList.size());
 +        for (InetAddressAndPort a : nextSeeds)
 +            assertTrue(loadedList.contains(a.toString()));
 +
 +        // Create a new list that has no overlaps with the previous list
 +        addr = 
InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.2.1"));
 +        int disjointSize = 3;
 +        List<InetAddressAndPort> disjointSeeds = new 
ArrayList<>(disjointSize);
 +        for (int i = 0; i < disjointSize; i ++)
 +        {
 +            disjointSeeds.add(addr);
 +            addr = 
InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
 +        }
 +        DatabaseDescriptor.setSeedProvider(new 
TestSeedProvider(disjointSeeds));
 +        loadedList = gossiper.reloadSeeds();
  
 -        // wait until the schema is set
 -        VersionedValue schema = null;
 -        for (int i = 0; i < 10; i++)
 +        // Check that the list now contains exactly the new other list.
 +        Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
 +        Assert.assertEquals(disjointSize, loadedList.size());
 +        for (InetAddressAndPort a : disjointSeeds)
          {
 -            EndpointState localState = 
Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
 -            schema = localState.getApplicationState(ApplicationState.SCHEMA);
 -            if (schema != null)
 -                break;
 -            Thread.sleep(1000);
 +            assertTrue(gossiper.getSeeds().contains(a.toString()));
 +            assertTrue(loadedList.contains(a.toString()));
          }
  
 -        // schema is set and equals to "alternative" version
 -        assertTrue(schema != null);
 -        assertEquals(schema.value, 
Schema.instance.getAltVersion().toString());
 +        // Set the seed node provider to return an empty list
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(new 
ArrayList<InetAddressAndPort>()));
 +        loadedList = gossiper.reloadSeeds();
 +
 +        // Check that the in memory seed node list was not modified
 +        Assert.assertEquals(disjointSize, loadedList.size());
 +        for (InetAddressAndPort a : disjointSeeds)
 +            assertTrue(loadedList.contains(a.toString()));
 +
 +        // Change the seed provider to one that throws an unchecked exception
 +        DatabaseDescriptor.setSeedProvider(new ErrorSeedProvider());
 +        loadedList = gossiper.reloadSeeds();
  
 -        // Upgrade remote host version to the latest one (3.11)
 -        Gossiper.instance.injectApplicationState(remoteHostAddress, 
ApplicationState.RELEASE_VERSION, 
StorageService.instance.valueFactory.releaseVersion());
 +        // Check for the expected null response from a reload error
 +        Assert.assertNull(loadedList);
  
 -        
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, 
initialRemoteState));
 +        // Check that the in memory seed node list was not modified and the 
exception was caught
 +        Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
 +        for (InetAddressAndPort a : disjointSeeds)
 +            assertTrue(gossiper.getSeeds().contains(a.toString()));
 +    }
 +
 +    static class TestSeedProvider implements SeedProvider
 +    {
 +        private List<InetAddressAndPort> seeds;
 +
 +        TestSeedProvider(List<InetAddressAndPort> seeds)
 +        {
 +            this.seeds = seeds;
 +        }
  
 -        // wait until the schema change
 -        VersionedValue newSchema = null;
 -        for (int i = 0; i < 10; i++)
 +        @Override
 +        public List<InetAddressAndPort> getSeeds()
          {
 -            EndpointState localState = 
Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
 -            newSchema = 
localState.getApplicationState(ApplicationState.SCHEMA);
 -            if (!schema.value.equals(newSchema.value))
 -                break;
 -            Thread.sleep(1000);
 +            return seeds;
          }
 +    }
  
 -        // schema is changed and equals to real version
 -        assertFalse(schema.value.equals(newSchema.value));
 -        assertEquals(newSchema.value, 
Schema.instance.getRealVersion().toString());
 +    // A seed provider for testing which throws assertion errors when queried
 +    static class ErrorSeedProvider implements SeedProvider
 +    {
 +        @Override
 +        public List<InetAddressAndPort> getSeeds()
 +        {
 +            assert(false);
 +            return new ArrayList<>();
 +        }
      }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to