This is an automated email from the ASF dual-hosted git repository. jzhuang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit ba6821be4f952781d573cd6ed705250de6aeb5f7 Merge: 26a134a 71cb061 Author: Jay Zhuang <[email protected]> AuthorDate: Thu Jul 18 21:25:55 2019 -0700 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + src/java/org/apache/cassandra/gms/Gossiper.java | 51 ++++++--- .../org/apache/cassandra/gms/GossiperTest.java | 123 ++++++++++++++++++--- 3 files changed, 140 insertions(+), 35 deletions(-) diff --cc src/java/org/apache/cassandra/gms/Gossiper.java index a6c9be7,6a862e5..062abe0 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@@ -22,14 -24,10 +22,15 @@@ import java.util.* import java.util.Map.Entry; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BooleanSupplier; +import java.util.function.Supplier; +import java.util.stream.Collectors; + import javax.annotation.Nullable; + import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@@ -1377,25 -1289,24 +1378,41 @@@ public class Gossiper implements IFailu Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states(); assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); - localState.addApplicationStates(remoteStates); - - //Filter out pre-4.0 versions of data for more complete 4.0 versions - Set<Entry<ApplicationState, VersionedValue>> filtered = remoteStates.stream().filter(entry -> { - switch (entry.getKey()) - { - case INTERNAL_IP: - return remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) == null; - case STATUS: - return remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT) == null; - case RPC_ADDRESS: - return remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) == null; - default: - return true; - } + - // filter out the states that are already up to date (has the same or higher version) ++ + Set<Entry<ApplicationState, VersionedValue>> updatedStates = remoteStates.stream().filter(entry -> { ++ // Filter out pre-4.0 versions of data for more complete 4.0 versions ++ switch (entry.getKey()) ++ { ++ case INTERNAL_IP: ++ if (remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) != null) return false; ++ break; ++ case STATUS: ++ if (remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT) != null) return false; ++ break; ++ case RPC_ADDRESS: ++ if (remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) != null) return false; ++ break; ++ default: ++ break; ++ } ++ ++ // filter out the states that are already up to date (has the same or higher version) + VersionedValue local = localState.getApplicationState(entry.getKey()); + return (local == null || local.version < entry.getValue().version); - }).collect(Collectors.toSet()); + }).collect(Collectors.toSet()); - for (Entry<ApplicationState, VersionedValue> remoteEntry : filtered) - doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); + if (logger.isTraceEnabled() && updatedStates.size() > 0) + { + for (Entry<ApplicationState, VersionedValue> entry : updatedStates) + { + logger.trace("Updating {} state version to {} for {}", entry.getKey().toString(), entry.getValue().version, addr); + } + } + localState.addApplicationStates(updatedStates); + + for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates) + doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue()); } // notify that a local application state is going to change (doesn't get triggered for remote changes) diff --cc test/unit/org/apache/cassandra/gms/GossiperTest.java index 9c25b86,b6b3ffb..97c577c --- a/test/unit/org/apache/cassandra/gms/GossiperTest.java +++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java @@@ -114,158 -74,171 +115,244 @@@ public class GossiperTes } @Test - public void testLargeGenerationJump() throws UnknownHostException + public void testLargeGenerationJump() throws UnknownHostException, InterruptedException { Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); - InetAddressAndPort remoteHostAddress = hosts.get(1); + try + { - InetAddress remoteHostAddress = hosts.get(1); ++ InetAddressAndPort remoteHostAddress = hosts.get(1); + + EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress); + HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState(); + + //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1 + assertEquals(initialRemoteHeartBeat.getGeneration(), 1); + + HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1); + EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat); + + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); + + //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future + HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); + assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); + + //Propose a generation 10 years in the future - this should be rejected. + HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis() / 1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10); + EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat); + + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState)); + + actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); + + //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future + assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); + } + finally + { + // clean up the gossip states + Gossiper.instance.endpointStateMap.clear(); + } + } + + int stateChangedNum = 0; + + @Test + public void testDuplicatedStateUpdate() throws Exception + { + VersionedValue.VersionedValueFactory valueFactory = + new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()); + + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); + try + { - InetAddress remoteHostAddress = hosts.get(1); ++ InetAddressAndPort remoteHostAddress = hosts.get(1); + + EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress); + HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState(); + + //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1 + assertEquals(initialRemoteHeartBeat.getGeneration(), 1); + + HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration()); + EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat); - EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress); - HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState(); + final Token token = DatabaseDescriptor.getPartitioner().getRandomToken(); + VersionedValue tokensValue = valueFactory.tokens(Collections.singletonList(token)); + proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue); - //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1 - assertEquals(initialRemoteHeartBeat.getGeneration(), 1); + Gossiper.instance.register( + new IEndpointStateChangeSubscriber() + { - public void onJoin(InetAddress endpoint, EndpointState epState) { } ++ public void onJoin(InetAddressAndPort endpoint, EndpointState epState) { } - HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1); - EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat); - public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { } ++ public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { } - Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) ++ public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) + { + assertEquals(ApplicationState.TOKENS, state); + stateChangedNum++; + } - //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future - HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); - assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); - public void onAlive(InetAddress endpoint, EndpointState state) { } ++ public void onAlive(InetAddressAndPort endpoint, EndpointState state) { } - //Propose a generation 10 years in the future - this should be rejected. - HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis()/1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10); - EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat); - public void onDead(InetAddress endpoint, EndpointState state) { } ++ public void onDead(InetAddressAndPort endpoint, EndpointState state) { } - Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState)); - public void onRemove(InetAddress endpoint) { } ++ public void onRemove(InetAddressAndPort endpoint) { } - actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); - public void onRestart(InetAddress endpoint, EndpointState state) { } ++ public void onRestart(InetAddressAndPort endpoint, EndpointState state) { } + } + ); - //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future - assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); + stateChangedNum = 0; + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); + assertEquals(1, stateChangedNum); + + HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); + assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); + + // Clone a new HeartBeatState + proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration(), proposedRemoteHeartBeat.getHeartBeatVersion()); + proposedRemoteState = new EndpointState(proposedRemoteHeartBeat); + + // Bump the heartbeat version and use the same TOKENS state + proposedRemoteHeartBeat.updateHeartBeat(); + proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue); + + // The following state change should only update heartbeat without updating the TOKENS state + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); + assertEquals(1, stateChangedNum); + + actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); + assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); + } + finally + { + // clean up the gossip states + Gossiper.instance.endpointStateMap.clear(); + } } + // Note: This test might fail if for some reason the node broadcast address is in 127.99.0.0/16 @Test - public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException + public void testReloadSeeds() throws UnknownHostException { - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); - MessagingService.instance().listen(); - Gossiper.instance.start(1); - InetAddress remoteHostAddress = hosts.get(1); - - EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress); - // Set to any 3.0 version - Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14")); + Gossiper gossiper = new Gossiper(false); + List<String> loadedList; + + // Initialize the seed list directly to a known set to start with + gossiper.seeds.clear(); + InetAddressAndPort addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.1.1")); + int nextSize = 4; + List<InetAddressAndPort> nextSeeds = new ArrayList<>(nextSize); + for (int i = 0; i < nextSize; i ++) + { + gossiper.seeds.add(addr); + nextSeeds.add(addr); + addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address)); + } + Assert.assertEquals(nextSize, gossiper.seeds.size()); + + // Add another unique address to the list + addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address)); + nextSeeds.add(addr); + nextSize++; + DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds)); + loadedList = gossiper.reloadSeeds(); + + // Check that the new entry was added + Assert.assertEquals(nextSize, loadedList.size()); + for (InetAddressAndPort a : nextSeeds) + assertTrue(loadedList.contains(a.toString())); + + // Check that the return value of the reloadSeeds matches the content of the getSeeds call + // and that they both match the internal contents of the Gossiper seeds list + Assert.assertEquals(loadedList.size(), gossiper.getSeeds().size()); + for (InetAddressAndPort a : gossiper.seeds) + { + assertTrue(loadedList.contains(a.toString())); + assertTrue(gossiper.getSeeds().contains(a.toString())); + } - Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState)); + // Add a duplicate of the last address to the seed provider list + int uniqueSize = nextSize; + nextSeeds.add(addr); + nextSize++; + DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds)); + loadedList = gossiper.reloadSeeds(); + + // Check that the number of seed nodes reported hasn't increased + Assert.assertEquals(uniqueSize, loadedList.size()); + for (InetAddressAndPort a : nextSeeds) + assertTrue(loadedList.contains(a.toString())); + + // Create a new list that has no overlaps with the previous list + addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.2.1")); + int disjointSize = 3; + List<InetAddressAndPort> disjointSeeds = new ArrayList<>(disjointSize); + for (int i = 0; i < disjointSize; i ++) + { + disjointSeeds.add(addr); + addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address)); + } + DatabaseDescriptor.setSeedProvider(new TestSeedProvider(disjointSeeds)); + loadedList = gossiper.reloadSeeds(); - // wait until the schema is set - VersionedValue schema = null; - for (int i = 0; i < 10; i++) + // Check that the list now contains exactly the new other list. + Assert.assertEquals(disjointSize, gossiper.getSeeds().size()); + Assert.assertEquals(disjointSize, loadedList.size()); + for (InetAddressAndPort a : disjointSeeds) { - EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0)); - schema = localState.getApplicationState(ApplicationState.SCHEMA); - if (schema != null) - break; - Thread.sleep(1000); + assertTrue(gossiper.getSeeds().contains(a.toString())); + assertTrue(loadedList.contains(a.toString())); } - // schema is set and equals to "alternative" version - assertTrue(schema != null); - assertEquals(schema.value, Schema.instance.getAltVersion().toString()); + // Set the seed node provider to return an empty list + DatabaseDescriptor.setSeedProvider(new TestSeedProvider(new ArrayList<InetAddressAndPort>())); + loadedList = gossiper.reloadSeeds(); + + // Check that the in memory seed node list was not modified + Assert.assertEquals(disjointSize, loadedList.size()); + for (InetAddressAndPort a : disjointSeeds) + assertTrue(loadedList.contains(a.toString())); + + // Change the seed provider to one that throws an unchecked exception + DatabaseDescriptor.setSeedProvider(new ErrorSeedProvider()); + loadedList = gossiper.reloadSeeds(); - // Upgrade remote host version to the latest one (3.11) - Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion()); + // Check for the expected null response from a reload error + Assert.assertNull(loadedList); - Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState)); + // Check that the in memory seed node list was not modified and the exception was caught + Assert.assertEquals(disjointSize, gossiper.getSeeds().size()); + for (InetAddressAndPort a : disjointSeeds) + assertTrue(gossiper.getSeeds().contains(a.toString())); + } + + static class TestSeedProvider implements SeedProvider + { + private List<InetAddressAndPort> seeds; + + TestSeedProvider(List<InetAddressAndPort> seeds) + { + this.seeds = seeds; + } - // wait until the schema change - VersionedValue newSchema = null; - for (int i = 0; i < 10; i++) + @Override + public List<InetAddressAndPort> getSeeds() { - EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0)); - newSchema = localState.getApplicationState(ApplicationState.SCHEMA); - if (!schema.value.equals(newSchema.value)) - break; - Thread.sleep(1000); + return seeds; } + } - // schema is changed and equals to real version - assertFalse(schema.value.equals(newSchema.value)); - assertEquals(newSchema.value, Schema.instance.getRealVersion().toString()); + // A seed provider for testing which throws assertion errors when queried + static class ErrorSeedProvider implements SeedProvider + { + @Override + public List<InetAddressAndPort> getSeeds() + { + assert(false); + return new ArrayList<>(); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
