This is an automated email from the ASF dual-hosted git repository. jzhuang pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 71cb0616b7710366a8cd364348c864d656dc5542 Merge: 670dde9 3f70e7c Author: Jay Zhuang <[email protected]> AuthorDate: Thu Jul 18 21:23:17 2019 -0700 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + src/java/org/apache/cassandra/gms/Gossiper.java | 21 +++- .../org/apache/cassandra/gms/GossiperTest.java | 123 ++++++++++++++++++--- 3 files changed, 124 insertions(+), 21 deletions(-) diff --cc CHANGES.txt index f055068,f04b489..0233c0f --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,8 -1,5 +1,9 @@@ -3.0.19 +3.11.5 + * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config (CASSANDRA-14305) + * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903) + * Add flag to disable SASI indexes, and warnings on creation (CASSANDRA-14866) +Merged from 3.0: + * Avoid updating unchanged gossip states (CASSANDRA-15097) * Prevent recreation of previously dropped columns with a different kind (CASSANDRA-14948) * Prevent client requests from blocking on executor task queue (CASSANDRA-15013) * Toughen up column drop/recreate type validations (CASSANDRA-15204) diff --cc src/java/org/apache/cassandra/gms/Gossiper.java index 5d2e997,c39f45a..6a862e5 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@@ -24,7 -23,7 +24,8 @@@ import java.util.* import java.util.Map.Entry; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; + import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; @@@ -1288,12 -1253,26 +1289,26 @@@ public class Gossiper implements IFailu Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states(); assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); - localState.addApplicationStates(remoteStates); - for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates) - doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); + // filter out the states that are already up to date (has the same or higher version) + Set<Entry<ApplicationState, VersionedValue>> updatedStates = remoteStates.stream().filter(entry -> { + VersionedValue local = localState.getApplicationState(entry.getKey()); + return (local == null || local.version < entry.getValue().version); + }).collect(Collectors.toSet()); + + if (logger.isTraceEnabled() && updatedStates.size() > 0) + { + for (Entry<ApplicationState, VersionedValue> entry : updatedStates) + { + logger.trace("Updating {} state version to {} for {}", entry.getKey().toString(), entry.getValue().version, addr); + } + } + localState.addApplicationStates(updatedStates); + + for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates) + doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue()); } - + // notify that a local application state is going to change (doesn't get triggered for remote changes) private void doBeforeChangeNotifications(InetAddress addr, EndpointState epState, ApplicationState apState, VersionedValue newValue) { diff --cc test/unit/org/apache/cassandra/gms/GossiperTest.java index 448620a,42e4483..b6b3ffb --- a/test/unit/org/apache/cassandra/gms/GossiperTest.java +++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java @@@ -70,88 -58,124 +71,174 @@@ public class GossiperTes public void setup() { tmd.clearUnsafe(); - }; + } @Test - public void testLargeGenerationJump() throws UnknownHostException, InterruptedException + public void testLargeGenerationJump() throws UnknownHostException { Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); - InetAddress remoteHostAddress = hosts.get(1); + try + { + InetAddress remoteHostAddress = hosts.get(1); - EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress); - HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState(); + EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress); + HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState(); - //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1 - assertEquals(initialRemoteHeartBeat.getGeneration(), 1); + //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1 + assertEquals(initialRemoteHeartBeat.getGeneration(), 1); - HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1); - EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat); + HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1); + EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat); - Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); - //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future - HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); - assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); + //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future + HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); + assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); - //Propose a generation 10 years in the future - this should be rejected. - HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis()/1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10); - EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat); + //Propose a generation 10 years in the future - this should be rejected. + HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis() / 1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10); + EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat); - Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState)); + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState)); - actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); + actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); - //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future - assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); + //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future + assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); + } + finally + { + // clean up the gossip states + Gossiper.instance.endpointStateMap.clear(); + } + } + + int stateChangedNum = 0; + + @Test + public void testDuplicatedStateUpdate() throws Exception + { + VersionedValue.VersionedValueFactory valueFactory = + new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()); + + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); + try + { + InetAddress remoteHostAddress = hosts.get(1); + + EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress); + HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState(); + + //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1 + assertEquals(initialRemoteHeartBeat.getGeneration(), 1); + + HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration()); + EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat); + + final Token token = DatabaseDescriptor.getPartitioner().getRandomToken(); + VersionedValue tokensValue = valueFactory.tokens(Collections.singletonList(token)); + proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue); + + Gossiper.instance.register( + new IEndpointStateChangeSubscriber() + { + public void onJoin(InetAddress endpoint, EndpointState epState) { } + + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { } + + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) + { + assertEquals(ApplicationState.TOKENS, state); + stateChangedNum++; + } + + public void onAlive(InetAddress endpoint, EndpointState state) { } + + public void onDead(InetAddress endpoint, EndpointState state) { } + + public void onRemove(InetAddress endpoint) { } + + public void onRestart(InetAddress endpoint, EndpointState state) { } + } + ); + + stateChangedNum = 0; + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); + assertEquals(1, stateChangedNum); + + HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); + assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); + + // Clone a new HeartBeatState + proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration(), proposedRemoteHeartBeat.getHeartBeatVersion()); + proposedRemoteState = new EndpointState(proposedRemoteHeartBeat); + + // Bump the heartbeat version and use the same TOKENS state + proposedRemoteHeartBeat.updateHeartBeat(); + proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue); + + // The following state change should only update heartbeat without updating the TOKENS state + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState)); + assertEquals(1, stateChangedNum); + + actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState(); + assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration()); + } + finally + { + // clean up the gossip states + Gossiper.instance.endpointStateMap.clear(); + } } + + @Test + public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException + { + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); + MessagingService.instance().listen(); + Gossiper.instance.start(1); + InetAddress remoteHostAddress = hosts.get(1); + + EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress); + // Set to any 3.0 version + Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14")); + + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState)); + + // wait until the schema is set + VersionedValue schema = null; + for (int i = 0; i < 10; i++) + { + EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0)); + schema = localState.getApplicationState(ApplicationState.SCHEMA); + if (schema != null) + break; + Thread.sleep(1000); + } + + // schema is set and equals to "alternative" version + assertTrue(schema != null); + assertEquals(schema.value, Schema.instance.getAltVersion().toString()); + + // Upgrade remote host version to the latest one (3.11) + Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion()); + + Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState)); + + // wait until the schema change + VersionedValue newSchema = null; + for (int i = 0; i < 10; i++) + { + EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0)); + newSchema = localState.getApplicationState(ApplicationState.SCHEMA); + if (!schema.value.equals(newSchema.value)) + break; + Thread.sleep(1000); + } + + // schema is changed and equals to real version + assertFalse(schema.value.equals(newSchema.value)); + assertEquals(newSchema.value, Schema.instance.getRealVersion().toString()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
