This is an automated email from the ASF dual-hosted git repository. mck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new c591978 Better handle legacy gossip application states during (and after) upgrades c591978 is described below commit c591978f4d265e42d0132418005ba63a99278c75 Author: Yifan Cai <y...@apache.org> AuthorDate: Mon Mar 15 20:40:25 2021 -0700 Better handle legacy gossip application states during (and after) upgrades Only remove duplicated legacy application states when `!hasMajorVersion3Nodes()` but always avoid duplicate status notifications. patch by Yifan Cai; reviewed by Mick Semb Wever for CASSANDRA-16525 --- CHANGES.txt | 1 + .../org/apache/cassandra/gms/EndpointState.java | 48 ++++++ src/java/org/apache/cassandra/gms/Gossiper.java | 54 +++---- .../distributed/upgrade/MixedModeGossipTest.java | 168 +++++++++++++++++++++ .../distributed/upgrade/UpgradeTestBase.java | 18 ++- .../operations/InsertUpdateIfConditionTest.java | 9 +- .../cassandra/db/filter/ColumnFilterTest.java | 8 +- .../org/apache/cassandra/gms/GossiperTest.java | 143 +++++++++++++++--- 8 files changed, 393 insertions(+), 56 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e6f15ff..b700547 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-beta5 + * Better handle legacy gossip application states during (and after) upgrades (CASSANDRA-16525) * Mark StreamingMetrics.ActiveOutboundStreams as deprecated (CASSANDRA-11174) * Increase the cqlsh version number (CASSANDRA-16509) * Fix the CQL generated for the views.where_clause column when some identifiers require quoting (CASSANDRA-16479) diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index a4b294c..b8d5626 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -20,11 +20,13 @@ package org.apache.cassandra.gms; import java.io.*; import java.util.*; import java.util.concurrent.atomic.AtomicReference; +import 
java.util.stream.Collectors; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; @@ -84,6 +86,11 @@ public class EndpointState return applicationState.get().get(key); } + public boolean containsApplicationState(ApplicationState key) + { + return applicationState.get().containsKey(key); + } + public Set<Map.Entry<ApplicationState, VersionedValue>> states() { return applicationState.get().entrySet(); @@ -114,6 +121,47 @@ public class EndpointState } } + void removeMajorVersion3LegacyApplicationStates() + { + while (hasLegacyFields()) + { + Map<ApplicationState, VersionedValue> orig = applicationState.get(); + Map<ApplicationState, VersionedValue> updatedStates = filterMajorVersion3LegacyApplicationStates(orig); + // avoid updating if no state is removed + if (orig.size() == updatedStates.size() + || applicationState.compareAndSet(orig, updatedStates)) + return; + } + } + + private boolean hasLegacyFields() + { + Set<ApplicationState> statesPresent = applicationState.get().keySet(); + if (statesPresent.isEmpty()) + return false; + return (statesPresent.contains(ApplicationState.STATUS) && statesPresent.contains(ApplicationState.STATUS_WITH_PORT)) + || (statesPresent.contains(ApplicationState.INTERNAL_IP) && statesPresent.contains(ApplicationState.INTERNAL_ADDRESS_AND_PORT)) + || (statesPresent.contains(ApplicationState.RPC_ADDRESS) && statesPresent.contains(ApplicationState.NATIVE_ADDRESS_AND_PORT)); + } + + private static Map<ApplicationState, VersionedValue> filterMajorVersion3LegacyApplicationStates(Map<ApplicationState, VersionedValue> states) + { + return states.entrySet().stream().filter(entry -> { + // Filter out pre-4.0 versions of data for more complete 4.0 versions + switch (entry.getKey()) + { + case INTERNAL_IP: + return 
!states.containsKey(ApplicationState.INTERNAL_ADDRESS_AND_PORT); + case STATUS: + return !states.containsKey(ApplicationState.STATUS_WITH_PORT); + case RPC_ADDRESS: + return !states.containsKey(ApplicationState.NATIVE_ADDRESS_AND_PORT); + default: + return true; + } + }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + /* getters and setters */ /** * @return System.nanoTime() when state was updated last time. diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index a092c77..b5434aa 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -175,9 +175,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (!upgradeInProgressPossible) return new ExpiringMemoizingSupplier.Memoized<>(null); - Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); - CassandraVersion minVersion = SystemKeyspace.CURRENT_VERSION.familyLowerBound.get(); + + // Skip the round if the gossiper has not started yet + // Otherwise, upgradeInProgressPossible can be set to false wrongly. + if (!isEnabled()) + { + return new ExpiringMemoizingSupplier.Memoized<>(minVersion); + } + + Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), + Gossiper.instance.getUnreachableMembers()); boolean allHostsHaveKnownVersion = true; for (InetAddressAndPort host : allHosts) { @@ -1394,7 +1402,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } EndpointState localEpStatePtr = endpointStateMap.get(ep); - EndpointState remoteState = removeRedundantApplicationStates(entry.getValue()); + EndpointState remoteState = entry.getValue(); + if (!hasMajorVersion3Nodes()) + remoteState.removeMajorVersion3LegacyApplicationStates(); /* If state does not exist just add it. 
If it does then add it if the remote generation is greater. @@ -1452,32 +1462,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - // remove duplicated deprecated states - private static EndpointState removeRedundantApplicationStates(EndpointState remoteState) - { - if (remoteState.states().isEmpty()) - return remoteState; - - Map<ApplicationState, VersionedValue> updatedStates = remoteState.states().stream().filter(entry -> { - // Filter out pre-4.0 versions of data for more complete 4.0 versions - switch (entry.getKey()) - { - case INTERNAL_IP: - return (null == remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT)); - case STATUS: - return (null == remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT)); - case RPC_ADDRESS: - return (null == remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT)); - default: - return true; - } - }).collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - - EndpointState updated = new EndpointState(remoteState.getHeartBeatState(), updatedStates); - if (!remoteState.isAlive()) updated.markDead(); - return updated; - } - private void applyNewStates(InetAddressAndPort addr, EndpointState localState, EndpointState remoteState) { // don't assert here, since if the node restarts the version will go back to zero @@ -1506,8 +1490,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } localState.addApplicationStates(updatedStates); + // get rid of legacy fields once the cluster is not in mixed mode + if (!hasMajorVersion3Nodes()) + localState.removeMajorVersion3LegacyApplicationStates(); + for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates) + { + // filters out legacy change notifications + // only if local state already indicates that the peer has the new fields + if ((ApplicationState.INTERNAL_IP == updatedEntry.getKey() && 
localState.containsApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT)) + ||(ApplicationState.STATUS == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.STATUS_WITH_PORT)) + || (ApplicationState.RPC_ADDRESS == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT))) + continue; doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue()); + } } // notify that a local application state is going to change (doesn't get triggered for remote changes) diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java new file mode 100644 index 0000000..83e911d --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.upgrade; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Test; + +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.distributed.shared.Versions; +import org.apache.cassandra.net.Verb; +import org.assertj.core.api.Assertions; + +public class MixedModeGossipTest extends UpgradeTestBase +{ + Pattern expectedNormalStatus = Pattern.compile("STATUS:\\d+:NORMAL,-?\\d+"); + Pattern expectedNormalStatusWithPort = Pattern.compile("STATUS_WITH_PORT:\\d+:NORMAL,-?\\d+"); + Pattern expectedNormalX3 = Pattern.compile("X3:\\d+:NORMAL,-?\\d+"); + + @Test + public void testStatusFieldShouldExistInOldVersionNodes() throws Throwable + { + new TestCase() + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK)) + .nodes(3) + .nodesToUpgradeOrdered(1, 2, 3) + .upgrade(Versions.Major.v30, Versions.Major.v4) + .upgrade(Versions.Major.v3X, Versions.Major.v4) + .setup(c -> {}) + .runAfterNodeUpgrade((cluster, node) -> { + if (node == 1) { + checkPeerGossipInfoShouldContainNormalStatus(cluster, 2); + checkPeerGossipInfoShouldContainNormalStatus(cluster, 3); + } + if (node == 2) { + checkPeerGossipInfoShouldContainNormalStatus(cluster, 3); + } + }) + .runAfterClusterUpgrade(cluster -> { + // wait 1 minute for `org.apache.cassandra.gms.Gossiper.upgradeFromVersionSupplier` to update + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES); + checkPeerGossipInfoOfAllNodesShouldContainNewStatusAfterUpgrade(cluster); + }) + .run(); + } + + /** + * Similar to {@link 
#testStatusFieldShouldExistInOldVersionNodes}, but in an edge case that + * 1) node2 and node3 cannot gossip with each other. + * 2) node2 sends SYN to node1 first when upgrading. + * 3) node3 is at the lower version during the cluster upgrade + * In this case, node3 gossip info does not contain STATUS field for node2 + */ + @Test + public void testStatusFieldShouldExistInOldVersionNodesEdgeCase() throws Throwable + { + AtomicReference<IMessageFilters.Filter> n1GossipSynBlocker = new AtomicReference<>(); + new TestCase() + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK)) + .nodes(3) + .nodesToUpgradeOrdered(1, 2, 3) + .upgrade(Versions.Major.v30, Versions.Major.v4) + .upgrade(Versions.Major.v3X, Versions.Major.v4) + .setup(cluster -> { + // node2 and node3 gossiper cannot talk with each other + cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(2).to(3).drop(); + cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(3).to(2).drop(); + }) + .runAfterNodeUpgrade((cluster, node) -> { + // let node2 sends the SYN to node1 first + if (node == 1) + { + IMessageFilters.Filter filter = cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(1).to(2).drop(); + n1GossipSynBlocker.set(filter); + } + else if (node == 2) + { + n1GossipSynBlocker.get().off(); + String node3GossipView = cluster.get(3).nodetoolResult("gossipinfo").getStdout(); + String node2GossipState = getGossipStateOfNode(node3GossipView, "/127.0.0.2"); + Assertions.assertThat(node2GossipState) + .as("The node2's gossip state from node3's perspective should contain status. 
" + + "And it should carry an unrecognized field X3 with NORMAL.") + .containsPattern(expectedNormalStatus) + .containsPattern(expectedNormalX3); + } + }) + .runAfterClusterUpgrade(cluster -> { + // wait 1 minute for `org.apache.cassandra.gms.Gossiper.upgradeFromVersionSupplier` to update + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES); + checkPeerGossipInfoOfAllNodesShouldContainNewStatusAfterUpgrade(cluster); + }) + .run(); + } + + private void checkPeerGossipInfoOfAllNodesShouldContainNewStatusAfterUpgrade(UpgradeableCluster cluster) + { + for (int i = 1; i <= 3; i++) + { + int n = i; + checkPeerGossipInfo(cluster, i, (gossipInfo, peers) -> { + for (String p : peers) + { + Assertions.assertThat(getGossipStateOfNode(gossipInfo, p)) + .as(String.format("%s gossip state in node%s should not contain STATUS " + + "and should contain STATUS_WITH_PORT.", p, n)) + .doesNotContain("STATUS:") + .containsPattern(expectedNormalStatusWithPort); + } + }); + } + } + + private void checkPeerGossipInfoShouldContainNormalStatus(UpgradeableCluster cluster, int node) + { + checkPeerGossipInfo(cluster, node, (gossipInfo, peers) -> { + for (String n : peers) + { + Assertions.assertThat(getGossipStateOfNode(gossipInfo, n)) + .containsPattern(expectedNormalStatus); + } + }); + } + + private void checkPeerGossipInfo(UpgradeableCluster cluster, int node, BiConsumer<String, Set<String>> verifier) + { + Set<Integer> peers = new HashSet<>(Arrays.asList(1, 2, 3)); + peers.remove(node); + String gossipInfo = cluster.get(node).nodetoolResult("gossipinfo").getStdout(); + verifier.accept(gossipInfo, peers.stream().map(i -> "127.0.0." 
+ i).collect(Collectors.toSet())); + } + + private String getGossipStateOfNode(String rawOutput, String nodeInterested) + { + String temp = rawOutput.substring(rawOutput.indexOf(nodeInterested)); + int nextSlashIndex = temp.indexOf('/', 1); + if (nextSlashIndex != -1) + return temp.substring(0, nextSlashIndex); + else + return temp; + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java index 39957e9..46c8d24 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java @@ -21,6 +21,7 @@ package org.apache.cassandra.distributed.upgrade; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.function.Consumer; @@ -32,7 +33,9 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.UpgradeableCluster; import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.impl.AbstractCluster.AbstractBuilder; import org.apache.cassandra.distributed.impl.Instance; import org.apache.cassandra.distributed.shared.DistributedTestBase; import org.apache.cassandra.distributed.shared.Versions; @@ -93,7 +96,7 @@ public class UpgradeTestBase extends DistributedTestBase private RunOnCluster setup; private RunOnClusterAndNode runAfterNodeUpgrade; private RunOnCluster runAfterClusterUpgrade; - private final Set<Integer> nodesToUpgrade = new HashSet<>(); + private final Set<Integer> nodesToUpgrade = new LinkedHashSet<>(); private Consumer<IInstanceConfig> configConsumer; public TestCase() @@ -191,13 +194,24 @@ 
public class UpgradeTestBase extends DistributedTestBase } public TestCase nodesToUpgrade(int ... nodes) { + Set<Integer> set = new HashSet<>(nodes.length); + for (int n : nodes) + { + set.add(n); + } + nodesToUpgrade.addAll(set); + return this; + } + + public TestCase nodesToUpgradeOrdered(int ... nodes) + { for (int n : nodes) { nodesToUpgrade.add(n); } return this; } - } + } protected TestCase allUpgrades(int nodes, int... toUpgrade) { diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java index 59770b8..4db1ef5 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -72,7 +73,7 @@ public class InsertUpdateIfConditionTest extends CQLTester @BeforeClass public static void beforeClass() { - Gossiper.instance.maybeInitializeLocalState(0); + Gossiper.instance.start(0); } @Before @@ -82,6 +83,12 @@ public class InsertUpdateIfConditionTest extends CQLTester assertion.run(); } + @AfterClass + public static void afterClass() + { + Gossiper.instance.stop(); + } + /** * Migrated from cql_tests.py:TestCQL.cas_simple_test() */ diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java index 194a92f..fd7ee25 100644 --- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java +++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java @@ -31,6 +31,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.apache.cassandra.Util; +import 
org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.marshal.Int32Type; @@ -41,6 +42,7 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.SimpleSnitch; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; @@ -89,7 +91,9 @@ public class ColumnFilterTest @BeforeClass public static void beforeClass() { - Gossiper.instance.maybeInitializeLocalState(0); + DatabaseDescriptor.setSeedProvider(Arrays::asList); + DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch()); + Gossiper.instance.start(0); } @Before @@ -533,4 +537,4 @@ public class ColumnFilterTest } } } -} \ No newline at end of file +} diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java index c7abb44..ab28572 100644 --- a/test/unit/org/apache/cassandra/gms/GossiperTest.java +++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java @@ -24,10 +24,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import com.google.common.collect.ImmutableMap; import com.google.common.net.InetAddresses; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -48,6 +51,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class GossiperTest { @@ -81,6 +85,12 @@ public 
class GossiperTest DatabaseDescriptor.setSeedProvider(originalSeedProvider); } + @AfterClass + public static void afterClass() + { + Gossiper.instance.stop(); + } + @Test public void testPaddingIntact() throws Exception { @@ -100,6 +110,7 @@ public class GossiperTest @Test public void testHasVersion3Nodes() throws Exception { + Gossiper.instance.start(0); Gossiper.instance.expireUpgradeFromVersion(); VersionedValue.VersionedValueFactory factory = new VersionedValue.VersionedValueFactory(null); @@ -186,6 +197,7 @@ public class GossiperTest VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()); + SimpleStateChangeListener stateChangeListener = null; Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); try { @@ -204,28 +216,12 @@ public class GossiperTest VersionedValue tokensValue = valueFactory.tokens(Collections.singletonList(token)); proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue); - Gossiper.instance.register( - new IEndpointStateChangeSubscriber() - { - public void onJoin(InetAddressAndPort endpoint, EndpointState epState) { } - - public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { } - - public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) - { - assertEquals(ApplicationState.TOKENS, state); - stateChangedNum++; - } - - public void onAlive(InetAddressAndPort endpoint, EndpointState state) { } - - public void onDead(InetAddressAndPort endpoint, EndpointState state) { } - - public void onRemove(InetAddressAndPort endpoint) { } - - public void onRestart(InetAddressAndPort endpoint, EndpointState state) { } - } - ); + stateChangeListener = new SimpleStateChangeListener(); + stateChangeListener.setOnChangeVerifier(onChangeParams -> { + assertEquals(ApplicationState.TOKENS, onChangeParams.state); + 
stateChangedNum++; + }); + Gossiper.instance.register(stateChangeListener); stateChangedNum = 0; Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); @@ -253,6 +249,8 @@ public class GossiperTest { // clean up the gossip states Gossiper.instance.endpointStateMap.clear(); + if (stateChangeListener != null) + Gossiper.instance.unregister(stateChangeListener); } } @@ -352,6 +350,107 @@ public class GossiperTest assertTrue(gossiper.getSeeds().contains(a.toString())); } + @Test + public void testNotFireDuplicatedNotificationsWithUpdateContainsOldAndNewState() throws UnknownHostException + { + VersionedValue.VersionedValueFactory valueFactory = + new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()); + + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); + SimpleStateChangeListener stateChangeListener = null; + try + { + InetAddressAndPort remoteHostAddress = hosts.get(1); + EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress); + HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState(); + //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1 + assertEquals(initialRemoteHeartBeat.getGeneration(), 1); + + // Test begins + AtomicInteger notificationCount = new AtomicInteger(0); + HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration()); + EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat); + final Token token = DatabaseDescriptor.getPartitioner().getRandomToken(); + proposedRemoteState.addApplicationState(ApplicationState.STATUS, valueFactory.normal(Collections.singletonList(token))); + + stateChangeListener = new SimpleStateChangeListener(); + Gossiper.instance.register(stateChangeListener); + + // STEP 1. 
register verifier and apply state with just STATUS + // simulate applying gossip state from a v3 peer + stateChangeListener.setOnChangeVerifier(onChangeParams -> { + notificationCount.getAndIncrement(); + assertEquals("It should fire notification for STATUS when gossiper local state not yet has STATUS_WITH_PORT", + ApplicationState.STATUS, onChangeParams.state); + }); + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); + + // STEP 2. includes both STATUS and STATUS_WITH_PORT. The gossiper knows that the remote peer is now in v4 + // update verifier and apply state again + proposedRemoteState.addApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.normal(Collections.singletonList(token))); + stateChangeListener.setOnChangeVerifier(onChangeParams -> { + notificationCount.getAndIncrement(); + assertEquals("It should only fire notification for STATUS_WITH_PORT", + ApplicationState.STATUS_WITH_PORT, onChangeParams.state); + }); + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); + + // STEP 3. somehow, the peer send only the STATUS in the update. 
+ proposedRemoteState = new EndpointState(proposedRemoteHeartBeat); + proposedRemoteState.addApplicationState(ApplicationState.STATUS, valueFactory.normal(Collections.singletonList(token))); + stateChangeListener.setOnChangeVerifier(onChangeParams -> { + notificationCount.getAndIncrement(); + fail("It should not fire notification for STATUS"); + }); + + assertEquals("Expect exact 2 notifications with the test setup", + 2, notificationCount.get()); + } + finally + { + // clean up the gossip states + Gossiper.instance.endpointStateMap.clear(); + if (stateChangeListener != null) + Gossiper.instance.unregister(stateChangeListener); + } + } + + static class SimpleStateChangeListener implements IEndpointStateChangeSubscriber + { + static class OnChangeParams + { + InetAddressAndPort endpoint; + ApplicationState state; + VersionedValue value; + + OnChangeParams(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) + { + this.endpoint = endpoint; + this.state = state; + this.value = value; + } + } + + private volatile Consumer<OnChangeParams> onChangeVerifier; + + public void setOnChangeVerifier(Consumer<OnChangeParams> verifier) + { + onChangeVerifier = verifier; + } + + public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {} + public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} + public void onAlive(InetAddressAndPort endpoint, EndpointState state) {} + public void onDead(InetAddressAndPort endpoint, EndpointState state) {} + public void onRemove(InetAddressAndPort endpoint) {} + public void onRestart(InetAddressAndPort endpoint, EndpointState state) {} + + public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) + { + onChangeVerifier.accept(new OnChangeParams(endpoint, state, value)); + } + } + static class TestSeedProvider implements SeedProvider { private List<InetAddressAndPort> seeds; 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org