This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 38e7217727 Only one retention cycle in progress at a time (#2797)
38e7217727 is described below
commit 38e721772740744e840472383249fbf89abc2ec8
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Apr 5 06:58:39 2026 +0800
Only one retention cycle in progress at a time (#2797)
* fix: Only one retention cycle in progress at a time
Track retention lifecycle steps with mutable retentionInProgress state
in BehaviorSetup. Key changes:
- Add retentionInProgress flag and 6 progress tracking methods with
detailed debug logging to BehaviorSetup.
- Skip new retention cycle when previous one has not completed yet,
logging at INFO level (lowered to DEBUG by the review-feedback change
below). Next retention will cover skipped retention.
- Simplify internalDeleteSnapshots to always use minSequenceNr=0,
preventing leftover snapshots when retention is skipped.
- Remove now-unnecessary deleteLowerSequenceNr from
SnapshotCountRetentionCriteriaImpl.
- Fix upstream logging placeholder mismatch bug in
retentionProgressDeleteEventsEnded (2 placeholders, 1 argument).
The retention process for SnapshotCountRetentionCriteria:
1. Save snapshot when shouldSnapshotAfterPersist returns
SnapshotWithRetention.
2. Delete events (when deleteEventsOnSnapshot=true), in background.
3. Delete snapshots (when isOnlyOneSnapshot=false), in background.
Upstream: akka/akka-core@57b750a3dc
Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
Co-authored-by: Copilot <[email protected]>
* Address review feedback: format long line, use DEBUG log level, add
rationale comments
- Reformat RetentionCriteriaSpec expected list to multi-line for readability
- Change 'Skipping retention' log level from INFO to DEBUG to avoid log
noise
- Add design rationale comment explaining why snapshot+retention are skipped
together (prevents orphaned snapshots that would never be cleaned up)
- Add Scaladoc explaining why minSequenceNr=0L is used (simplifies logic,
safe for built-in snapshot stores)
Co-authored-by: Copilot <[email protected]>
* fix: remaining two-arg expectDeleteSnapshotCompleted calls in retention
spec
Lines 292 and 298 still used the old two-argument form after the API
change to single-argument expectDeleteSnapshotCompleted(Long).
---------
Co-authored-by: Copilot <[email protected]>
---
.../EventSourcedBehaviorRetentionSpec.scala | 61 ++++++------
.../scaladsl/EventSourcedBehaviorWatchSpec.scala | 3 +-
.../persistence/typed/internal/BehaviorSetup.scala | 108 ++++++++++++++++++++-
.../typed/internal/EventSourcedBehaviorImpl.scala | 3 +-
.../typed/internal/ExternalInteractions.scala | 12 ++-
.../typed/internal/RetentionCriteriaImpl.scala | 10 --
.../pekko/persistence/typed/internal/Running.scala | 37 +++++--
.../typed/internal/RetentionCriteriaSpec.scala | 27 +++---
8 files changed, 193 insertions(+), 68 deletions(-)
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
index 9972cd77c9..c4d5945cd1 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
@@ -114,13 +114,12 @@ object EventSourcedBehaviorRetentionSpec extends Matchers
{
completed
}
- def expectDeleteSnapshotCompleted(maxSequenceNr: Long, minSequenceNr:
Long): DeleteSnapshotsCompleted = {
+ def expectDeleteSnapshotCompleted(maxSequenceNr: Long):
DeleteSnapshotsCompleted = {
val wrapped = probe.expectMessageType[WrappedSignal]
wrapped.signal shouldBe a[DeleteSnapshotsCompleted]
val signal = wrapped.signal.asInstanceOf[DeleteSnapshotsCompleted]
signal.target should ===(
- DeletionTarget.Criteria(
-
SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr)))
+
DeletionTarget.Criteria(SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr)))
signal
}
}
@@ -284,25 +283,25 @@ class EventSourcedBehaviorRetentionSpec
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(6)
snapshotSignalProbe.expectSnapshotCompleted(9)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(3)
(1 to 3).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(13, (0 until 13).toVector))
snapshotSignalProbe.expectSnapshotCompleted(12)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 0)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(6)
(1 to 3).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(16, (0 until 16).toVector))
snapshotSignalProbe.expectSnapshotCompleted(15)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(9)
(1 to 4).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotSignalProbe.expectSnapshotCompleted(18)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(12, 6)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(12)
snapshotSignalProbe.expectNoMessage()
}
@@ -331,7 +330,7 @@ class EventSourcedBehaviorRetentionSpec
// The reason for -1 is that a snapshot at the exact toSequenceNr is
still useful and the events
// after that can be replayed after that snapshot, but replaying the
events after toSequenceNr without
// starting at the snapshot at toSequenceNr would be invalid.
- snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(2)
// one at a time since snapshotting+event-deletion switches to running
state before deleting snapshot so ordering
// if sending many commands in one go is not deterministic
@@ -339,7 +338,7 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! Increment // 12
snapshotSignalProbe.expectSnapshotCompleted(12)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 6
- snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(5)
persistentActor ! Increment // 13
persistentActor ! Increment // 14
@@ -347,7 +346,7 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! Increment // 15
snapshotSignalProbe.expectSnapshotCompleted(15)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 9
- snapshotSignalProbe.expectDeleteSnapshotCompleted(8, 2)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(8)
persistentActor ! Increment // 16
persistentActor ! Increment // 17
@@ -355,7 +354,7 @@ class EventSourcedBehaviorRetentionSpec
snapshotSignalProbe.expectSnapshotCompleted(18)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 12
- snapshotSignalProbe.expectDeleteSnapshotCompleted(11, 5)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(11)
eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
@@ -381,7 +380,7 @@ class EventSourcedBehaviorRetentionSpec
(4 to 10).foreach(_ => persistentActor ! Increment)
snapshotSignalProbe.expectSnapshotCompleted(5)
snapshotSignalProbe.expectSnapshotCompleted(10)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(5)
(11 to 13).foreach(_ => persistentActor ! Increment)
snapshotSignalProbe.expectSnapshotCompleted(13)
@@ -395,7 +394,7 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(16, (0 until 16).toVector))
snapshotSignalProbe.expectSnapshotCompleted(15)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(10, 5)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(10)
eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
@@ -438,18 +437,18 @@ class EventSourcedBehaviorRetentionSpec
snapshotSignalProbe.expectSnapshotCompleted(8) // every-2 through
criteria
// triggers delete up to snapshot no 2
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 2
- snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) // then delete
oldest snapshot
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(1) // then delete
oldest snapshot
persistentActor ! Increment // 9
persistentActor ! Increment // 10
snapshotSignalProbe.expectSnapshotCompleted(10) // every-2 through
criteria
- snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(3)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 4
persistentActor ! Increment // 11
persistentActor ! Increment // 12
snapshotSignalProbe.expectSnapshotCompleted(12) // every-2 through
criteria
- snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(5)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 6
persistentActor ! Increment // 13
@@ -463,13 +462,13 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! Increment // 14
snapshotSignalProbe.expectSnapshotCompleted(14) // every-2 through
criteria
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 8
- snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 1)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(7)
persistentActor ! Increment // 15
persistentActor ! Increment // 16
snapshotSignalProbe.expectSnapshotCompleted(16) // every-2 through
criteria
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 10
- snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(9)
eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
@@ -501,31 +500,31 @@ class EventSourcedBehaviorRetentionSpec
snapshotSignalProbe.expectSnapshotCompleted(2)
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(4)
- deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1)
persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(5)
- deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2)
persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(6)
- deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3)
persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(7)
- deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4)
persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(8)
- deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5)
persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(9)
- deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6)
persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(10)
- deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
@@ -554,32 +553,32 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! Increment // 5
snapshotSignalProbe.expectSnapshotCompleted(5)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 2
- snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(1)
persistentActor ! Increment // 6
snapshotSignalProbe.expectSnapshotCompleted(6)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 3
- snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(2)
persistentActor ! Increment // 7
snapshotSignalProbe.expectSnapshotCompleted(7)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 4
- snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(3)
persistentActor ! Increment // 8
snapshotSignalProbe.expectSnapshotCompleted(8)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 5
- snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(4)
persistentActor ! Increment // 9
snapshotSignalProbe.expectSnapshotCompleted(9)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 6
- snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(5)
persistentActor ! Increment // 10
snapshotSignalProbe.expectSnapshotCompleted(10)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 7
- snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
+ snapshotSignalProbe.expectDeleteSnapshotCompleted(6)
}
"snapshot on recovery if expected snapshot is missing" in {
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
index 6124e83264..ded56b0bf5 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
@@ -87,7 +87,8 @@ class EventSourcedBehaviorWatchSpec
stashState = new
StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings),
replication = None,
publishEvents = false,
- internalLoggerFactory = () => logger)
+ internalLoggerFactory = () => logger,
+ retentionInProgress = false)
"A typed persistent parent actor watching a child" must {
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
index 6e80c2b75e..38b94166a0 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
@@ -22,6 +22,7 @@ import pekko.actor.{ ActorRef => ClassicActorRef }
import pekko.actor.Cancellable
import pekko.actor.typed.Signal
import pekko.actor.typed.scaladsl.ActorContext
+import pekko.actor.typed.scaladsl.LoggerOps
import pekko.annotation.InternalApi
import pekko.persistence._
import pekko.persistence.typed.EventAdapter
@@ -71,7 +72,8 @@ private[pekko] final class BehaviorSetup[C, E, S](
val stashState: StashState,
val replication: Option[ReplicationSetup],
val publishEvents: Boolean,
- private val internalLoggerFactory: () => Logger) {
+ private val internalLoggerFactory: () => Logger,
+ private var retentionInProgress: Boolean) {
import BehaviorSetup._
import InternalProtocol.RecoveryTickEvent
@@ -197,6 +199,110 @@ private[pekko] final class BehaviorSetup[C, E, S](
}
}
+ // The retention process for SnapshotCountRetentionCriteria looks like this:
+ // 1. Save snapshot after persisting events when shouldSnapshotAfterPersist
returned SnapshotWithRetention.
+ // 2. Delete events (when deleteEventsOnSnapshot=true), runs in background.
+ // 3. Delete snapshots (when isOnlyOneSnapshot=false), runs in background.
+
+ def isRetentionInProgress(): Boolean =
+ retentionInProgress
+
+ def retentionProgressSaveSnapshotStarted(sequenceNr: Long): Unit = {
+ retention match {
+ case SnapshotCountRetentionCriteriaImpl(_, _, _) =>
+ internalLogger.debug("Starting retention at seqNr [{}], saving
snapshot.", sequenceNr)
+ retentionInProgress = true
+ case _ =>
+ }
+ }
+
+ def retentionProgressSaveSnapshotEnded(sequenceNr: Long, success: Boolean):
Unit = {
+ retention match {
+ case SnapshotCountRetentionCriteriaImpl(_, _, deleteEvents) if
retentionInProgress =>
+ if (!success) {
+ internalLogger.debug("Retention at seqNr [{}] is completed, saving
snapshot failed.", sequenceNr)
+ retentionInProgress = false
+ } else if (deleteEvents) {
+ internalLogger.debug("Retention at seqNr [{}], saving snapshot was
successful.", sequenceNr)
+ } else if (isOnlyOneSnapshot) {
+ // no delete of events and no delete of snapshots => done
+ internalLogger.debug("Retention at seqNr [{}] is completed, saving
snapshot was successful.", sequenceNr)
+ retentionInProgress = false
+ } else {
+ internalLogger.debug("Retention at seqNr [{}], saving snapshot was
successful.", sequenceNr)
+ }
+ case _ =>
+ }
+ }
+
+ def retentionProgressDeleteEventsStarted(sequenceNr: Long,
deleteToSequenceNr: Long): Unit = {
+ retention match {
+ case SnapshotCountRetentionCriteriaImpl(_, _, true) if
retentionInProgress =>
+ if (deleteToSequenceNr > 0) {
+ internalLogger.debug2(
+ "Retention at seqNr [{}], deleting events to seqNr [{}].",
+ sequenceNr,
+ deleteToSequenceNr)
+ } else {
+ internalLogger.debug("Retention is completed, no events to delete.")
+ retentionInProgress = false
+ }
+ case _ =>
+ }
+ }
+
+ def retentionProgressDeleteEventsEnded(deleteToSequenceNr: Long, success:
Boolean): Unit = {
+ retention match {
+ case SnapshotCountRetentionCriteriaImpl(_, _, true) if
retentionInProgress =>
+ if (!success) {
+ internalLogger.debug(
+ "Retention is completed, deleting events to seqNr [{}] failed.",
+ deleteToSequenceNr)
+ retentionInProgress = false
+ } else if (isOnlyOneSnapshot) {
+ // no delete of snapshots => done
+ internalLogger.debug(
+ "Retention is completed, deleting events to seqNr [{}] was
successful.",
+ deleteToSequenceNr)
+ retentionInProgress = false
+ } else {
+ internalLogger.debug("Retention, deleting events to seqNr [{}] was
successful.", deleteToSequenceNr)
+ }
+ case _ =>
+ }
+ }
+
+ def retentionProgressDeleteSnapshotsStarted(deleteToSequenceNr: Long): Unit
= {
+ retention match {
+ case SnapshotCountRetentionCriteriaImpl(_, _, _) if retentionInProgress
=>
+ if (deleteToSequenceNr > 0) {
+ internalLogger.debug("Retention, deleting snapshots to seqNr [{}].",
deleteToSequenceNr)
+ } else {
+ internalLogger.debug("Retention is completed, no snapshots to
delete.")
+ retentionInProgress = false
+ }
+ case _ =>
+ }
+ }
+
+ def retentionProgressDeleteSnapshotsEnded(deleteToSequenceNr: Long, success:
Boolean): Unit = {
+ retention match {
+ case SnapshotCountRetentionCriteriaImpl(_, _, _) if retentionInProgress
=>
+ if (success) {
+ // delete snapshot is last step => done
+ internalLogger.debug(
+ "Retention is completed, deleting snapshots to seqNr [{}] was
successful.",
+ deleteToSequenceNr)
+ retentionInProgress = false
+ } else {
+ internalLogger.debug("Retention is completed, deleting snapshots to
seqNr [{}] failed.", deleteToSequenceNr)
+ retentionInProgress = false
+ }
+
+ case _ =>
+ }
+ }
+
}
/**
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
index 0aa0df8bed..75ebbb1b43 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
@@ -209,7 +209,8 @@ private[pekko] final case class
EventSourcedBehaviorImpl[Command, Event, State](
stashState = stashState,
replication = replication,
publishEvents = publishEvents,
- internalLoggerFactory = () => internalLogger())
+ internalLoggerFactory = () => internalLogger(),
+ retentionInProgress = false)
// needs to accept Any since we also can get messages from the
journal
// not part of the user facing Command protocol
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
index 417af3ddad..f56411ad5e 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
@@ -223,11 +223,15 @@ private[pekko] trait SnapshotInteractions[C, E, S] {
}
}
- /** Deletes the snapshots up to and including the `sequenceNr`. */
- protected def internalDeleteSnapshots(fromSequenceNr: Long, toSequenceNr:
Long): Unit = {
+ /**
+ * Deletes the snapshots up to and including the `sequenceNr`.
+ * Uses `minSequenceNr = 0L` to always delete from the beginning, which
simplifies
+ * the retention bookkeeping by removing the need to track a separate lower
bound.
+ */
+ protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = {
if (toSequenceNr > 0) {
- val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr =
fromSequenceNr, maxSequenceNr = toSequenceNr)
- setup.internalLogger.debug2("Deleting snapshots from sequenceNr [{}] to
[{}]", fromSequenceNr, toSequenceNr)
+ val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = 0L,
maxSequenceNr = toSequenceNr)
+ setup.internalLogger.debug("Deleting snapshots to sequenceNr [{}]",
toSequenceNr)
setup.snapshotStore
.tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id,
snapshotCriteria), setup.selfClassic)
}
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
index 0bc9a57b10..a07935e432 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
@@ -44,16 +44,6 @@ import pekko.persistence.typed.scaladsl
math.max(0, lastSequenceNr - (keepNSnapshots.toLong *
snapshotEveryNEvents))
}
- /**
- * Should only be used when `BehaviorSetup.isOnlyOneSnapshot` is false.
- */
- def deleteLowerSequenceNr(upperSequenceNr: Long): Long = {
- // We could use 0 as fromSequenceNr to delete all older snapshots, but
that might be inefficient for
- // large ranges depending on how it's implemented in the snapshot plugin.
Therefore we use the
- // same window as defined for how much to keep in the retention criteria
- math.max(0, upperSequenceNr - (keepNSnapshots.toLong *
snapshotEveryNEvents))
- }
-
override def withDeleteEventsOnSnapshot: SnapshotCountRetentionCriteriaImpl =
copy(deleteEventsOnSnapshot = true)
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
index bb983d05a2..a43eb278b1 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
@@ -813,13 +813,27 @@ private[pekko] object Running {
this
} else {
visibleState = state
- if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null)
{
+ def skipRetention(): Boolean = {
+ // Only one retention process (snapshot + optional event/snapshot
deletion) at a time.
+ // When retention is already in progress, both the snapshot and
the subsequent
+ // deletion steps are skipped together. This keeps retention state
simple and avoids
+ // transiently exceeding the intended snapshot count. The next
retention cycle at a
+ // higher seqNr will cover the range of the skipped one, so no
data is lost.
+ val inProgress = shouldSnapshotAfterPersist ==
SnapshotWithRetention && setup.isRetentionInProgress()
+ if (inProgress)
+ setup.internalLogger.debug(
+ "Skipping retention at seqNr [{}] because previous retention
has not completed yet. " +
+ "Next retention will cover skipped retention.",
+ state.seqNr)
+ inProgress
+ }
+ if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null
|| skipRetention()) {
val newState = applySideEffects(sideEffects, state)
-
onWriteDone(setup.context, p)
-
tryUnstashOne(newState)
} else {
+ if (shouldSnapshotAfterPersist == SnapshotWithRetention)
+ setup.retentionProgressSaveSnapshotStarted(state.seqNr)
internalSaveSnapshot(state)
new StoringSnapshot(state, sideEffects, shouldSnapshotAfterPersist)
}
@@ -910,18 +924,22 @@ private[pekko] object Running {
setup.retention match {
case DisabledRetentionCriteria => // no
further actions
case s @ SnapshotCountRetentionCriteriaImpl(_, _, true) =>
+ setup.retentionProgressSaveSnapshotEnded(state.seqNr, success
= true)
// deleteEventsOnSnapshot == true, deletion of old events
val deleteEventsToSeqNr = {
if (setup.isOnlyOneSnapshot) meta.sequenceNr // delete all
events up to the snapshot
else s.deleteUpperSequenceNr(meta.sequenceNr) //
keepNSnapshots batches of events
}
// snapshot deletion then happens on event deletion success in
Running.onDeleteEventsJournalResponse
+ setup.retentionProgressDeleteEventsStarted(state.seqNr,
deleteEventsToSeqNr)
internalDeleteEvents(meta.sequenceNr, deleteEventsToSeqNr)
case s @ SnapshotCountRetentionCriteriaImpl(_, _, false) =>
+ setup.retentionProgressSaveSnapshotEnded(state.seqNr, success
= true)
// deleteEventsOnSnapshot == false, deletion of old snapshots
if (!setup.isOnlyOneSnapshot) {
val deleteSnapshotsToSeqNr =
s.deleteUpperSequenceNr(meta.sequenceNr)
-
internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr),
deleteSnapshotsToSeqNr)
+
setup.retentionProgressDeleteSnapshotsStarted(deleteSnapshotsToSeqNr)
+ internalDeleteSnapshots(deleteSnapshotsToSeqNr)
}
case unexpected => throw new IllegalStateException(s"Unexpected
retention criteria: $unexpected")
}
@@ -930,6 +948,8 @@ private[pekko] object Running {
Some(SnapshotCompleted(SnapshotMetadata.fromClassic(meta)))
case SaveSnapshotFailure(meta, error) =>
+ if (snapshotReason == SnapshotWithRetention)
+ setup.retentionProgressSaveSnapshotEnded(state.seqNr, success =
false)
setup.internalLogger.warn2("Failed to save snapshot given metadata
[{}] due to: {}", meta, error.getMessage)
Some(SnapshotFailed(SnapshotMetadata.fromClassic(meta), error))
@@ -1029,20 +1049,23 @@ private[pekko] object Running {
val signal = response match {
case DeleteMessagesSuccess(toSequenceNr) =>
setup.internalLogger.debug("Persistent events to sequenceNr [{}]
deleted successfully.", toSequenceNr)
+ setup.retentionProgressDeleteEventsEnded(toSequenceNr, success = true)
setup.retention match {
case DisabledRetentionCriteria => // no further actions
- case s: SnapshotCountRetentionCriteriaImpl =>
+ case _: SnapshotCountRetentionCriteriaImpl =>
if (!setup.isOnlyOneSnapshot) {
// The reason for -1 is that a snapshot at the exact
toSequenceNr is still useful and the events
// after that can be replayed after that snapshot, but replaying
the events after toSequenceNr without
// starting at the snapshot at toSequenceNr would be invalid.
val deleteSnapshotsToSeqNr = toSequenceNr - 1
-
internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr),
deleteSnapshotsToSeqNr)
+
setup.retentionProgressDeleteSnapshotsStarted(deleteSnapshotsToSeqNr)
+ internalDeleteSnapshots(deleteSnapshotsToSeqNr)
}
case unexpected => throw new IllegalStateException(s"Unexpected
retention criteria: $unexpected")
}
Some(DeleteEventsCompleted(toSequenceNr))
case DeleteMessagesFailure(e, toSequenceNr) =>
+ setup.retentionProgressDeleteEventsEnded(toSequenceNr, success = false)
Some(DeleteEventsFailed(toSequenceNr, e))
case _ =>
None
@@ -1063,8 +1086,10 @@ private[pekko] object Running {
def onDeleteSnapshotResponse(response: SnapshotProtocol.Response, state: S):
Behavior[InternalProtocol] = {
val signal = response match {
case DeleteSnapshotsSuccess(criteria) =>
+ setup.retentionProgressDeleteSnapshotsEnded(criteria.maxSequenceNr,
success = true)
Some(DeleteSnapshotsCompleted(DeletionTarget.Criteria(SnapshotSelectionCriteria.fromClassic(criteria))))
case DeleteSnapshotsFailure(criteria, error) =>
+ setup.retentionProgressDeleteSnapshotsEnded(criteria.maxSequenceNr,
success = false)
Some(DeleteSnapshotsFailed(DeletionTarget.Criteria(SnapshotSelectionCriteria.fromClassic(criteria)),
error))
case DeleteSnapshotSuccess(meta) =>
Some(DeleteSnapshotsCompleted(DeletionTarget.Individual(SnapshotMetadata.fromClassic(meta))))
diff --git
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala
index d8a903ab4e..93207a459d 100644
---
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala
+++
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala
@@ -39,23 +39,22 @@ class RetentionCriteriaSpec extends TestSuite with Matchers
with AnyWordSpecLike
"have valid sequenceNr range based on keepNSnapshots" in {
val criteria = RetentionCriteria.snapshotEvery(3,
2).asInstanceOf[SnapshotCountRetentionCriteriaImpl]
val expected = List(
- 1 -> (0 -> 0),
- 3 -> (0 -> 0),
- 4 -> (0 -> 0),
- 6 -> (0 -> 0),
- 7 -> (0 -> 1),
- 9 -> (0 -> 3),
- 10 -> (0 -> 4),
- 12 -> (0 -> 6),
- 13 -> (1 -> 7),
- 15 -> (3 -> 9),
- 18 -> (6 -> 12),
- 20 -> (8 -> 14))
+ 1 -> 0,
+ 3 -> 0,
+ 4 -> 0,
+ 6 -> 0,
+ 7 -> 1,
+ 9 -> 3,
+ 10 -> 4,
+ 12 -> 6,
+ 13 -> 7,
+ 15 -> 9,
+ 18 -> 12,
+ 20 -> 14)
expected.foreach {
- case (seqNr, (lower, upper)) =>
+ case (seqNr, upper) =>
withClue(s"seqNr=$seqNr:") {
criteria.deleteUpperSequenceNr(seqNr) should ===(upper)
- criteria.deleteLowerSequenceNr(upper) should ===(lower)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]