This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch CASSANDRA-20112 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 93dce55deab8f1546e3b17882086183c3e4c4cd3 Author: Alex Petrov <[email protected]> AuthorDate: Wed Nov 27 15:36:15 2024 +0100 Reuse Loader code between --- accord-core/src/main/java/accord/api/Journal.java | 2 +- .../src/main/java/accord/impl/AbstractLoader.java | 77 ++++++++++++++++++++++ .../java/accord/impl/InMemoryCommandStore.java | 77 ++++++---------------- 3 files changed, 98 insertions(+), 58 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index 1747c97f..8c524893 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -85,7 +85,7 @@ public interface Journal } /** - * + * Helper for CommandStore to restore Command states. */ interface Loader { diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java b/accord-core/src/main/java/accord/impl/AbstractLoader.java new file mode 100644 index 00000000..1a43f6e3 --- /dev/null +++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import java.util.function.BiConsumer; + +import accord.api.Journal; +import accord.local.Cleanup; +import accord.local.Command; +import accord.local.Commands; +import accord.local.SafeCommand; +import accord.local.SafeCommandStore; +import accord.primitives.TxnId; + +import static accord.primitives.SaveStatus.Applying; +import static accord.primitives.Status.Invalidated; +import static accord.primitives.Status.PreApplied; +import static accord.primitives.Status.Stable; +import static accord.primitives.Status.Truncated; + +public abstract class AbstractLoader implements Journal.Loader +{ + protected Command loadInternal(Command command, SafeCommandStore safeStore) + { + TxnId txnId = command.txnId(); + if (command.status() != Truncated && command.status() != Invalidated) + { + Cleanup cleanup = Cleanup.shouldCleanup(safeStore, command, command.participants()); + switch (cleanup) + { + case NO: + break; + case INVALIDATE: + case TRUNCATE_WITH_OUTCOME: + case TRUNCATE: + case ERASE: + command = Commands.purge(command, command.participants(), cleanup); + } + } + + command = safeStore.unsafeGet(txnId).update(safeStore, command); + if (command.status() == Truncated) + safeStore.progressLog().clear(command.txnId()); + return command; + } + + protected void applyWrites(Command command, SafeCommandStore safeStore, BiConsumer<SafeCommand, Command> apply) + { + TxnId txnId = command.txnId(); + SafeCommand safeCommand = safeStore.unsafeGet(txnId); + Command local = safeCommand.current(); + if (local.is(Stable) || local.is(PreApplied)) + { + Commands.maybeExecute(safeStore, safeCommand, local, true, true); + } + else if (local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated)) + { + apply.accept(safeCommand, local); + } + } +} diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index aacf1fa0..dde87a8e 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -43,19 +43,16 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; - -import accord.api.Journal; -import accord.api.LocalListeners; -import accord.api.RoutingKey; -import accord.impl.progresslog.DefaultProgressLog; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.Agent; import accord.api.DataStore; +import accord.api.Journal; +import accord.api.LocalListeners; import accord.api.ProgressLog; -import accord.local.Cleanup; +import accord.api.RoutingKey; +import accord.impl.progresslog.DefaultProgressLog; import accord.local.Command; import accord.local.CommandStore; import accord.local.CommandStores.RangesForEpoch; @@ -98,19 +95,17 @@ import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; import static accord.local.SafeCommandStore.TestDep.WITH_OR_INVALIDATED; import static accord.local.SafeCommandStore.TestStartedAt.STARTED_BEFORE; import static accord.local.SafeCommandStore.TestStatus.ANY_STATUS; +import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.SaveStatus.Applying; import static accord.primitives.SaveStatus.Erased; import static accord.primitives.SaveStatus.ErasedOrVestigial; import static accord.primitives.SaveStatus.ReadyToExecute; import static accord.primitives.Status.Applied; import static accord.primitives.Status.Durability.Local; -import static accord.primitives.Status.Invalidated; -import static accord.primitives.Status.PreApplied; +import static accord.primitives.Status.NotDefined; import static accord.primitives.Status.PreCommitted; import static accord.primitives.Status.Stable; import static accord.primitives.Status.Truncated; -import static accord.primitives.Status.NotDefined; -import static accord.primitives.Routables.Slice.Minimal; import static accord.utils.Invariants.illegalState; import static java.lang.String.format; @@ -1385,7 +1380,7 @@ public abstract class InMemoryCommandStore extends CommandStore return loader; } - private static class CommandLoader implements Journal.Loader + private static class CommandLoader extends AbstractLoader { private final InMemoryCommandStore commandStore; @@ -1415,34 +1410,11 @@ public abstract class InMemoryCommandStore extends CommandStore @Override public void load(Command command, Journal.OnDone onDone) { - TxnId txnId = command.txnId(); - try { commandStore.executeInContext(commandStore, context(command, ASYNC), - safeStore -> { - Command local = command; - if (local.status() != Truncated && local.status() != Invalidated) - { - Cleanup cleanup = Cleanup.shouldCleanup(safeStore, local, local.participants()); - switch (cleanup) - { - case NO: - break; - case INVALIDATE: - case TRUNCATE_WITH_OUTCOME: - case TRUNCATE: - case ERASE: - local = Commands.purge(local, local.participants(), cleanup); - } - } - - local = safeStore.unsafeGet(txnId).update(safeStore, local); - if (local.status() == Truncated) - safeStore.progressLog().clear(local.txnId()); - return local; - }); + safeStore -> loadInternal(command, safeStore)); onDone.success(); } catch (Throwable t) @@ -1454,24 +1426,15 @@ public abstract class InMemoryCommandStore extends CommandStore @Override public void apply(Command command, Journal.OnDone onDone) { - TxnId txnId = command.txnId(); - try { PreLoadContext context = context(command, KeyHistory.TIMESTAMPS); commandStore.executeInContext(commandStore, context, safeStore -> { - SafeCommand safeCommand = safeStore.unsafeGet(txnId); - Command local = safeCommand.current(); - if (local.is(Stable) || local.is(PreApplied)) - { - Commands.maybeExecute(safeStore, safeCommand, local, true, true); - } - else if (local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated)) - { - unsafeApplyWrites(safeStore, safeCommand, local); - } + applyWrites(command, safeStore, (safeCommand, cmd) -> { + unsafeApplyWrites(safeStore, safeCommand, cmd); + }); return null; }); onDone.success(); @@ -1481,17 +1444,17 @@ public abstract class InMemoryCommandStore extends CommandStore onDone.failure(t); } } - } - public static void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCommand, Command command) - { - Command.Executed executed = command.asExecuted(); - Participants<?> executes = executed.participants().executes(safeStore, command.txnId(), command.executeAt()); - if (!executes.isEmpty()) + protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCommand, Command command) { - command.writes().applyUnsafe(safeStore, Commands.applyRanges(safeStore, command.executeAt()), command.partialTxn()); - safeCommand.applied(safeStore); - safeStore.notifyListeners(safeCommand, command); + Command.Executed executed = command.asExecuted(); + Participants<?> executes = executed.participants().executes(safeStore, command.txnId(), command.executeAt()); + if (!executes.isEmpty()) + { + command.writes().applyUnsafe(safeStore, Commands.applyRanges(safeStore, command.executeAt()), command.partialTxn()); + safeCommand.applied(safeStore); + safeStore.notifyListeners(safeCommand, command); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
