This is an automated email from the ASF dual-hosted git repository. aleksey pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new d63d06aa Move preaccept expiration logic away from Agent d63d06aa is described below commit d63d06aafe2e60e57a9651ff6dd491175bbe6916 Author: Aleksey Yeschenko <alek...@apache.org> AuthorDate: Fri May 17 13:33:57 2024 +0100 Move preaccept expiration logic away from Agent patch by Aleksey Yeschenko; reviewed by Alex Petrov, Benedict Elliott Smith, and David Capwell for CASSANDRA-18888 --- accord-core/src/main/java/accord/api/Agent.java | 6 +++-- .../accord/coordinate/CoordinateTransaction.java | 2 +- .../src/main/java/accord/local/CommandStore.java | 7 +++++- .../java/accord/messages/ExecutionContext.java | 29 ++++++++++++++++++++++ .../src/test/java/accord/impl/TestAgent.java | 9 ++++--- .../src/test/java/accord/impl/list/ListAgent.java | 5 ++-- .../main/java/accord/maelstrom/MaelstromAgent.java | 10 ++++---- 7 files changed, 52 insertions(+), 16 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Agent.java b/accord-core/src/main/java/accord/api/Agent.java index f229ab82..06ed2f98 100644 --- a/accord-core/src/main/java/accord/api/Agent.java +++ b/accord-core/src/main/java/accord/api/Agent.java @@ -26,7 +26,6 @@ import accord.primitives.Ranges; import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.Txn; -import accord.primitives.TxnId; /** * Facility for augmenting node behaviour at specific points @@ -70,7 +69,10 @@ public interface Agent extends UncaughtExceptionListener void onHandledException(Throwable t); - boolean isExpired(TxnId initiated, long now); + /** + * @return PreAccept timeout with implementation-defined resolution of the hybrid logical clock + */ + long preAcceptTimeout(); Txn emptyTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges); diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java index dc4395c9..0b05657d 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java @@ -84,7 +84,7 @@ public class CoordinateTransaction extends CoordinatePreAccept<Result> // but by sending accept we rule out hybrid fast-path // TODO (low priority, efficiency): if we receive an expired response, perhaps defer to permit at least one other // node to respond before invalidating - if (executeAt.isRejected() || node.agent().isExpired(txnId, executeAt.hlc())) + if (executeAt.isRejected() || executeAt.hlc() - txnId.hlc() >= node.agent().preAcceptTimeout()) { proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt,this); } diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 8e2f18eb..1d5a3913 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -319,8 +319,13 @@ public abstract class CommandStore implements AgentExecutor */ final Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys, SafeCommandStore safeStore, boolean permitFastPath) { + // TODO (expected): make preAcceptTimeout() be a part of SafeCommandStore, initiated from ExecutionContext; + // preAcceptTimeout can be subject to local configuration changes, which would break determinism of repeated + // message processing, if, say, replayed from a log. + NodeTimeService time = safeStore.time(); - boolean isExpired = agent().isExpired(txnId, safeStore.time().now()); + + boolean isExpired = time.now() - txnId.hlc() >= agent().preAcceptTimeout() && !txnId.kind().isSyncPoint(); if (rejectBefore != null && !isExpired) isExpired = null == rejectBefore.foldl(keys, (rejectIfBefore, test) -> rejectIfBefore.compareTo(test) > 0 ? null : test, txnId, Objects::isNull); diff --git a/accord-core/src/main/java/accord/messages/ExecutionContext.java b/accord-core/src/main/java/accord/messages/ExecutionContext.java new file mode 100644 index 00000000..dbf4c2db --- /dev/null +++ b/accord-core/src/main/java/accord/messages/ExecutionContext.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package accord.messages; + +/** + * Necessary context to allow for deterministic repeated execution of requests (e.g. when re-applying from a log) + */ +public interface ExecutionContext +{ + /** + * @return PreAccept timeout as it was at request execution + */ + long preAcceptTimeout(); +} diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/test/java/accord/impl/TestAgent.java index 0624c4ea..8e5cad4c 100644 --- a/accord-core/src/test/java/accord/impl/TestAgent.java +++ b/accord-core/src/test/java/accord/impl/TestAgent.java @@ -20,7 +20,6 @@ package accord.impl; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; @@ -37,7 +36,9 @@ import accord.primitives.Ranges; import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.Txn; -import accord.primitives.TxnId; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; public class TestAgent implements Agent { @@ -113,9 +114,9 @@ public class TestAgent implements Agent } @Override - public boolean isExpired(TxnId initiated, long now) + public long preAcceptTimeout() { - return TimeUnit.SECONDS.convert(now - initiated.hlc(), TimeUnit.MICROSECONDS) >= 10; + return MICROSECONDS.convert(10, SECONDS); } @Override diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java index 092fb744..31cb5155 100644 --- a/accord-core/src/test/java/accord/impl/list/ListAgent.java +++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java @@ -31,7 +31,6 @@ import accord.primitives.Ranges; import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.Txn; -import accord.primitives.TxnId; import static accord.local.Node.Id.NONE; import static accord.utils.Invariants.checkState; @@ -98,9 +97,9 @@ public class ListAgent implements Agent } @Override - public boolean isExpired(TxnId initiated, long now) + public long preAcceptTimeout() { - return now - initiated.hlc() >= timeout && !initiated.kind().isSyncPoint(); + return timeout; } @Override diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java index 66e1aebf..28d55d9a 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java @@ -18,8 +18,6 @@ package accord.maelstrom; -import java.util.concurrent.TimeUnit; - import accord.api.Agent; import accord.api.Result; import accord.local.Command; @@ -29,7 +27,9 @@ import accord.primitives.Ranges; import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.Txn; -import accord.primitives.TxnId; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static accord.utils.Invariants.checkState; @@ -80,9 +80,9 @@ public class MaelstromAgent implements Agent } @Override - public boolean isExpired(TxnId initiated, long now) + public long preAcceptTimeout() { - return TimeUnit.SECONDS.convert(now - initiated.hlc(), TimeUnit.MICROSECONDS) >= 10; + return MICROSECONDS.convert(10, SECONDS); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org