This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch CASSANDRA-20112
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 93dce55deab8f1546e3b17882086183c3e4c4cd3
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Nov 27 15:36:15 2024 +0100

    Reuse Loader code between
---
 accord-core/src/main/java/accord/api/Journal.java  |  2 +-
 .../src/main/java/accord/impl/AbstractLoader.java  | 77 ++++++++++++++++++++++
 .../java/accord/impl/InMemoryCommandStore.java     | 77 ++++++----------------
 3 files changed, 98 insertions(+), 58 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index 1747c97f..8c524893 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -85,7 +85,7 @@ public interface Journal
     }
 
     /**
-     *
+     * Helper for CommandStore to restore Command states.
      */
     interface Loader
     {
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java 
b/accord-core/src/main/java/accord/impl/AbstractLoader.java
new file mode 100644
index 00000000..1a43f6e3
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.function.BiConsumer;
+
+import accord.api.Journal;
+import accord.local.Cleanup;
+import accord.local.Command;
+import accord.local.Commands;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.TxnId;
+
+import static accord.primitives.SaveStatus.Applying;
+import static accord.primitives.Status.Invalidated;
+import static accord.primitives.Status.PreApplied;
+import static accord.primitives.Status.Stable;
+import static accord.primitives.Status.Truncated;
+
+public abstract class AbstractLoader implements Journal.Loader
+{
+    protected Command loadInternal(Command command, SafeCommandStore safeStore)
+    {
+        TxnId txnId = command.txnId();
+        if (command.status() != Truncated && command.status() != Invalidated)
+        {
+            Cleanup cleanup = Cleanup.shouldCleanup(safeStore, command, 
command.participants());
+            switch (cleanup)
+            {
+                case NO:
+                    break;
+                case INVALIDATE:
+                case TRUNCATE_WITH_OUTCOME:
+                case TRUNCATE:
+                case ERASE:
+                    command = Commands.purge(command, command.participants(), 
cleanup);
+            }
+        }
+
+        command = safeStore.unsafeGet(txnId).update(safeStore, command);
+        if (command.status() == Truncated)
+            safeStore.progressLog().clear(command.txnId());
+        return command;
+    }
+
+    protected void applyWrites(Command command, SafeCommandStore safeStore, 
BiConsumer<SafeCommand, Command> apply)
+    {
+        TxnId txnId = command.txnId();
+        SafeCommand safeCommand = safeStore.unsafeGet(txnId);
+        Command local = safeCommand.current();
+        if (local.is(Stable) || local.is(PreApplied))
+        {
+            Commands.maybeExecute(safeStore, safeCommand, local, true, true);
+        }
+        else if (local.saveStatus().compareTo(Applying) >= 0 && 
!local.hasBeen(Truncated))
+        {
+            apply.accept(safeCommand, local);
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index aacf1fa0..dde87a8e 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -43,19 +43,16 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
-
-import accord.api.Journal;
-import accord.api.LocalListeners;
-import accord.api.RoutingKey;
-import accord.impl.progresslog.DefaultProgressLog;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
 import accord.api.DataStore;
+import accord.api.Journal;
+import accord.api.LocalListeners;
 import accord.api.ProgressLog;
-import accord.local.Cleanup;
+import accord.api.RoutingKey;
+import accord.impl.progresslog.DefaultProgressLog;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores.RangesForEpoch;
@@ -98,19 +95,17 @@ import static 
accord.local.SafeCommandStore.TestDep.ANY_DEPS;
 import static accord.local.SafeCommandStore.TestDep.WITH_OR_INVALIDATED;
 import static accord.local.SafeCommandStore.TestStartedAt.STARTED_BEFORE;
 import static accord.local.SafeCommandStore.TestStatus.ANY_STATUS;
+import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.SaveStatus.Applying;
 import static accord.primitives.SaveStatus.Erased;
 import static accord.primitives.SaveStatus.ErasedOrVestigial;
 import static accord.primitives.SaveStatus.ReadyToExecute;
 import static accord.primitives.Status.Applied;
 import static accord.primitives.Status.Durability.Local;
-import static accord.primitives.Status.Invalidated;
-import static accord.primitives.Status.PreApplied;
+import static accord.primitives.Status.NotDefined;
 import static accord.primitives.Status.PreCommitted;
 import static accord.primitives.Status.Stable;
 import static accord.primitives.Status.Truncated;
-import static accord.primitives.Status.NotDefined;
-import static accord.primitives.Routables.Slice.Minimal;
 import static accord.utils.Invariants.illegalState;
 import static java.lang.String.format;
 
@@ -1385,7 +1380,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         return loader;
     }
 
-    private static class CommandLoader implements Journal.Loader
+    private static class CommandLoader extends AbstractLoader
     {
         private final InMemoryCommandStore commandStore;
 
@@ -1415,34 +1410,11 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         @Override
         public void load(Command command, Journal.OnDone onDone)
         {
-            TxnId txnId = command.txnId();
-
             try
             {
                 commandStore.executeInContext(commandStore,
                                               context(command, ASYNC),
-                                              safeStore -> {
-                                                  Command local = command;
-                                                  if (local.status() != 
Truncated && local.status() != Invalidated)
-                                                  {
-                                                      Cleanup cleanup = 
Cleanup.shouldCleanup(safeStore, local, local.participants());
-                                                      switch (cleanup)
-                                                      {
-                                                          case NO:
-                                                              break;
-                                                          case INVALIDATE:
-                                                          case 
TRUNCATE_WITH_OUTCOME:
-                                                          case TRUNCATE:
-                                                          case ERASE:
-                                                              local = 
Commands.purge(local, local.participants(), cleanup);
-                                                      }
-                                                  }
-
-                                                  local = 
safeStore.unsafeGet(txnId).update(safeStore, local);
-                                                  if (local.status() == 
Truncated)
-                                                      
safeStore.progressLog().clear(local.txnId());
-                                                  return local;
-                                              });
+                                              safeStore -> 
loadInternal(command, safeStore));
                 onDone.success();
             }
             catch (Throwable t)
@@ -1454,24 +1426,15 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         @Override
         public void apply(Command command, Journal.OnDone onDone)
         {
-            TxnId txnId = command.txnId();
-
             try
             {
                 PreLoadContext context = context(command, 
KeyHistory.TIMESTAMPS);
                 commandStore.executeInContext(commandStore,
                                               context,
                                               safeStore -> {
-                                                  SafeCommand safeCommand = 
safeStore.unsafeGet(txnId);
-                                                  Command local = 
safeCommand.current();
-                                                  if (local.is(Stable) || 
local.is(PreApplied))
-                                                  {
-                                                      
Commands.maybeExecute(safeStore, safeCommand, local, true, true);
-                                                  }
-                                                  else if 
(local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated))
-                                                  {
-                                                      
unsafeApplyWrites(safeStore, safeCommand, local);
-                                                  }
+                                                  applyWrites(command, 
safeStore, (safeCommand, cmd) -> {
+                                                      
unsafeApplyWrites(safeStore, safeCommand, cmd);
+                                                  });
                                                   return null;
                                               });
                 onDone.success();
@@ -1481,17 +1444,17 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                 onDone.failure(t);
             }
         }
-    }
 
-    public static void unsafeApplyWrites(SafeCommandStore safeStore, 
SafeCommand safeCommand, Command command)
-    {
-        Command.Executed executed = command.asExecuted();
-        Participants<?> executes = executed.participants().executes(safeStore, 
command.txnId(), command.executeAt());
-        if (!executes.isEmpty())
+        protected void unsafeApplyWrites(SafeCommandStore safeStore, 
SafeCommand safeCommand, Command command)
         {
-            command.writes().applyUnsafe(safeStore, 
Commands.applyRanges(safeStore, command.executeAt()), command.partialTxn());
-            safeCommand.applied(safeStore);
-            safeStore.notifyListeners(safeCommand, command);
+            Command.Executed executed = command.asExecuted();
+            Participants<?> executes = 
executed.participants().executes(safeStore, command.txnId(), 
command.executeAt());
+            if (!executes.isEmpty())
+            {
+                command.writes().applyUnsafe(safeStore, 
Commands.applyRanges(safeStore, command.executeAt()), command.partialTxn());
+                safeCommand.applied(safeStore);
+                safeStore.notifyListeners(safeCommand, command);
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to