This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4844e649 (Accord): C* stores table in Range which will cause ranges to
be removed from Accord when DROP TABLE is performed (#125)
4844e649 is described below
commit 4844e64945b720c802dce11d811e25665f9da826
Author: dcapwell <[email protected]>
AuthorDate: Fri Sep 27 21:02:12 2024 -0700
(Accord): C* stores table in Range which will cause ranges to be removed
from Accord when DROP TABLE is performed (#125)
patch by David Capwell, Sam Tunnicliffe; reviewed by Sam Tunnicliffe for
CASSANDRA-18675
---
.../coordinate/AbstractCoordinatePreAccept.java | 5 +-
.../accord/coordinate/CoordinateEphemeralRead.java | 2 +-
.../accord/coordinate/CoordinateSyncPoint.java | 4 +-
.../accord/coordinate/CoordinateTransaction.java | 2 +-
.../java/accord/coordinate/FetchMaxConflict.java | 2 +-
.../java/accord/coordinate/TopologyMismatch.java | 27 ++++++++++
.../src/main/java/accord/local/PreLoadContext.java | 5 ++
.../main/java/accord/local/cfk/CommandsForKey.java | 9 ++++
.../src/main/java/accord/primitives/Routables.java | 4 ++
.../src/main/java/accord/topology/Shard.java | 9 +++-
.../src/main/java/accord/topology/Topology.java | 8 +++
accord-core/src/main/java/accord/utils/Utils.java | 17 +++++-
.../src/test/java/accord/utils/Property.java | 60 ++++++++++++++++------
.../src/test/java/accord/utils/UtilsTest.java | 47 +++++++++++++++++
14 files changed, 177 insertions(+), 24 deletions(-)
diff --git a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
index e9ea6c54..6232c92a 100644
--- a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
+++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
@@ -30,6 +30,7 @@ import accord.local.Node.Id;
import accord.messages.Callback;
import accord.primitives.FullRoute;
import accord.primitives.Seekables;
+import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.Invariants;
@@ -217,7 +218,9 @@ abstract class AbstractCoordinatePreAccept<T, R> extends SettableResult<T> imple
tryFailure(CoordinationFailed.wrap(withEpochFailure));
return;
}
- TopologyMismatch mismatch =
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(latestEpoch),
txnId, route.homeKey(), keysOrRanges());
+ TopologyMismatch mismatch = txnId.kind() ==
Txn.Kind.ExclusiveSyncPoint
+ ?
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(latestEpoch),
txnId, route.homeKey(), keysOrRanges())
+ :
TopologyMismatch.checkForMismatchOrPendingRemoval(node.topology().globalForEpoch(latestEpoch),
txnId, route.homeKey(), keysOrRanges());
if (mismatch != null)
{
initialRoundIsDone = true;
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java b/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
index 60e3ff34..ba69e7f1 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
@@ -58,7 +58,7 @@ public class CoordinateEphemeralRead extends AbstractCoordinatePreAccept<Result,
{
public static AsyncResult<Result> coordinate(Node node, FullRoute<?>
route, TxnId txnId, Txn txn)
{
- TopologyMismatch mismatch =
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()),
txnId, route.homeKey(), txn.keys());
+ TopologyMismatch mismatch =
TopologyMismatch.checkForMismatchOrPendingRemoval(node.topology().globalForEpoch(txnId.epoch()),
txnId, route.homeKey(), txn.keys());
if (mismatch != null)
return AsyncResults.failure(mismatch);
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index ee39096c..eafc0367 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -99,7 +99,9 @@ public class CoordinateSyncPoint<S extends Seekables<?, ?>> extends CoordinatePr
{
checkArgument(txnId.kind() == Kind.SyncPoint || txnId.kind() ==
ExclusiveSyncPoint);
FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
- TopologyMismatch mismatch =
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()),
txnId, route.homeKey(), keysOrRanges);
+ TopologyMismatch mismatch = txnId.kind() == ExclusiveSyncPoint
+ ?
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()),
txnId, route.homeKey(), keysOrRanges)
+ :
TopologyMismatch.checkForMismatchOrPendingRemoval(node.topology().globalForEpoch(txnId.epoch()),
txnId, route.homeKey(), keysOrRanges);
if (mismatch != null)
return AsyncResults.failure(mismatch);
CoordinateSyncPoint<S> coordinate = new CoordinateSyncPoint<>(node,
txnId, node.agent().emptySystemTxn(txnId.kind(), keysOrRanges), route, adapter);
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
index 0b05657d..7d2f0210 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
@@ -59,7 +59,7 @@ public class CoordinateTransaction extends CoordinatePreAccept<Result>
public static AsyncResult<Result> coordinate(Node node, FullRoute<?>
route, TxnId txnId, Txn txn)
{
- TopologyMismatch mismatch =
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()),
txnId, route.homeKey(), txn.keys());
+ TopologyMismatch mismatch =
TopologyMismatch.checkForMismatchOrPendingRemoval(node.topology().globalForEpoch(txnId.epoch()),
txnId, route.homeKey(), txn.keys());
if (mismatch != null)
return AsyncResults.failure(mismatch);
CoordinateTransaction coordinate = new CoordinateTransaction(node,
txnId, txn, route);
diff --git a/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java b/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java
index 677c40fc..ac3880ba 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java
@@ -67,7 +67,7 @@ public class FetchMaxConflict extends AbstractCoordinatePreAccept<Timestamp, Get
long epoch = node.epoch();
FullRoute<?> route = node.computeRoute(epoch, keysOrRanges);
// TODO (required): need to ensure we permanently fail any bootstrap
that is now impossible and mark as stale
- TopologyMismatch mismatch =
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(epoch), null,
route.homeKey(), keysOrRanges);
+ TopologyMismatch mismatch =
TopologyMismatch.checkForMismatchOrPendingRemoval(node.topology().globalForEpoch(epoch),
null, route.homeKey(), keysOrRanges);
if (mismatch != null)
return AsyncResults.failure(mismatch);
FetchMaxConflict coordinate = new FetchMaxConflict(node, route,
keysOrRanges, epoch);
diff --git a/accord-core/src/main/java/accord/coordinate/TopologyMismatch.java b/accord-core/src/main/java/accord/coordinate/TopologyMismatch.java
index 1e4748ba..18db4f01 100644
--- a/accord-core/src/main/java/accord/coordinate/TopologyMismatch.java
+++ b/accord-core/src/main/java/accord/coordinate/TopologyMismatch.java
@@ -79,12 +79,39 @@ public class TopologyMismatch extends CoordinationFailed
return String.format("Attempted to access %s that are no longer valid
globally (%d -> %s)", select.without(t.ranges()), t.epoch(), t.ranges());
}
+ private static TopologyMismatch checkForPendingRemoval(Topology t,
@Nullable TxnId txnId, @Nullable RoutingKey homeKey, Routables<?> keysOrRanges)
+ {
+ EnumSet<TopologyMismatch.Reason> reasons = null;
+ if (homeKey != null && !t.reduce(true, s -> s.contains(homeKey),
(result, s) -> result & !s.pendingRemoval))
+ {
+ if (reasons == null)
+ reasons = EnumSet.noneOf(TopologyMismatch.Reason.class);
+ reasons.add(TopologyMismatch.Reason.HOME_KEY);
+ }
+ if (!t.reduce(true, s -> keysOrRanges.intersects(s.range), (result, s)
-> result & !s.pendingRemoval))
+ {
+ if (reasons == null)
+ reasons = EnumSet.noneOf(TopologyMismatch.Reason.class);
+ reasons.add(TopologyMismatch.Reason.KEYS_OR_RANGES);
+ }
+ return reasons == null ? null : new TopologyMismatch(reasons, t,
txnId, homeKey, keysOrRanges);
+ }
+
@Nullable
public static TopologyMismatch checkForMismatch(Topology t, Unseekables<?>
select)
{
return t.ranges().containsAll(select) ? null : new
TopologyMismatch(EnumSet.of(Reason.KEYS_OR_RANGES), t, select);
}
+ @Nullable
+ public static TopologyMismatch checkForMismatchOrPendingRemoval(Topology
t, @Nullable TxnId txnId, RoutingKey homeKey, Routables<?> keysOrRanges)
+ {
+ TopologyMismatch tm = checkForMismatch(t, txnId, homeKey,
keysOrRanges);
+ if (tm == null)
+ tm = checkForPendingRemoval(t, txnId, homeKey, keysOrRanges);
+ return tm;
+ }
+
@Nullable
public static TopologyMismatch checkForMismatch(Topology t, @Nullable
TxnId txnId, RoutingKey homeKey, Routables<?> keysOrRanges)
{
diff --git a/accord-core/src/main/java/accord/local/PreLoadContext.java b/accord-core/src/main/java/accord/local/PreLoadContext.java
index 25c68efd..64014641 100644
--- a/accord-core/src/main/java/accord/local/PreLoadContext.java
+++ b/accord-core/src/main/java/accord/local/PreLoadContext.java
@@ -212,6 +212,11 @@ public interface PreLoadContext
return contextFor(null, Collections.emptyList(), keys);
}
+ static PreLoadContext contextFor(Seekables<?, ?> keys, KeyHistory
keyHistory)
+ {
+ return contextFor(null, Collections.emptyList(), keys, keyHistory);
+ }
+
static PreLoadContext empty()
{
return EMPTY_PRELOADCONTEXT;
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index eb5137b7..91193b7b 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -812,6 +812,15 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm
return i >= committedByExecuteAt.length ? null :
committedByExecuteAt[i];
}
+ @VisibleForTesting
+ public TxnId nextWaitingToApply()
+ {
+ int i = maxAppliedWriteByExecuteAt + 1;
+ while (i < committedByExecuteAt.length &&
committedByExecuteAt[i].status == APPLIED)
+ ++i;
+ return i >= committedByExecuteAt.length ? null :
committedByExecuteAt[i];
+ }
+
public TxnId blockedOnTxnId(TxnId txnId, @Nullable Timestamp executeAt)
{
TxnInfo minUndecided = minUndecided();
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java b/accord-core/src/main/java/accord/primitives/Routables.java
index 833c950b..12df7e7f 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -57,6 +57,10 @@ public interface Routables<K extends Routable> extends Iterable<K>
return StreamSupport.stream(spliterator(), false);
}
boolean intersects(AbstractRanges ranges);
+ default boolean intersects(Range range)
+ {
+ return intersects(Ranges.single(range));
+ }
boolean intersects(AbstractKeys<?> keys);
default boolean intersects(Routables<?> routables)
{
diff --git a/accord-core/src/main/java/accord/topology/Shard.java b/accord-core/src/main/java/accord/topology/Shard.java
index 684515ee..f95d3d40 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -44,8 +44,9 @@ public class Shard
public final int recoveryFastPathSize;
public final int fastPathQuorumSize;
public final int slowPathQuorumSize;
+ public final boolean pendingRemoval;
- public Shard(Range range, SortedArrayList<Id> nodes, Set<Id>
fastPathElectorate, Set<Id> joining)
+ public Shard(Range range, SortedArrayList<Id> nodes, Set<Id>
fastPathElectorate, Set<Id> joining, boolean pendingRemoval)
{
this.range = range;
this.nodes = nodes;
@@ -57,6 +58,12 @@ public class Shard
this.recoveryFastPathSize = (maxFailures+1)/2;
this.slowPathQuorumSize = slowPathQuorumSize(nodes.size());
this.fastPathQuorumSize = fastPathQuorumSize(nodes.size(), e,
maxFailures);
+ this.pendingRemoval = pendingRemoval;
+ }
+
+ public Shard(Range range, SortedArrayList<Id> nodes, Set<Id>
fastPathElectorate, Set<Id> joining)
+ {
+ this(range, nodes, fastPathElectorate, joining, false);
}
public Shard(Range range, SortedArrayList<Id> nodes, Set<Id>
fastPathElectorate)
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index f1fc988f..84ea53f1 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -30,6 +30,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
@@ -553,6 +554,13 @@ public class Topology
forEach.accept(shards[i]);
}
+ public <A> A reduce(A zero,
+ Predicate<Shard> filter,
+ BiFunction<A, ? super Shard, A> reducer)
+ {
+ return Utils.reduce(zero, shards(), filter, reducer);
+ }
+
public SortedArrayList<Id> nodes()
{
return nodeIds;
diff --git a/accord-core/src/main/java/accord/utils/Utils.java b/accord-core/src/main/java/accord/utils/Utils.java
index 97039109..075c5260 100644
--- a/accord-core/src/main/java/accord/utils/Utils.java
+++ b/accord-core/src/main/java/accord/utils/Utils.java
@@ -30,13 +30,14 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.function.BiFunction;
import java.util.function.IntFunction;
+import java.util.function.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;
-// TODO (low priority): remove when jdk8 support is dropped
public class Utils
{
// reimplements Collection#toArray
@@ -150,4 +151,18 @@ public class Utils
array[k] = tmp;
}
}
+
+ public static <A, B> B reduce(B zero,
+ Iterable<A> input,
+ Predicate<A> filter,
+ BiFunction<B, ? super A, B> reducer)
+ {
+ B result = zero;
+ for (A a : input)
+ {
+ if (filter.test(a))
+ result = reducer.apply(result, a);
+ }
+ return result;
+ }
}
diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java
index 724a9383..95ea50bd 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -708,6 +708,8 @@ public class Property
private final Map<Setup<State, SystemUnderTest>, Integer> knownWeights
= new LinkedHashMap<>();
@Nullable
private Set<Setup<State, SystemUnderTest>> unknownWeights = null;
+ @Nullable
+ private Map<Predicate<State>, List<Setup<State, SystemUnderTest>>>
conditionalCommands = null;
private Gen.IntGen unknownWeightGen = Gens.ints().between(1, 10);
@Nullable
private FailingConsumer<State> preCommands = null;
@@ -792,18 +794,15 @@ public class Property
public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State>
predicate, Gen<Command<State, SystemUnderTest, ?>> cmd)
{
- return add((rs, state) -> {
- if (!predicate.test(state)) return ignoreCommand();
- return cmd.next(rs);
- });
+ return addIf(predicate, (rs, state) -> cmd.next(rs));
}
public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State>
predicate, Setup<State, SystemUnderTest> cmd)
{
- return add((rs, state) -> {
- if (!predicate.test(state)) return ignoreCommand();
- return cmd.setup(rs, state);
- });
+ if (conditionalCommands == null)
+ conditionalCommands = new LinkedHashMap<>();
+ conditionalCommands.computeIfAbsent(predicate, i -> new
ArrayList<>()).add(cmd);
+ return this;
}
public CommandsBuilder<State, SystemUnderTest>
addAllIf(Predicate<State> predicate, Consumer<IfBuilder<State,
SystemUnderTest>> sub)
@@ -841,7 +840,7 @@ public class Property
public Commands<State, SystemUnderTest> build()
{
Gen<Setup<State, SystemUnderTest>> commandsGen;
- if (unknownWeights == null)
+ if (unknownWeights == null && conditionalCommands == null)
{
commandsGen = Gens.pick(new LinkedHashMap<>(knownWeights));
}
@@ -849,25 +848,52 @@ public class Property
{
class DynamicWeightsGen implements Gen<Setup<State,
SystemUnderTest>>, Gens.Reset
{
- Gen<Setup<State, SystemUnderTest>> gen;
+ LinkedHashMap<Setup<State, SystemUnderTest>, Integer>
weights;
+ LinkedHashMap<Setup<State, SystemUnderTest>, Integer>
conditionalWeights;
+ Gen<Setup<State, SystemUnderTest>> nonConditional;
@Override
public Setup<State, SystemUnderTest> next(RandomSource rs)
{
- if (gen == null)
+ if (weights == null)
{
// create random weights
- LinkedHashMap<Setup<State, SystemUnderTest>,
Integer> clone = new LinkedHashMap<>(knownWeights);
- for (Setup<State, SystemUnderTest> s :
unknownWeights)
- clone.put(s, unknownWeightGen.nextInt(rs));
- gen = Gens.pick(clone);
+ weights = new LinkedHashMap<>(knownWeights);
+ if (unknownWeights != null)
+ {
+ for (Setup<State, SystemUnderTest> s :
unknownWeights)
+ weights.put(s,
unknownWeightGen.nextInt(rs));
+ }
+ nonConditional = Gens.pick(weights);
+ if (conditionalCommands != null)
+ {
+ conditionalWeights = new LinkedHashMap<>();
+ for (List<Setup<State, SystemUnderTest>>
commands : conditionalCommands.values())
+ {
+ for (Setup<State, SystemUnderTest> c :
commands)
+ conditionalWeights.put(c,
unknownWeightGen.nextInt(rs));
+ }
+ }
}
- return gen.next(rs);
+ if (conditionalWeights == null) return
nonConditional.next(rs);
+ return (r, s) -> {
+ // need to figure out what conditions apply...
+ LinkedHashMap<Setup<State, SystemUnderTest>,
Integer> clone = new LinkedHashMap<>(weights);
+ for (Map.Entry<Predicate<State>, List<Setup<State,
SystemUnderTest>>> e : conditionalCommands.entrySet())
+ {
+ if (e.getKey().test(s))
+ e.getValue().forEach(c -> clone.put(c,
conditionalWeights.get(c)));
+ }
+ Setup<State, SystemUnderTest> select =
Gens.pick(clone).next(r);
+ return select.setup(r, s);
+ };
}
@Override
public void reset()
{
- gen = null;
+ weights = null;
+ nonConditional = null;
+ conditionalWeights = null;
}
}
commandsGen = new DynamicWeightsGen();
diff --git a/accord-core/src/test/java/accord/utils/UtilsTest.java b/accord-core/src/test/java/accord/utils/UtilsTest.java
new file mode 100644
index 00000000..4f26ce0e
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/UtilsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+import com.google.common.primitives.Ints;
+import org.junit.jupiter.api.Test;
+
+import static accord.utils.Property.qt;
+import static accord.utils.Utils.reduce;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class UtilsTest
+{
+ @Test
+ public void simpleReduce()
+ {
+ qt().forAll(Gens.arrays(Gens.ints().all()).ofSizeBetween(0,
10)).check(array -> {
+ List<Integer> list = Ints.asList(array);
+
+ // filter doesn't return non matches
+ assertThat(reduce(true, list, v -> v % 2 == 0, (acc, v) -> acc &
(v % 2 == 0))).isTrue();
+ // count
+ assertThat(reduce(0, list, i -> true, (acc, v) -> acc +
1)).isEqualTo(array.length);
+ // sum
+ assertThat(reduce(0, list, i -> true, (acc, v) -> acc +
v)).isEqualTo(IntStream.of(array).sum());
+ });
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]