This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 64f54f9  Avoid processing redundant application states on endpoint 
changes
64f54f9 is described below

commit 64f54f9fb0ac1fe2920f44379326cf076ab8aab8
Author: Mick Semb Wever <[email protected]>
AuthorDate: Fri Feb 26 16:14:08 2021 +0100

    Avoid processing redundant application states on endpoint changes
    
    Also default `RangesAtEndpoint..add(...)` to `Conflict.DUPLICATE` to filter 
out exact duplicates.
    
     patch by Adam Holmberg, Mick Semb Wever; reviewed by Berenguer Blasi, 
Brandon Williams, Adam Holmberg, Mick Semb Wever for CASSANDRA-16381
    
    Co-authored-by: Adam Holmberg <[email protected]>
---
 CHANGES.txt                                        |  1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    | 44 +++++++++++++---------
 .../apache/cassandra/locator/RangesAtEndpoint.java |  5 +++
 3 files changed, 33 insertions(+), 17 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 95028a6..7b00c4a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Avoid processing redundant application states on endpoint changes 
(CASSANDRA-16381)
  * Prevent parent repair sessions leak (CASSANDRA-16446)
  * Fix timestamp issue in SinglePartitionSliceCommandTest 
testPartitionD…eletionRowDeletionTie (CASSANDRA-16443)
  * Promote protocol V5 out of beta (CASSANDRA-14973)
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 7acaec9..7720379 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1376,7 +1376,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             }
 
             EndpointState localEpStatePtr = endpointStateMap.get(ep);
-            EndpointState remoteState = entry.getValue();
+            EndpointState remoteState = 
removeRedundantApplicationStates(entry.getValue());
 
             /*
                 If state does not exist just add it. If it does then add it if 
the remote generation is greater.
@@ -1434,6 +1434,32 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
     }
 
+    // remove duplicated deprecated states
+    private static EndpointState 
removeRedundantApplicationStates(EndpointState remoteState)
+    {
+        if (remoteState.states().isEmpty())
+            return remoteState;
+
+        Map<ApplicationState, VersionedValue> updatedStates = 
remoteState.states().stream().filter(entry -> {
+            // Filter out pre-4.0 versions of data for more complete 4.0 
versions
+            switch (entry.getKey())
+            {
+                case INTERNAL_IP:
+                    return (null == 
remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT));
+                case STATUS:
+                    return (null == 
remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT));
+                case RPC_ADDRESS:
+                    return (null == 
remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT));
+                default:
+                    return true;
+            }
+        }).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+        EndpointState updated = new 
EndpointState(remoteState.getHeartBeatState(), updatedStates);
+        if (!remoteState.isAlive()) updated.markDead();
+        return updated;
+    }
+
     private void applyNewStates(InetAddressAndPort addr, EndpointState 
localState, EndpointState remoteState)
     {
         // don't assert here, since if the node restarts the version will go 
back to zero
@@ -1448,22 +1474,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
 
         Set<Entry<ApplicationState, VersionedValue>> updatedStates = 
remoteStates.stream().filter(entry -> {
-            // Filter out pre-4.0 versions of data for more complete 4.0 
versions
-            switch (entry.getKey())
-            {
-                case INTERNAL_IP:
-                    if 
(remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) != 
null) return false;
-                    break;
-                case STATUS:
-                    if 
(remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT) != null) 
return false;
-                    break;
-                case RPC_ADDRESS:
-                    if 
(remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) != 
null) return false;
-                    break;
-                default:
-                    break;
-            }
-
             // filter out the states that are already up to date (has the same 
or higher version)
             VersionedValue local = 
localState.getApplicationState(entry.getKey());
             return (local == null || local.version < entry.getValue().version);
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java 
b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index 33ddffd..de7daa1 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -193,6 +193,11 @@ public class RangesAtEndpoint extends 
AbstractReplicaCollection<RangesAtEndpoint
         public Builder(InetAddressAndPort endpoint, int capacity) { 
this(endpoint, new ReplicaList(capacity)); }
         private Builder(InetAddressAndPort endpoint, ReplicaList list) { 
super(endpoint, list, rangeMap(list)); }
 
+        public RangesAtEndpoint.Builder add(Replica replica)
+        {
+            return add(replica, Conflict.DUPLICATE);
+        }
+
         public RangesAtEndpoint.Builder add(Replica replica, Conflict 
ignoreConflict)
         {
             if (built) throw new IllegalStateException();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to