This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 64f54f9 Avoid processing redundant application states on endpoint
changes
64f54f9 is described below
commit 64f54f9fb0ac1fe2920f44379326cf076ab8aab8
Author: Mick Semb Wever <[email protected]>
AuthorDate: Fri Feb 26 16:14:08 2021 +0100
Avoid processing redundant application states on endpoint changes
Also default `RangesAtEndpoint..add(...)` to `Conflict.DUPLICATE` to filter
out exact duplicates.
patch by Adam Holmberg, Mick Semb Wever; reviewed by Berenguer Blasi,
Brandon Williams, Adam Holmberg, Mick Semb Wever for CASSANDRA-16381
Co-authored-by: Adam Holmberg <[email protected]>
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/gms/Gossiper.java | 44 +++++++++++++---------
.../apache/cassandra/locator/RangesAtEndpoint.java | 5 +++
3 files changed, 33 insertions(+), 17 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 95028a6..7b00c4a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta5
+ * Avoid processing redundant application states on endpoint changes
(CASSANDRA-16381)
* Prevent parent repair sessions leak (CASSANDRA-16446)
* Fix timestamp issue in SinglePartitionSliceCommandTest
testPartitionD…eletionRowDeletionTie (CASSANDRA-16443)
* Promote protocol V5 out of beta (CASSANDRA-14973)
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 7acaec9..7720379 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1376,7 +1376,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
}
EndpointState localEpStatePtr = endpointStateMap.get(ep);
- EndpointState remoteState = entry.getValue();
+ EndpointState remoteState =
removeRedundantApplicationStates(entry.getValue());
/*
If state does not exist just add it. If it does then add it if
the remote generation is greater.
@@ -1434,6 +1434,32 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
}
}
+ // remove duplicated deprecated states
+ private static EndpointState
removeRedundantApplicationStates(EndpointState remoteState)
+ {
+ if (remoteState.states().isEmpty())
+ return remoteState;
+
+ Map<ApplicationState, VersionedValue> updatedStates =
remoteState.states().stream().filter(entry -> {
+ // Filter out pre-4.0 versions of data for more complete 4.0
versions
+ switch (entry.getKey())
+ {
+ case INTERNAL_IP:
+ return (null ==
remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT));
+ case STATUS:
+ return (null ==
remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT));
+ case RPC_ADDRESS:
+ return (null ==
remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT));
+ default:
+ return true;
+ }
+ }).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+ EndpointState updated = new
EndpointState(remoteState.getHeartBeatState(), updatedStates);
+ if (!remoteState.isAlive()) updated.markDead();
+ return updated;
+ }
+
private void applyNewStates(InetAddressAndPort addr, EndpointState
localState, EndpointState remoteState)
{
// don't assert here, since if the node restarts the version will go
back to zero
@@ -1448,22 +1474,6 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
Set<Entry<ApplicationState, VersionedValue>> updatedStates =
remoteStates.stream().filter(entry -> {
- // Filter out pre-4.0 versions of data for more complete 4.0
versions
- switch (entry.getKey())
- {
- case INTERNAL_IP:
- if
(remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) !=
null) return false;
- break;
- case STATUS:
- if
(remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT) != null)
return false;
- break;
- case RPC_ADDRESS:
- if
(remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) !=
null) return false;
- break;
- default:
- break;
- }
-
// filter out the states that are already up to date (has the same
or higher version)
VersionedValue local =
localState.getApplicationState(entry.getKey());
return (local == null || local.version < entry.getValue().version);
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index 33ddffd..de7daa1 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -193,6 +193,11 @@ public class RangesAtEndpoint extends
AbstractReplicaCollection<RangesAtEndpoint
public Builder(InetAddressAndPort endpoint, int capacity) {
this(endpoint, new ReplicaList(capacity)); }
private Builder(InetAddressAndPort endpoint, ReplicaList list) {
super(endpoint, list, rangeMap(list)); }
+ public RangesAtEndpoint.Builder add(Replica replica)
+ {
+ return add(replica, Conflict.DUPLICATE);
+ }
+
public RangesAtEndpoint.Builder add(Replica replica, Conflict
ignoreConflict)
{
if (built) throw new IllegalStateException();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]