This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1e508d34 CEP-15: Simplify handling of Insufficient replies from Commit and Apply
1e508d34 is described below
commit 1e508d340935fef496f58606a14717bed59e8af4
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Fri Oct 13 15:47:32 2023 +0100
CEP-15: Simplify handling of Insufficient replies from Commit and Apply
patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for CASSANDRA-18928
---
.../src/main/java/accord/coordinate/Persist.java | 22 ++---
.../src/main/java/accord/messages/Commit.java | 15 ---
.../src/main/java/accord/messages/Defer.java | 107 ---------------------
3 files changed, 11 insertions(+), 133 deletions(-)
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java b/accord-core/src/main/java/accord/coordinate/Persist.java
index 0607722f..9a132683 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -28,7 +28,6 @@ import accord.local.Node.Id;
import accord.messages.Apply;
import accord.messages.Apply.ApplyReply;
import accord.messages.Callback;
-import accord.messages.Commit;
import accord.messages.InformDurable;
import accord.primitives.*;
import accord.topology.Topologies;
@@ -37,7 +36,6 @@ import static
accord.coordinate.tracking.RequestStatus.Success;
import static accord.local.Status.Durability.Majority;
import static accord.messages.Apply.executes;
import static accord.messages.Apply.participates;
-import static accord.messages.Commit.Kind.Maximal;
public class Persist implements Callback<ApplyReply>
{
@@ -47,6 +45,8 @@ public class Persist implements Callback<ApplyReply>
final Txn txn;
final Timestamp executeAt;
final Deps deps;
+ final Writes writes;
+ final Result result;
final QuorumTracker tracker;
final Set<Id> persistedOn;
boolean isDone;
@@ -60,7 +60,7 @@ public class Persist implements Callback<ApplyReply>
public static void persist(Node node, Topologies executes, TxnId txnId,
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes,
Result result)
{
Topologies participates = participates(node, route, txnId, executeAt,
executes);
- Persist persist = new Persist(node, executes, txnId, route, txn,
executeAt, deps);
+ Persist persist = new Persist(node, executes, txnId, route, txn,
executeAt, deps, writes, result);
node.send(participates.nodes(), to -> Apply.applyMinimal(to,
participates, executes, txnId, route, txn, executeAt, deps, writes, result),
persist);
}
@@ -68,7 +68,7 @@ public class Persist implements Callback<ApplyReply>
{
Topologies executes = executes(node, route, executeAt);
Topologies participates = participates(node, route, txnId, executeAt,
executes);
- Persist persist = new Persist(node, participates, txnId, route, txn,
executeAt, deps);
+ Persist persist = new Persist(node, participates, txnId, route, txn,
executeAt, deps, writes, result);
node.send(participates.nodes(), to -> Apply.applyMaximal(to,
participates, executes, txnId, route, txn, executeAt, deps, writes, result),
persist);
}
@@ -76,19 +76,21 @@ public class Persist implements Callback<ApplyReply>
{
Topologies executes = executes(node, sendTo, executeAt);
Topologies participates = participates(node, sendTo, txnId, executeAt,
executes);
- Persist persist = new Persist(node, participates, txnId, route, txn,
executeAt, deps);
+ Persist persist = new Persist(node, participates, txnId, route, txn,
executeAt, deps, writes, result);
node.send(participates.nodes(), to -> Apply.applyMaximal(to,
participates, executes, txnId, route, txn, executeAt, deps, writes, result),
persist);
}
- private Persist(Node node, Topologies topologies, TxnId txnId,
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps)
+ private Persist(Node node, Topologies topologies, TxnId txnId,
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes,
Result result)
{
this.node = node;
this.txnId = txnId;
+ this.route = route;
this.txn = txn;
+ this.executeAt = executeAt;
this.deps = deps;
- this.route = route;
+ this.writes = writes;
+ this.result = result;
this.tracker = new QuorumTracker(topologies);
- this.executeAt = executeAt;
this.persistedOn = new HashSet<>();
}
@@ -112,9 +114,7 @@ public class Persist implements Callback<ApplyReply>
}
break;
case Insufficient:
- Topologies topologies = node.topology().preciseEpochs(route,
txnId.epoch(), executeAt.epoch());
- // TODO (easy, cleanup): use static method in Commit
- node.send(from, new Commit(Maximal, from,
topologies.forEpoch(txnId.epoch()), topologies, txnId, txn, route, null,
executeAt, deps, false));
+ Apply.sendMaximal(node, from, txnId, route, txn, executeAt,
deps, writes, result);
}
}
diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java
index 9d692e6a..97e67cc8 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -50,9 +50,6 @@ import accord.topology.Topologies;
import accord.topology.Topology;
import accord.utils.Invariants;
-import static accord.local.Status.Committed;
-import static accord.local.Status.Known.DefinitionOnly;
-
public class Commit extends TxnRequest<ReadNack>
{
private static final Logger logger = LoggerFactory.getLogger(Commit.class);
@@ -72,8 +69,6 @@ public class Commit extends TxnRequest<ReadNack>
public final @Nullable FullRoute<?> route;
public final ReadTxnData read;
- private transient Defer defer;
-
public enum Kind { Minimal, Maximal }
// TODO (low priority, clarity): cleanup passing of topologies here - maybe fetch them afresh from Node?
@@ -177,12 +172,7 @@ public class Commit extends TxnRequest<ReadNack>
case Success:
case Redundant:
return null;
-
case Insufficient:
-
Invariants.checkState(!safeCommand.current().known().isDefinitionKnown());
- if (defer == null)
- defer = new Defer(DefinitionOnly, Committed.minKnown,
Commit.this);
- defer.add(safeStore, safeCommand, safeStore.commandStore());
return ReadNack.NotCommitted;
}
}
@@ -200,11 +190,6 @@ public class Commit extends TxnRequest<ReadNack>
node.reply(replyTo, replyContext, reply, failure);
else if (read != null)
read.process(node, replyTo, replyContext);
- if (defer != null)
- {
- defer.ack();
- defer = null;
- }
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/Defer.java b/accord-core/src/main/java/accord/messages/Defer.java
deleted file mode 100644
index acb80fe0..00000000
--- a/accord-core/src/main/java/accord/messages/Defer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.messages;
-
-import java.util.function.Function;
-
-import accord.local.*;
-import accord.local.Status.Known;
-import accord.primitives.TxnId;
-import accord.utils.Invariants;
-import org.agrona.collections.IntHashSet;
-
-import static accord.messages.Defer.Ready.Expired;
-import static accord.messages.Defer.Ready.No;
-import static accord.messages.Defer.Ready.Yes;
-
-class Defer implements Command.TransientListener
-{
- public enum Ready { No, Yes, Expired }
-
- final Function<Command, Ready> waitUntil;
- final TxnRequest<?> request;
- final IntHashSet waitingOn = new IntHashSet();
- int waitingOnCount;
- boolean isDone;
-
- Defer(Known waitUntil, Known expireAt, TxnRequest<?> request)
- {
- this(command -> {
- if (!waitUntil.isSatisfiedBy(command.known()))
- return No;
- if (expireAt.isSatisfiedBy(command.known()))
- return Expired;
- return Yes;
- }, request);
- }
-
- Defer(Function<Command, Ready> waitUntil, TxnRequest<?> request)
- {
- this.waitUntil = waitUntil;
- this.request = request;
- }
-
- synchronized void add(SafeCommandStore safeStore, SafeCommand safeCommand,
CommandStore commandStore)
- {
- if (isDone)
- throw new IllegalStateException("Recurrent retry of " + request);
-
- waitingOn.add(commandStore.id());
- ++waitingOnCount;
- safeCommand.addListener(this);
- }
-
- @Override
- public synchronized void onChange(SafeCommandStore safeStore, SafeCommand
safeCommand)
- {
- Command command = safeCommand.current();
- Ready ready = waitUntil.apply(command);
- if (ready == No) return;
-
- if (!safeCommand.removeListener(this))
- return;
-
- if (ready == Expired) return;
-
- int id = safeStore.commandStore().id();
- // TODO (desired): it would be nice at least for transient listener lists to annotate that they are notifying a listener, to avoid redundant invocations
- // we can then impose this as an invariant check rather than an early abort
- Invariants.checkState(waitingOn.contains(id));
- waitingOn.remove(id);
-
- ack();
- }
-
- synchronized void ack()
- {
- if (-1 == --waitingOnCount)
- {
- isDone = true;
- request.process();
- }
- }
-
- @Override
- public PreLoadContext listenerPreLoadContext(TxnId caller)
- {
- Invariants.checkState(caller.equals(request.txnId));
- return request;
- }
-}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]