This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 81c02769 Command to Exclude Replicas from Durability Status Coordination
81c02769 is described below
commit 81c02769f9ad73ef3aba0675c2217fc74b8a4a4c
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Wed Jul 31 00:13:25 2024 -0500
Command to Exclude Replicas from Durability Status Coordination
patch by Caleb Rackliffe; reviewed by David Capwell and Sam Tunnicliffe for CASSANDRA-19321
---
.../accord/coordinate/CoordinateShardDurable.java | 4 +-
.../coordinate/tracking/AbstractTracker.java | 5 +++
.../src/main/java/accord/topology/Topologies.java | 24 +++++++++++-
.../src/main/java/accord/topology/Topology.java | 43 +++++++++++++++++-----
.../test/java/accord/topology/TopologyUtils.java | 4 +-
.../java/accord/maelstrom/TopologyFactory.java | 3 +-
6 files changed, 67 insertions(+), 16 deletions(-)
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index 61b2e175..73ba7f2c 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -44,14 +44,14 @@ public class CoordinateShardDurable extends
ExecuteSyncPoint<Ranges> implements
protected void start()
{
- node.send(tracker.nodes(), to -> new WaitUntilApplied(to,
tracker.topologies(), syncPoint.syncId, syncPoint.keysOrRanges,
syncPoint.syncId.epoch()), this);
+ node.send(tracker.nonStaleNodes(), to -> new WaitUntilApplied(to,
tracker.topologies(), syncPoint.syncId, syncPoint.keysOrRanges,
syncPoint.syncId.epoch()), this);
}
@Override
protected void onSuccess()
{
node.configService().reportEpochRedundant(syncPoint.keysOrRanges,
syncPoint.syncId.epoch());
- node.send(tracker.nodes(), new SetShardDurable(syncPoint));
+ node.send(tracker.nonStaleNodes(), new SetShardDurable(syncPoint));
super.onSuccess();
}
}
diff --git
a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
index 11b311cb..e58f3c67 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
@@ -180,6 +180,11 @@ public abstract class AbstractTracker<ST extends
ShardTracker>
return topologies.nodes();
}
+ public Set<Id> nonStaleNodes()
+ {
+ return topologies.nonStaleNodes();
+ }
+
public ST get(int shardIndex)
{
int maxShardsPerEpoch = maxShardsPerEpoch();
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java
b/accord-core/src/main/java/accord/topology/Topologies.java
index d3b1576e..95fce506 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -18,6 +18,13 @@
package accord.topology;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
import accord.api.TopologySorter;
import accord.local.Node;
import accord.local.Node.Id;
@@ -30,8 +37,6 @@ import accord.utils.SortedArrays;
import accord.utils.SortedArrays.SortedArrayList;
import accord.utils.SortedList;
-import java.util.*;
-
import static accord.utils.Invariants.illegalState;
// TODO (desired, efficiency/clarity): since Topologies are rarely needed,
should optimise API for single topology case
@@ -69,6 +74,8 @@ public interface Topologies extends TopologySorter
// note this can be expensive to evaluate
SortedList<Id> nodes();
+ Set<Node.Id> nonStaleNodes();
+
int estimateUniqueNodes();
Ranges computeRangesForNode(Id node);
@@ -209,6 +216,12 @@ public interface Topologies extends TopologySorter
return topology.nodes();
}
+ @Override
+ public Set<Node.Id> nonStaleNodes()
+ {
+ return topology.nonStaleNodes();
+ }
+
@Override
public int estimateUniqueNodes()
{
@@ -389,6 +402,13 @@ public interface Topologies extends TopologySorter
return result;
}
+ @Override
+ public Set<Node.Id> nonStaleNodes()
+ {
+ Topology currentTopology = current();
+ return Sets.filter(nodes(), id ->
!currentTopology.staleIds().contains(id));
+ }
+
@Override
public Ranges computeRangesForNode(Id node)
{
diff --git a/accord-core/src/main/java/accord/topology/Topology.java
b/accord-core/src/main/java/accord/topology/Topology.java
index aa2ffcca..f1fc988f 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -22,16 +22,21 @@ import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
import accord.api.RoutingKey;
import accord.local.Node.Id;
@@ -48,8 +53,6 @@ import accord.utils.IndexedTriFunction;
import accord.utils.SimpleBitSet;
import accord.utils.SortedArrays.SortedArrayList;
import accord.utils.Utils;
-import org.agrona.collections.Int2ObjectHashMap;
-import org.agrona.collections.IntArrayList;
import static accord.utils.Invariants.illegalArgument;
import static accord.utils.SortedArrays.Search.FLOOR;
@@ -59,8 +62,10 @@ public class Topology
{
public static final long EMPTY_EPOCH = 0;
private static final int[] EMPTY_SUBSET = new int[0];
- public static final Topology EMPTY = new Topology(null, EMPTY_EPOCH, new
Shard[0], Ranges.EMPTY, new SortedArrayList<>(new Id[0]), new
Int2ObjectHashMap<>(0, 0.9f), Ranges.EMPTY, EMPTY_SUBSET);
+ public static final Topology EMPTY = new Topology(null, EMPTY_EPOCH,
Collections.emptySet(), new Shard[0], Ranges.EMPTY, new SortedArrayList<>(new
Id[0]), new Int2ObjectHashMap<>(0, 0.9f), Ranges.EMPTY, EMPTY_SUBSET);
+
final long epoch;
+ final Set<Id> staleIds;
final Shard[] shards;
final Ranges ranges;
@@ -115,10 +120,17 @@ public class Topology
}
}
+ @VisibleForTesting
public Topology(long epoch, Shard... shards)
+ {
+ this(epoch, Collections.emptySet(), shards);
+ }
+
+ public Topology(long epoch, Set<Id> staleIds, Shard... shards)
{
this.global = null;
this.epoch = epoch;
+ this.staleIds = staleIds;
this.ranges =
Ranges.ofSortedAndDeoverlapped(Arrays.stream(shards).map(shard ->
shard.range).toArray(Range[]::new));
this.shards = shards;
this.subsetOfRanges = ranges;
@@ -144,10 +156,11 @@ public class Topology
}
@VisibleForTesting
- Topology(@Nullable Topology global, long epoch, Shard[] shards, Ranges
ranges, SortedArrayList<Id> nodeIds, Int2ObjectHashMap<NodeInfo> nodeById,
Ranges subsetOfRanges, int[] supersetIndexes)
+ Topology(@Nullable Topology global, long epoch, Set<Id> staleIds, Shard[]
shards, Ranges ranges, SortedArrayList<Id> nodeIds, Int2ObjectHashMap<NodeInfo>
nodeById, Ranges subsetOfRanges, int[] supersetIndexes)
{
this.global = global;
this.epoch = epoch;
+ this.staleIds = staleIds;
this.shards = shards;
this.ranges = ranges;
this.nodeIds = nodeIds;
@@ -192,12 +205,12 @@ public class Topology
return result;
}
- private static Topology select(long epoch, Shard[] shards, int[] indexes)
+ private static Topology select(long epoch, Set<Id> staleIds, Shard[]
shards, int[] indexes)
{
Shard[] subset = new Shard[indexes.length];
for (int i = 0; i < indexes.length; i++)
subset[i] = shards[indexes[i]];
- return new Topology(epoch, subset);
+ return new Topology(epoch, staleIds, subset);
}
public boolean isSubset()
@@ -217,12 +230,12 @@ public class Topology
return Topology.EMPTY;
SortedArrayList<Id> nodeIds = new SortedArrayList<>(new Id[] { node });
- return new Topology(global(), epoch, shards, ranges, nodeIds,
nodeLookup, info.ranges, info.supersetIndexes);
+ return new Topology(global(), epoch, staleIds, shards, ranges,
nodeIds, nodeLookup, info.ranges, info.supersetIndexes);
}
public Topology trim()
{
- return select(epoch, shards, this.supersetIndexes);
+ return select(epoch, staleIds, shards, this.supersetIndexes);
}
public Ranges rangesForNode(Id node)
@@ -281,7 +294,7 @@ public class Topology
int count = 0;
for (int i = nodes.firstSetBit() ; i >= 0 ; i = nodes.nextSetBit(i +
1, -1))
nodeIds[count++] = this.nodeIds.get(i);
- return new Topology(global(), epoch, shards, ranges, new
SortedArrayList<>(nodeIds), nodeLookup, rangeSubset, newSubset);
+ return new Topology(global(), epoch, staleIds, shards, ranges, new
SortedArrayList<>(nodeIds), nodeLookup, rangeSubset, newSubset);
}
@VisibleForTesting
@@ -300,7 +313,7 @@ public class Topology
if (nodeLookup.size() != nodeIds.length)
nodeIds = Arrays.copyOf(nodeIds, nodeLookup.size());
Arrays.sort(nodeIds);
- return new Topology(global(), epoch, shards, ranges, new
SortedArrayList<>(nodeIds), nodeLookup, rangeSubset, newSubset);
+ return new Topology(global(), epoch, staleIds, shards, ranges, new
SortedArrayList<>(nodeIds), nodeLookup, rangeSubset, newSubset);
}
private int[] subsetFor(Unseekables<?> select)
@@ -545,11 +558,21 @@ public class Topology
return nodeIds;
}
+ public Set<Id> nonStaleNodes()
+ {
+ return Sets.filter(nodes(), id -> !staleIds.contains(id));
+ }
+
public Ranges ranges()
{
return subsetOfRanges;
}
+ public Set<Id> staleIds()
+ {
+ return staleIds;
+ }
+
public Shard[] unsafeGetShards()
{
return shards;
diff --git a/accord-core/src/test/java/accord/topology/TopologyUtils.java
b/accord-core/src/test/java/accord/topology/TopologyUtils.java
index ec49f33a..9810bb0f 100644
--- a/accord-core/src/test/java/accord/topology/TopologyUtils.java
+++ b/accord-core/src/test/java/accord/topology/TopologyUtils.java
@@ -57,7 +57,9 @@ public class TopologyUtils
public static Topology withEpoch(Topology topology, long epoch)
{
- return new Topology(topology.global == null ? null :
withEpoch(topology.global, epoch), epoch, topology.shards, topology.ranges,
topology.nodeIds, topology.nodeLookup, topology.subsetOfRanges,
topology.supersetIndexes);
+ return new Topology(topology.global == null ? null :
withEpoch(topology.global, epoch), epoch,
+ topology.staleIds, topology.shards,
topology.ranges, topology.nodeIds, topology.nodeLookup,
+ topology.subsetOfRanges, topology.supersetIndexes);
}
public static Topology topology(long epoch, List<Node.Id> cluster, Ranges
ranges, int rf)
diff --git
a/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
b/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
index 94ae12ab..92f366da 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
@@ -19,6 +19,7 @@
package accord.maelstrom;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -85,6 +86,6 @@ public class TopologyFactory
for (int i = 0 ; i < this.shards ; ++i)
shards.add(new Shard(ranges[j][i],
copyUnsorted(electorates.get(i % electorates.size()), Id[]::new),
fastPathElectorates.get(i % fastPathElectorates.size())));
}
- return new Topology(1, toArray(shards, Shard[]::new));
+ return new Topology(1, Collections.emptySet(), toArray(shards,
Shard[]::new));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]