This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 1ce253dae4 fix replay OOM and TFK exceptions
1ce253dae4 is described below
commit 1ce253dae41b44eabbd0235e652580ee22666d45
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Oct 9 17:21:29 2024 +0100
fix replay OOM and TFK exceptions
---
modules/accord | 2 +-
.../org/apache/cassandra/journal/Compactor.java | 2 +-
.../service/accord/AccordCommandStore.java | 5 +---
.../cassandra/service/accord/AccordJournal.java | 30 +++++-----------------
4 files changed, 9 insertions(+), 30 deletions(-)
diff --git a/modules/accord b/modules/accord
index 291cbe70ad..841e139bc8 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 291cbe70ad82b0d5f875101f541d9f841912802f
+Subproject commit 841e139bc8a974ac674ce8eae847bd52255ca544
diff --git a/src/java/org/apache/cassandra/journal/Compactor.java b/src/java/org/apache/cassandra/journal/Compactor.java
index 51b2fec97b..4ecfb74091 100644
--- a/src/java/org/apache/cassandra/journal/Compactor.java
+++ b/src/java/org/apache/cassandra/journal/Compactor.java
@@ -45,7 +45,7 @@ public final class Compactor<K, V> implements Runnable,
Shutdownable
synchronized void start()
{
- if (!journal.params.enableCompaction())
+ if (journal.params.enableCompaction())
schedule(journal.params.compactionPeriodMillis(),
TimeUnit.MILLISECONDS);
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 022f58556b..718c669f47 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -624,10 +624,7 @@ public class AccordCommandStore extends CommandStore
safeStore -> {
SafeCommand safeCommand =
safeStore.unsafeGet(txnId);
Command local = safeCommand.current();
- if (local.is(Stable) || local.is(PreApplied))
- Commands.maybeExecute(safeStore, safeCommand,
local, true, true);
- else if (local.saveStatus().compareTo(Applying)
>= 0 && !local.hasBeen(Truncated))
- Commands.applyWrites(safeStore, context,
local).begin(agent);
+ Commands.maybeExecute(safeStore, safeCommand,
local, true, true);
})
.begin((unused, throwable) -> {
if (throwable != null)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 2abe7a9f8d..c0bf9c5a6b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.impl.ErasedSafeCommand;
+import accord.impl.TimestampsForKey;
import accord.local.Cleanup;
import accord.local.Command;
import accord.local.CommandStores;
@@ -357,23 +358,10 @@ public class AccordJournal implements IJournal,
Shutdownable
public void replay()
{
logger.info("Starting journal replay.");
+ TimestampsForKey.unsafeSetReplay(true);
CommandsForKey.disableLinearizabilityViolationsReporting();
AccordKeyspace.truncateAllCaches();
- // TODO (expected): optimize replay memory footprint
- class ToApply
- {
- final JournalKey key;
- final Command command;
-
- ToApply(JournalKey key, Command command)
- {
- this.key = key;
- this.command = command;
- }
- }
-
- List<ToApply> toApply = new ArrayList<>();
try (AccordJournalTable.KeyOrderIterator<JournalKey> iter =
journalTable.readAll())
{
JournalKey key;
@@ -406,23 +394,17 @@ public class AccordJournal implements IJournal,
Shutdownable
{
Command command = builder.construct();
AccordCommandStore commandStore = (AccordCommandStore)
node.commandStores().forId(key.commandStoreId);
- commandStore.loader().load(command).get();
+ AccordCommandStore.Loader loader = commandStore.loader();
+ loader.load(command).get();
if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0
&& !command.hasBeen(Truncated))
- toApply.add(new ToApply(key, command));
+ loader.apply(command);
}
}
- toApply.sort(Comparator.comparing(v -> v.command.executeAt()));
- for (ToApply apply : toApply)
- {
- AccordCommandStore commandStore = (AccordCommandStore)
node.commandStores().forId(apply.key.commandStoreId);
- logger.info("Apply {}", apply.command);
- commandStore.loader().apply(apply.command);
- }
-
logger.info("Waiting for command stores to quiesce.");
((AccordCommandStores)node.commandStores()).waitForQuiescense();
CommandsForKey.enableLinearizabilityViolationsReporting();
+ TimestampsForKey.unsafeSetReplay(false);
logger.info("Finished journal replay.");
status = Status.STARTED;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]