This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81c02769 Command to Exclude Replicas from Durability Status Coordination
81c02769 is described below

commit 81c02769f9ad73ef3aba0675c2217fc74b8a4a4c
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Wed Jul 31 00:13:25 2024 -0500

    Command to Exclude Replicas from Durability Status Coordination
    
    patch by Caleb Rackliffe; reviewed by David Capwell and Sam Tunnicliffe for CASSANDRA-19321
---
 .../accord/coordinate/CoordinateShardDurable.java  |  4 +-
 .../coordinate/tracking/AbstractTracker.java       |  5 +++
 .../src/main/java/accord/topology/Topologies.java  | 24 +++++++++++-
 .../src/main/java/accord/topology/Topology.java    | 43 +++++++++++++++++-----
 .../test/java/accord/topology/TopologyUtils.java   |  4 +-
 .../java/accord/maelstrom/TopologyFactory.java     |  3 +-
 6 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index 61b2e175..73ba7f2c 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -44,14 +44,14 @@ public class CoordinateShardDurable extends 
ExecuteSyncPoint<Ranges> implements
 
     protected void start()
     {
-        node.send(tracker.nodes(), to -> new WaitUntilApplied(to, 
tracker.topologies(), syncPoint.syncId, syncPoint.keysOrRanges, 
syncPoint.syncId.epoch()), this);
+        node.send(tracker.nonStaleNodes(), to -> new WaitUntilApplied(to, 
tracker.topologies(), syncPoint.syncId, syncPoint.keysOrRanges, 
syncPoint.syncId.epoch()), this);
     }
 
     @Override
     protected void onSuccess()
     {
         node.configService().reportEpochRedundant(syncPoint.keysOrRanges, 
syncPoint.syncId.epoch());
-        node.send(tracker.nodes(), new SetShardDurable(syncPoint));
+        node.send(tracker.nonStaleNodes(), new SetShardDurable(syncPoint));
         super.onSuccess();
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
index 11b311cb..e58f3c67 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
@@ -180,6 +180,11 @@ public abstract class AbstractTracker<ST extends 
ShardTracker>
         return topologies.nodes();
     }
 
+    public Set<Id> nonStaleNodes()
+    {
+        return topologies.nonStaleNodes();
+    }
+
     public ST get(int shardIndex)
     {
         int maxShardsPerEpoch = maxShardsPerEpoch();
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java
index d3b1576e..95fce506 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -18,6 +18,13 @@
 
 package accord.topology;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
 import accord.api.TopologySorter;
 import accord.local.Node;
 import accord.local.Node.Id;
@@ -30,8 +37,6 @@ import accord.utils.SortedArrays;
 import accord.utils.SortedArrays.SortedArrayList;
 import accord.utils.SortedList;
 
-import java.util.*;
-
 import static accord.utils.Invariants.illegalState;
 
 // TODO (desired, efficiency/clarity): since Topologies are rarely needed, 
should optimise API for single topology case
@@ -69,6 +74,8 @@ public interface Topologies extends TopologySorter
     // note this can be expensive to evaluate
     SortedList<Id> nodes();
 
+    Set<Node.Id> nonStaleNodes();
+
     int estimateUniqueNodes();
 
     Ranges computeRangesForNode(Id node);
@@ -209,6 +216,12 @@ public interface Topologies extends TopologySorter
             return topology.nodes();
         }
 
+        @Override
+        public Set<Node.Id> nonStaleNodes()
+        {
+            return topology.nonStaleNodes();
+        }
+
         @Override
         public int estimateUniqueNodes()
         {
@@ -389,6 +402,13 @@ public interface Topologies extends TopologySorter
             return result;
         }
 
+        @Override
+        public Set<Node.Id> nonStaleNodes()
+        {
+            Topology currentTopology = current();
+            return Sets.filter(nodes(), id -> 
!currentTopology.staleIds().contains(id));
+        }
+
         @Override
         public Ranges computeRangesForNode(Id node)
         {
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index aa2ffcca..f1fc988f 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -22,16 +22,21 @@ import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
 
 import accord.api.RoutingKey;
 import accord.local.Node.Id;
@@ -48,8 +53,6 @@ import accord.utils.IndexedTriFunction;
 import accord.utils.SimpleBitSet;
 import accord.utils.SortedArrays.SortedArrayList;
 import accord.utils.Utils;
-import org.agrona.collections.Int2ObjectHashMap;
-import org.agrona.collections.IntArrayList;
 
 import static accord.utils.Invariants.illegalArgument;
 import static accord.utils.SortedArrays.Search.FLOOR;
@@ -59,8 +62,10 @@ public class Topology
 {
     public static final long EMPTY_EPOCH = 0;
     private static final int[] EMPTY_SUBSET = new int[0];
-    public static final Topology EMPTY = new Topology(null, EMPTY_EPOCH, new 
Shard[0], Ranges.EMPTY, new SortedArrayList<>(new Id[0]), new 
Int2ObjectHashMap<>(0, 0.9f), Ranges.EMPTY, EMPTY_SUBSET);
+    public static final Topology EMPTY = new Topology(null, EMPTY_EPOCH, 
Collections.emptySet(), new Shard[0], Ranges.EMPTY, new SortedArrayList<>(new 
Id[0]), new Int2ObjectHashMap<>(0, 0.9f), Ranges.EMPTY, EMPTY_SUBSET);
+
     final long epoch;
+    final Set<Id> staleIds;
     final Shard[] shards;
     final Ranges ranges;
 
@@ -115,10 +120,17 @@ public class Topology
         }
     }
 
+    @VisibleForTesting
     public Topology(long epoch, Shard... shards)
+    {
+        this(epoch, Collections.emptySet(), shards);
+    }
+    
+    public Topology(long epoch, Set<Id> staleIds, Shard... shards)
     {
         this.global = null;
         this.epoch = epoch;
+        this.staleIds = staleIds;
         this.ranges = 
Ranges.ofSortedAndDeoverlapped(Arrays.stream(shards).map(shard -> 
shard.range).toArray(Range[]::new));
         this.shards = shards;
         this.subsetOfRanges = ranges;
@@ -144,10 +156,11 @@ public class Topology
     }
 
     @VisibleForTesting
-    Topology(@Nullable Topology global, long epoch, Shard[] shards, Ranges 
ranges, SortedArrayList<Id> nodeIds, Int2ObjectHashMap<NodeInfo> nodeById, 
Ranges subsetOfRanges, int[] supersetIndexes)
+    Topology(@Nullable Topology global, long epoch, Set<Id> staleIds, Shard[] 
shards, Ranges ranges, SortedArrayList<Id> nodeIds, Int2ObjectHashMap<NodeInfo> 
nodeById, Ranges subsetOfRanges, int[] supersetIndexes)
     {
         this.global = global;
         this.epoch = epoch;
+        this.staleIds = staleIds;
         this.shards = shards;
         this.ranges = ranges;
         this.nodeIds = nodeIds;
@@ -192,12 +205,12 @@ public class Topology
         return result;
     }
 
-    private static Topology select(long epoch, Shard[] shards, int[] indexes)
+    private static Topology select(long epoch, Set<Id> staleIds, Shard[] 
shards, int[] indexes)
     {
         Shard[] subset = new Shard[indexes.length];
         for (int i = 0; i < indexes.length; i++)
             subset[i] = shards[indexes[i]];
-        return new Topology(epoch, subset);
+        return new Topology(epoch, staleIds, subset);
     }
 
     public boolean isSubset()
@@ -217,12 +230,12 @@ public class Topology
             return Topology.EMPTY;
 
         SortedArrayList<Id> nodeIds = new SortedArrayList<>(new Id[] { node });
-        return new Topology(global(), epoch, shards, ranges, nodeIds, 
nodeLookup, info.ranges, info.supersetIndexes);
+        return new Topology(global(), epoch, staleIds, shards, ranges, 
nodeIds, nodeLookup, info.ranges, info.supersetIndexes);
     }
 
     public Topology trim()
     {
-        return select(epoch, shards, this.supersetIndexes);
+        return select(epoch, staleIds, shards, this.supersetIndexes);
     }
 
     public Ranges rangesForNode(Id node)
@@ -281,7 +294,7 @@ public class Topology
         int count = 0;
         for (int i = nodes.firstSetBit() ; i >= 0 ; i = nodes.nextSetBit(i + 
1, -1))
             nodeIds[count++] = this.nodeIds.get(i);
-        return new Topology(global(), epoch, shards, ranges, new 
SortedArrayList<>(nodeIds), nodeLookup, rangeSubset, newSubset);
+        return new Topology(global(), epoch, staleIds, shards, ranges, new 
SortedArrayList<>(nodeIds), nodeLookup, rangeSubset, newSubset);
     }
 
     @VisibleForTesting
@@ -300,7 +313,7 @@ public class Topology
         if (nodeLookup.size() != nodeIds.length)
             nodeIds = Arrays.copyOf(nodeIds, nodeLookup.size());
         Arrays.sort(nodeIds);
-        return new Topology(global(), epoch, shards, ranges, new 
SortedArrayList<>(nodeIds), nodeLookup, rangeSubset, newSubset);
+        return new Topology(global(), epoch, staleIds, shards, ranges, new 
SortedArrayList<>(nodeIds), nodeLookup, rangeSubset, newSubset);
     }
 
     private int[] subsetFor(Unseekables<?> select)
@@ -545,11 +558,21 @@ public class Topology
         return nodeIds;
     }
 
+    public Set<Id> nonStaleNodes()
+    {
+        return Sets.filter(nodes(), id -> !staleIds.contains(id));
+    }
+
     public Ranges ranges()
     {
         return subsetOfRanges;
     }
 
+    public Set<Id> staleIds()
+    {
+        return staleIds;
+    }
+    
     public Shard[] unsafeGetShards()
     {
         return shards;
diff --git a/accord-core/src/test/java/accord/topology/TopologyUtils.java b/accord-core/src/test/java/accord/topology/TopologyUtils.java
index ec49f33a..9810bb0f 100644
--- a/accord-core/src/test/java/accord/topology/TopologyUtils.java
+++ b/accord-core/src/test/java/accord/topology/TopologyUtils.java
@@ -57,7 +57,9 @@ public class TopologyUtils
 
     public static Topology withEpoch(Topology topology, long epoch)
     {
-        return new Topology(topology.global == null ? null : 
withEpoch(topology.global, epoch), epoch, topology.shards, topology.ranges, 
topology.nodeIds, topology.nodeLookup, topology.subsetOfRanges, 
topology.supersetIndexes);
+        return new Topology(topology.global == null ? null : 
withEpoch(topology.global, epoch), epoch,
+                            topology.staleIds, topology.shards, 
topology.ranges, topology.nodeIds, topology.nodeLookup,
+                            topology.subsetOfRanges, topology.supersetIndexes);
     }
 
     public static Topology topology(long epoch, List<Node.Id> cluster, Ranges 
ranges, int rf)
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java b/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
index 94ae12ab..92f366da 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
@@ -19,6 +19,7 @@
 package accord.maelstrom;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -85,6 +86,6 @@ public class TopologyFactory
             for (int i = 0 ; i < this.shards ; ++i)
                 shards.add(new Shard(ranges[j][i], 
copyUnsorted(electorates.get(i % electorates.size()), Id[]::new), 
fastPathElectorates.get(i % fastPathElectorates.size())));
         }
-        return new Topology(1, toArray(shards, Shard[]::new));
+        return new Topology(1, Collections.emptySet(), toArray(shards, 
Shard[]::new));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to