This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new d99ad84c CEP-15: (C*) Implement TopologySorter to prioritise hosts
based on DynamicSnitch and/or topology layout (#72)
d99ad84c is described below
commit d99ad84cc49a96299a9ae55183e38ee6f1aa3f47
Author: dcapwell <[email protected]>
AuthorDate: Thu Oct 26 15:44:43 2023 -0700
CEP-15: (C*) Implement TopologySorter to prioritise hosts based on
DynamicSnitch and/or topology layout (#72)
patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-18929
---
.../src/main/java/accord/topology/Topologies.java | 72 ++++++++++++++--------
.../main/java/accord/topology/TopologyManager.java | 13 ++--
.../java/accord/messages/TxnRequestScopeTest.java | 8 +--
3 files changed, 56 insertions(+), 37 deletions(-)
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java
index 5832cce0..1d729d29 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -260,29 +260,25 @@ public interface Topologies extends TopologySorter
private final List<Topology> topologies;
private final int maxShardsPerEpoch;
- public Multi(TopologySorter.Supplier sorter, int initialCapacity)
- {
- this.topologies = new ArrayList<>(initialCapacity);
- this.supplier = sorter;
- this.sorter = sorter.get(this);
- int maxShardsPerEpoch = 0;
- for (int i = 0 ; i < topologies.size() ; ++i)
- maxShardsPerEpoch = Math.max(maxShardsPerEpoch,
topologies.get(i).size());
- this.maxShardsPerEpoch = maxShardsPerEpoch;
- }
-
public Multi(TopologySorter.Supplier sorter, Topology... topologies)
{
- this(sorter, topologies.length);
- for (Topology topology : topologies)
- add(topology);
+ this(sorter, Arrays.asList(topologies));
}
- public Multi(TopologySorter.Supplier sorter, List<Topology> topologies)
+ public Multi(TopologySorter.Supplier sorter, List<Topology> input)
{
- this(sorter, topologies.size());
- for (Topology topology : topologies)
- add(topology);
+ this.topologies = new ArrayList<>(input.size());
+ for (Topology topology : input)
+ {
+ Invariants.checkArgument(topologies.isEmpty() ||
topology.epoch == topologies.get(topologies.size() - 1).epoch - 1);
+ topologies.add(topology);
+ }
+ int maxShardsPerEpoch = 0;
+ for (int i = 0 ; i < topologies.size() ; ++i)
+ maxShardsPerEpoch = Math.max(maxShardsPerEpoch,
topologies.get(i).size());
+ this.maxShardsPerEpoch = maxShardsPerEpoch;
+ this.supplier = sorter;
+ this.sorter = sorter.get(this);
}
@Override
@@ -392,12 +388,6 @@ public interface Topologies extends TopologySorter
return maxShardsPerEpoch;
}
- public void add(Topology topology)
- {
- Invariants.checkArgument(topologies.isEmpty() || topology.epoch ==
topologies.get(topologies.size() - 1).epoch - 1);
- topologies.add(topology);
- }
-
@Override
public boolean equals(Object obj)
{
@@ -422,4 +412,38 @@ public interface Topologies extends TopologySorter
return sorter.compare(node1, node2, shards);
}
}
+
+ class Builder
+ {
+ private final List<Topology> topologies;
+
+ public Builder(int initialCapacity)
+ {
+ topologies = new ArrayList<>(initialCapacity);
+ }
+
+ public void add(Topology topology)
+ {
+ Invariants.checkArgument(topologies.isEmpty() || topology.epoch ==
topologies.get(topologies.size() - 1).epoch - 1);
+ topologies.add(topology);
+ }
+
+ public boolean isEmpty()
+ {
+ return topologies.isEmpty();
+ }
+
+ public Topologies build(TopologySorter.Supplier sorter)
+ {
+ switch (topologies.size())
+ {
+ case 0:
+ throw new IllegalStateException("Unable to build an empty Topologies");
+ case 1:
+ return new Single(sorter, topologies.get(0));
+ default:
+ return new Multi(sorter, topologies);
+ }
+ }
+ }
}
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index a940b109..dcb499c4 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -517,7 +517,7 @@ public class TopologyManager
int i = (int)(snapshot.currentEpoch - maxEpoch);
int maxi = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch,
snapshot.epochs.length));
- Topologies.Multi topologies = new Topologies.Multi(sorter, maxi - i);
+ Topologies.Builder topologies = new Topologies.Builder(maxi - i);
Unseekables<?> remaining = select;
while (i < maxi)
@@ -528,7 +528,7 @@ public class TopologyManager
}
if (i == snapshot.epochs.length)
- return topologies;
+ return topologies.build(sorter);
// include any additional epochs to reach sufficiency
EpochState prev = snapshot.epochs[maxi - 1];
@@ -538,14 +538,14 @@ public class TopologyManager
remaining = remaining.subtract(sufficient);
remaining = remaining.subtract(prev.addedRanges);
if (remaining.isEmpty())
- return topologies;
+ return topologies.build(sorter);
EpochState next = snapshot.epochs[i++];
topologies.add(next.global.forSelection(remaining));
prev = next;
} while (i < snapshot.epochs.length);
- return topologies;
+ return topologies.build(sorter);
}
public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long
maxEpoch)
@@ -556,7 +556,7 @@ public class TopologyManager
return new Single(sorter,
snapshot.get(minEpoch).global.forSelection(select));
int count = (int)(1 + maxEpoch - minEpoch);
- Topologies.Multi topologies = new Topologies.Multi(sorter, count);
+ Topologies.Builder topologies = new Topologies.Builder(count);
for (int i = count - 1 ; i >= 0 ; --i)
{
EpochState epochState = snapshot.get(minEpoch + i);
@@ -564,10 +564,9 @@ public class TopologyManager
select = select.subtract(epochState.addedRanges);
}
- for (int i = count - 1 ; i >= 0 ; --i)
Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch
that contained %s", select);
- return topologies;
+ return topologies.build(sorter);
}
public Topologies forEpoch(Unseekables<?> select, long epoch)
diff --git a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
index e32097ec..95021c13 100644
--- a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
+++ b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
@@ -44,9 +44,7 @@ public class TxnRequestScopeTest
Topology topology1 = topology(1, shard(range, idList(1, 2, 3),
idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(3, 4, 5),
idSet(4, 5)));
- Topologies.Multi topologies = new
Topologies.Multi((TopologySorter.StaticSorter)(a, b, s)->0);
- topologies.add(topology2);
- topologies.add(topology1);
+ Topologies.Multi topologies = new
Topologies.Multi((TopologySorter.StaticSorter)(a, b, s)->0, topology2,
topology1);
// 3 remains a member across both topologies, so can process requests
without waiting for latest topology data
Assertions.assertEquals(scope(150),
((PartialKeyRoute)TxnRequest.computeScope(id(3), topologies,
route)).toParticipants());
@@ -75,9 +73,7 @@ public class TxnRequestScopeTest
shard(range1, idList(4, 5, 6), idSet(4,
5)),
shard(range2, idList(1, 2, 3), idSet(1,
2)) );
- Topologies.Multi topologies = new
Topologies.Multi((TopologySorter.StaticSorter)(a,b,s)->0);
- topologies.add(topology2);
- topologies.add(topology1);
+ Topologies.Multi topologies = new
Topologies.Multi((TopologySorter.StaticSorter)(a,b,s)->0, topology2, topology1);
Assertions.assertEquals(scope(150, 250),
((PartialKeyRoute)TxnRequest.computeScope(id(1), topologies,
route)).toParticipants());
Assertions.assertEquals(2, TxnRequest.computeWaitForEpoch(id(1),
topologies, route));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org