This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 1ce253dae4 fix replay OOM and TFK exceptions
1ce253dae4 is described below

commit 1ce253dae41b44eabbd0235e652580ee22666d45
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Oct 9 17:21:29 2024 +0100

    fix replay OOM and TFK exceptions
---
 modules/accord                                     |  2 +-
 .../org/apache/cassandra/journal/Compactor.java    |  2 +-
 .../service/accord/AccordCommandStore.java         |  5 +---
 .../cassandra/service/accord/AccordJournal.java    | 30 +++++-----------------
 4 files changed, 9 insertions(+), 30 deletions(-)

diff --git a/modules/accord b/modules/accord
index 291cbe70ad..841e139bc8 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 291cbe70ad82b0d5f875101f541d9f841912802f
+Subproject commit 841e139bc8a974ac674ce8eae847bd52255ca544
diff --git a/src/java/org/apache/cassandra/journal/Compactor.java 
b/src/java/org/apache/cassandra/journal/Compactor.java
index 51b2fec97b..4ecfb74091 100644
--- a/src/java/org/apache/cassandra/journal/Compactor.java
+++ b/src/java/org/apache/cassandra/journal/Compactor.java
@@ -45,7 +45,7 @@ public final class Compactor<K, V> implements Runnable, 
Shutdownable
 
     synchronized void start()
     {
-        if (!journal.params.enableCompaction())
+        if (journal.params.enableCompaction())
             schedule(journal.params.compactionPeriodMillis(), 
TimeUnit.MILLISECONDS);
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 022f58556b..718c669f47 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -624,10 +624,7 @@ public class AccordCommandStore extends CommandStore
                          safeStore -> {
                              SafeCommand safeCommand = 
safeStore.unsafeGet(txnId);
                              Command local = safeCommand.current();
-                             if (local.is(Stable) || local.is(PreApplied))
-                                 Commands.maybeExecute(safeStore, safeCommand, 
local, true, true);
-                             else if (local.saveStatus().compareTo(Applying) 
>= 0 && !local.hasBeen(Truncated))
-                                 Commands.applyWrites(safeStore, context, 
local).begin(agent);
+                             Commands.maybeExecute(safeStore, safeCommand, 
local, true, true);
                          })
                 .begin((unused, throwable) -> {
                     if (throwable != null)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 2abe7a9f8d..c0bf9c5a6b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.impl.ErasedSafeCommand;
+import accord.impl.TimestampsForKey;
 import accord.local.Cleanup;
 import accord.local.Command;
 import accord.local.CommandStores;
@@ -357,23 +358,10 @@ public class AccordJournal implements IJournal, 
Shutdownable
     public void replay()
     {
         logger.info("Starting journal replay.");
+        TimestampsForKey.unsafeSetReplay(true);
         CommandsForKey.disableLinearizabilityViolationsReporting();
         AccordKeyspace.truncateAllCaches();
 
-        // TODO (expected): optimize replay memory footprint
-        class ToApply
-        {
-            final JournalKey key;
-            final Command command;
-
-            ToApply(JournalKey key, Command command)
-            {
-                this.key = key;
-                this.command = command;
-            }
-        }
-
-        List<ToApply> toApply = new ArrayList<>();
         try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = 
journalTable.readAll())
         {
             JournalKey key;
@@ -406,23 +394,17 @@ public class AccordJournal implements IJournal, 
Shutdownable
                 {
                     Command command = builder.construct();
                     AccordCommandStore commandStore = (AccordCommandStore) 
node.commandStores().forId(key.commandStoreId);
-                    commandStore.loader().load(command).get();
+                    AccordCommandStore.Loader loader = commandStore.loader();
+                    loader.load(command).get();
                     if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0 
&& !command.hasBeen(Truncated))
-                        toApply.add(new ToApply(key, command));
+                        loader.apply(command);
                 }
             }
 
-            toApply.sort(Comparator.comparing(v -> v.command.executeAt()));
-            for (ToApply apply : toApply)
-            {
-                AccordCommandStore commandStore = (AccordCommandStore) 
node.commandStores().forId(apply.key.commandStoreId);
-                logger.info("Apply {}", apply.command);
-                commandStore.loader().apply(apply.command);
-            }
-
             logger.info("Waiting for command stores to quiesce.");
             ((AccordCommandStores)node.commandStores()).waitForQuiescense();
             CommandsForKey.enableLinearizabilityViolationsReporting();
+            TimestampsForKey.unsafeSetReplay(false);
             logger.info("Finished journal replay.");
             status = Status.STARTED;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to