This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 38e7217727 Only one retention cycle in progress at a time (#2797)
38e7217727 is described below

commit 38e721772740744e840472383249fbf89abc2ec8
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Apr 5 06:58:39 2026 +0800

    Only one retention cycle in progress at a time (#2797)
    
    * fix: Only one retention cycle in progress at a time
    
    Track retention lifecycle steps with mutable retentionInProgress state
    in BehaviorSetup. Key changes:
    
    - Add retentionInProgress flag and 6 progress tracking methods with
      detailed debug logging to BehaviorSetup.
    - Skip a new retention cycle when the previous one has not completed yet,
      logging at INFO level (lowered to DEBUG in the review follow-up below).
      The next retention cycle will cover the skipped one.
    - Simplify internalDeleteSnapshots to always use minSequenceNr=0,
      preventing leftover snapshots when retention is skipped.
    - Remove now-unnecessary deleteLowerSequenceNr from
      SnapshotCountRetentionCriteriaImpl.
    - Fix upstream logging placeholder mismatch bug in
      retentionProgressDeleteEventsEnded (2 placeholders, 1 argument).
    
    The retention process for SnapshotCountRetentionCriteria:
    1. Save snapshot when shouldSnapshotAfterPersist returns
       SnapshotWithRetention.
    2. Delete events (when deleteEventsOnSnapshot=true), in background.
    3. Delete snapshots (when isOnlyOneSnapshot=false), in background.
    
    Upstream: akka/akka-core@57b750a3dc
    Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
    
    Co-authored-by: Copilot <[email protected]>
    
    * Address review feedback: format long line, use DEBUG log level, add rationale comments
    
    - Reformat RetentionCriteriaSpec expected list to multi-line for readability
    - Change 'Skipping retention' log level from INFO to DEBUG to avoid log noise
    - Add design rationale comment explaining why snapshot+retention are skipped
      together (prevents orphaned snapshots that would never be cleaned up)
    - Add Scaladoc explaining why minSequenceNr=0L is used (simplifies logic,
      safe for built-in snapshot stores)
    
    Co-authored-by: Copilot <[email protected]>
    
    * fix: remaining two-arg expectDeleteSnapshotCompleted calls in retention spec
    
    Lines 292 and 298 still used the old two-argument form after the API
    change to single-argument expectDeleteSnapshotCompleted(Long).
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../EventSourcedBehaviorRetentionSpec.scala        |  61 ++++++------
 .../scaladsl/EventSourcedBehaviorWatchSpec.scala   |   3 +-
 .../persistence/typed/internal/BehaviorSetup.scala | 108 ++++++++++++++++++++-
 .../typed/internal/EventSourcedBehaviorImpl.scala  |   3 +-
 .../typed/internal/ExternalInteractions.scala      |  12 ++-
 .../typed/internal/RetentionCriteriaImpl.scala     |  10 --
 .../pekko/persistence/typed/internal/Running.scala |  37 +++++--
 .../typed/internal/RetentionCriteriaSpec.scala     |  27 +++---
 8 files changed, 193 insertions(+), 68 deletions(-)

diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
index 9972cd77c9..c4d5945cd1 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
@@ -114,13 +114,12 @@ object EventSourcedBehaviorRetentionSpec extends Matchers 
{
       completed
     }
 
-    def expectDeleteSnapshotCompleted(maxSequenceNr: Long, minSequenceNr: 
Long): DeleteSnapshotsCompleted = {
+    def expectDeleteSnapshotCompleted(maxSequenceNr: Long): 
DeleteSnapshotsCompleted = {
       val wrapped = probe.expectMessageType[WrappedSignal]
       wrapped.signal shouldBe a[DeleteSnapshotsCompleted]
       val signal = wrapped.signal.asInstanceOf[DeleteSnapshotsCompleted]
       signal.target should ===(
-        DeletionTarget.Criteria(
-          
SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr)))
+        
DeletionTarget.Criteria(SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr)))
       signal
     }
   }
@@ -284,25 +283,25 @@ class EventSourcedBehaviorRetentionSpec
       snapshotSignalProbe.expectSnapshotCompleted(3)
       snapshotSignalProbe.expectSnapshotCompleted(6)
       snapshotSignalProbe.expectSnapshotCompleted(9)
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(3)
 
       (1 to 3).foreach(_ => persistentActor ! Increment)
       persistentActor ! GetValue(replyProbe.ref)
       replyProbe.expectMessage(State(13, (0 until 13).toVector))
       snapshotSignalProbe.expectSnapshotCompleted(12)
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 0)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(6)
 
       (1 to 3).foreach(_ => persistentActor ! Increment)
       persistentActor ! GetValue(replyProbe.ref)
       replyProbe.expectMessage(State(16, (0 until 16).toVector))
       snapshotSignalProbe.expectSnapshotCompleted(15)
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(9)
 
       (1 to 4).foreach(_ => persistentActor ! Increment)
       persistentActor ! GetValue(replyProbe.ref)
       replyProbe.expectMessage(State(20, (0 until 20).toVector))
       snapshotSignalProbe.expectSnapshotCompleted(18)
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(12, 6)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(12)
 
       snapshotSignalProbe.expectNoMessage()
     }
@@ -331,7 +330,7 @@ class EventSourcedBehaviorRetentionSpec
       // The reason for -1 is that a snapshot at the exact toSequenceNr is 
still useful and the events
       // after that can be replayed after that snapshot, but replaying the 
events after toSequenceNr without
       // starting at the snapshot at toSequenceNr would be invalid.
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(2)
 
       // one at a time since snapshotting+event-deletion switches to running 
state before deleting snapshot so ordering
       // if sending many commands in one go is not deterministic
@@ -339,7 +338,7 @@ class EventSourcedBehaviorRetentionSpec
       persistentActor ! Increment // 12
       snapshotSignalProbe.expectSnapshotCompleted(12)
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 6
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(5)
 
       persistentActor ! Increment // 13
       persistentActor ! Increment // 14
@@ -347,7 +346,7 @@ class EventSourcedBehaviorRetentionSpec
       persistentActor ! Increment // 15
       snapshotSignalProbe.expectSnapshotCompleted(15)
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 9
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(8, 2)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(8)
 
       persistentActor ! Increment // 16
       persistentActor ! Increment // 17
@@ -355,7 +354,7 @@ class EventSourcedBehaviorRetentionSpec
       snapshotSignalProbe.expectSnapshotCompleted(18)
 
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 12
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(11, 5)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(11)
 
       eventProbe.expectNoMessage()
       snapshotSignalProbe.expectNoMessage()
@@ -381,7 +380,7 @@ class EventSourcedBehaviorRetentionSpec
       (4 to 10).foreach(_ => persistentActor ! Increment)
       snapshotSignalProbe.expectSnapshotCompleted(5)
       snapshotSignalProbe.expectSnapshotCompleted(10)
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(5)
 
       (11 to 13).foreach(_ => persistentActor ! Increment)
       snapshotSignalProbe.expectSnapshotCompleted(13)
@@ -395,7 +394,7 @@ class EventSourcedBehaviorRetentionSpec
       persistentActor ! GetValue(replyProbe.ref)
       replyProbe.expectMessage(State(16, (0 until 16).toVector))
       snapshotSignalProbe.expectSnapshotCompleted(15)
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(10, 5)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(10)
       eventProbe.within(3.seconds) {
         eventProbe.expectNoMessage()
         snapshotSignalProbe.expectNoMessage()
@@ -438,18 +437,18 @@ class EventSourcedBehaviorRetentionSpec
       snapshotSignalProbe.expectSnapshotCompleted(8) // every-2 through 
criteria
       // triggers delete up to snapshot no 2
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 2
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) // then delete 
oldest snapshot
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(1) // then delete 
oldest snapshot
 
       persistentActor ! Increment // 9
       persistentActor ! Increment // 10
       snapshotSignalProbe.expectSnapshotCompleted(10) // every-2 through 
criteria
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(3)
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 4
 
       persistentActor ! Increment // 11
       persistentActor ! Increment // 12
       snapshotSignalProbe.expectSnapshotCompleted(12) // every-2 through 
criteria
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(5)
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 6
 
       persistentActor ! Increment // 13
@@ -463,13 +462,13 @@ class EventSourcedBehaviorRetentionSpec
       persistentActor ! Increment // 14
       snapshotSignalProbe.expectSnapshotCompleted(14) // every-2 through 
criteria
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 8
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 1)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(7)
 
       persistentActor ! Increment // 15
       persistentActor ! Increment // 16
       snapshotSignalProbe.expectSnapshotCompleted(16) // every-2 through 
criteria
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 10
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(9)
 
       eventProbe.within(3.seconds) {
         eventProbe.expectNoMessage()
@@ -501,31 +500,31 @@ class EventSourcedBehaviorRetentionSpec
       snapshotSignalProbe.expectSnapshotCompleted(2)
       snapshotSignalProbe.expectSnapshotCompleted(3)
       snapshotSignalProbe.expectSnapshotCompleted(4)
-      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
+      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1)
 
       persistentActor ! Increment
       snapshotSignalProbe.expectSnapshotCompleted(5)
-      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
+      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2)
 
       persistentActor ! Increment
       snapshotSignalProbe.expectSnapshotCompleted(6)
-      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
+      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3)
 
       persistentActor ! Increment
       snapshotSignalProbe.expectSnapshotCompleted(7)
-      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
+      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4)
 
       persistentActor ! Increment
       snapshotSignalProbe.expectSnapshotCompleted(8)
-      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
+      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5)
 
       persistentActor ! Increment
       snapshotSignalProbe.expectSnapshotCompleted(9)
-      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
+      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6)
 
       persistentActor ! Increment
       snapshotSignalProbe.expectSnapshotCompleted(10)
-      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4)
+      deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7)
 
       persistentActor ! GetValue(replyProbe.ref)
       replyProbe.expectMessage(State(10, (0 until 10).toVector))
@@ -554,32 +553,32 @@ class EventSourcedBehaviorRetentionSpec
       persistentActor ! Increment // 5
       snapshotSignalProbe.expectSnapshotCompleted(5)
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 2
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(1)
 
       persistentActor ! Increment // 6
       snapshotSignalProbe.expectSnapshotCompleted(6)
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 3
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(2)
 
       persistentActor ! Increment // 7
       snapshotSignalProbe.expectSnapshotCompleted(7)
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 4
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(3)
 
       persistentActor ! Increment // 8
       snapshotSignalProbe.expectSnapshotCompleted(8)
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 5
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(4)
 
       persistentActor ! Increment // 9
       snapshotSignalProbe.expectSnapshotCompleted(9)
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 6
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(5)
 
       persistentActor ! Increment // 10
       snapshotSignalProbe.expectSnapshotCompleted(10)
       
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr 
shouldEqual 7
-      snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
+      snapshotSignalProbe.expectDeleteSnapshotCompleted(6)
     }
 
     "snapshot on recovery if expected snapshot is missing" in {
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
index 6124e83264..ded56b0bf5 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
@@ -87,7 +87,8 @@ class EventSourcedBehaviorWatchSpec
       stashState = new 
StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings),
       replication = None,
       publishEvents = false,
-      internalLoggerFactory = () => logger)
+      internalLoggerFactory = () => logger,
+      retentionInProgress = false)
 
   "A typed persistent parent actor watching a child" must {
 
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
index 6e80c2b75e..38b94166a0 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
@@ -22,6 +22,7 @@ import pekko.actor.{ ActorRef => ClassicActorRef }
 import pekko.actor.Cancellable
 import pekko.actor.typed.Signal
 import pekko.actor.typed.scaladsl.ActorContext
+import pekko.actor.typed.scaladsl.LoggerOps
 import pekko.annotation.InternalApi
 import pekko.persistence._
 import pekko.persistence.typed.EventAdapter
@@ -71,7 +72,8 @@ private[pekko] final class BehaviorSetup[C, E, S](
     val stashState: StashState,
     val replication: Option[ReplicationSetup],
     val publishEvents: Boolean,
-    private val internalLoggerFactory: () => Logger) {
+    private val internalLoggerFactory: () => Logger,
+    private var retentionInProgress: Boolean) {
 
   import BehaviorSetup._
   import InternalProtocol.RecoveryTickEvent
@@ -197,6 +199,110 @@ private[pekko] final class BehaviorSetup[C, E, S](
     }
   }
 
+  // The retention process for SnapshotCountRetentionCriteria looks like this:
+  // 1. Save snapshot after persisting events when shouldSnapshotAfterPersist 
returned SnapshotWithRetention.
+  // 2. Delete events (when deleteEventsOnSnapshot=true), runs in background.
+  // 3. Delete snapshots (when isOnlyOneSnapshot=false), runs in background.
+
+  def isRetentionInProgress(): Boolean =
+    retentionInProgress
+
+  def retentionProgressSaveSnapshotStarted(sequenceNr: Long): Unit = {
+    retention match {
+      case SnapshotCountRetentionCriteriaImpl(_, _, _) =>
+        internalLogger.debug("Starting retention at seqNr [{}], saving 
snapshot.", sequenceNr)
+        retentionInProgress = true
+      case _ =>
+    }
+  }
+
+  def retentionProgressSaveSnapshotEnded(sequenceNr: Long, success: Boolean): 
Unit = {
+    retention match {
+      case SnapshotCountRetentionCriteriaImpl(_, _, deleteEvents) if 
retentionInProgress =>
+        if (!success) {
+          internalLogger.debug("Retention at seqNr [{}] is completed, saving 
snapshot failed.", sequenceNr)
+          retentionInProgress = false
+        } else if (deleteEvents) {
+          internalLogger.debug("Retention at seqNr [{}], saving snapshot was 
successful.", sequenceNr)
+        } else if (isOnlyOneSnapshot) {
+          // no delete of events and no delete of snapshots => done
+          internalLogger.debug("Retention at seqNr [{}] is completed, saving 
snapshot was successful.", sequenceNr)
+          retentionInProgress = false
+        } else {
+          internalLogger.debug("Retention at seqNr [{}], saving snapshot was 
successful.", sequenceNr)
+        }
+      case _ =>
+    }
+  }
+
+  def retentionProgressDeleteEventsStarted(sequenceNr: Long, 
deleteToSequenceNr: Long): Unit = {
+    retention match {
+      case SnapshotCountRetentionCriteriaImpl(_, _, true) if 
retentionInProgress =>
+        if (deleteToSequenceNr > 0) {
+          internalLogger.debug2(
+            "Retention at seqNr [{}], deleting events to seqNr [{}].",
+            sequenceNr,
+            deleteToSequenceNr)
+        } else {
+          internalLogger.debug("Retention is completed, no events to delete.")
+          retentionInProgress = false
+        }
+      case _ =>
+    }
+  }
+
+  def retentionProgressDeleteEventsEnded(deleteToSequenceNr: Long, success: 
Boolean): Unit = {
+    retention match {
+      case SnapshotCountRetentionCriteriaImpl(_, _, true) if 
retentionInProgress =>
+        if (!success) {
+          internalLogger.debug(
+            "Retention is completed, deleting events to seqNr [{}] failed.",
+            deleteToSequenceNr)
+          retentionInProgress = false
+        } else if (isOnlyOneSnapshot) {
+          // no delete of snapshots => done
+          internalLogger.debug(
+            "Retention is completed, deleting events to seqNr [{}] was 
successful.",
+            deleteToSequenceNr)
+          retentionInProgress = false
+        } else {
+          internalLogger.debug("Retention, deleting events to seqNr [{}] was 
successful.", deleteToSequenceNr)
+        }
+      case _ =>
+    }
+  }
+
+  def retentionProgressDeleteSnapshotsStarted(deleteToSequenceNr: Long): Unit 
= {
+    retention match {
+      case SnapshotCountRetentionCriteriaImpl(_, _, _) if retentionInProgress 
=>
+        if (deleteToSequenceNr > 0) {
+          internalLogger.debug("Retention, deleting snapshots to seqNr [{}].", 
deleteToSequenceNr)
+        } else {
+          internalLogger.debug("Retention is completed, no snapshots to 
delete.")
+          retentionInProgress = false
+        }
+      case _ =>
+    }
+  }
+
+  def retentionProgressDeleteSnapshotsEnded(deleteToSequenceNr: Long, success: 
Boolean): Unit = {
+    retention match {
+      case SnapshotCountRetentionCriteriaImpl(_, _, _) if retentionInProgress 
=>
+        if (success) {
+          // delete snapshot is last step => done
+          internalLogger.debug(
+            "Retention is completed, deleting snapshots to seqNr [{}] was 
successful.",
+            deleteToSequenceNr)
+          retentionInProgress = false
+        } else {
+          internalLogger.debug("Retention is completed, deleting snapshots to 
seqNr [{}] failed.", deleteToSequenceNr)
+          retentionInProgress = false
+        }
+
+      case _ =>
+    }
+  }
+
 }
 
 /**
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
index 0aa0df8bed..75ebbb1b43 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
@@ -209,7 +209,8 @@ private[pekko] final case class 
EventSourcedBehaviorImpl[Command, Event, State](
             stashState = stashState,
             replication = replication,
             publishEvents = publishEvents,
-            internalLoggerFactory = () => internalLogger())
+            internalLoggerFactory = () => internalLogger(),
+            retentionInProgress = false)
 
           // needs to accept Any since we also can get messages from the 
journal
           // not part of the user facing Command protocol
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
index 417af3ddad..f56411ad5e 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
@@ -223,11 +223,15 @@ private[pekko] trait SnapshotInteractions[C, E, S] {
     }
   }
 
-  /** Deletes the snapshots up to and including the `sequenceNr`. */
-  protected def internalDeleteSnapshots(fromSequenceNr: Long, toSequenceNr: 
Long): Unit = {
+  /**
+   * Deletes the snapshots up to and including the `sequenceNr`.
+   * Uses `minSequenceNr = 0L` to always delete from the beginning, which 
simplifies
+   * the retention bookkeeping by removing the need to track a separate lower 
bound.
+   */
+  protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = {
     if (toSequenceNr > 0) {
-      val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = 
fromSequenceNr, maxSequenceNr = toSequenceNr)
-      setup.internalLogger.debug2("Deleting snapshots from sequenceNr [{}] to 
[{}]", fromSequenceNr, toSequenceNr)
+      val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = 0L, 
maxSequenceNr = toSequenceNr)
+      setup.internalLogger.debug("Deleting snapshots to sequenceNr [{}]", 
toSequenceNr)
       setup.snapshotStore
         .tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id, 
snapshotCriteria), setup.selfClassic)
     }
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
index 0bc9a57b10..a07935e432 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
@@ -44,16 +44,6 @@ import pekko.persistence.typed.scaladsl
     math.max(0, lastSequenceNr - (keepNSnapshots.toLong * 
snapshotEveryNEvents))
   }
 
-  /**
-   * Should only be used when `BehaviorSetup.isOnlyOneSnapshot` is false.
-   */
-  def deleteLowerSequenceNr(upperSequenceNr: Long): Long = {
-    // We could use 0 as fromSequenceNr to delete all older snapshots, but 
that might be inefficient for
-    // large ranges depending on how it's implemented in the snapshot plugin. 
Therefore we use the
-    // same window as defined for how much to keep in the retention criteria
-    math.max(0, upperSequenceNr - (keepNSnapshots.toLong * 
snapshotEveryNEvents))
-  }
-
   override def withDeleteEventsOnSnapshot: SnapshotCountRetentionCriteriaImpl =
     copy(deleteEventsOnSnapshot = true)
 
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
index bb983d05a2..a43eb278b1 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
@@ -813,13 +813,27 @@ private[pekko] object Running {
           this
         } else {
           visibleState = state
-          if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null) 
{
+          def skipRetention(): Boolean = {
+            // Only one retention process (snapshot + optional event/snapshot 
deletion) at a time.
+            // When retention is already in progress, both the snapshot and 
the subsequent
+            // deletion steps are skipped together. This keeps retention state 
simple and avoids
+            // transiently exceeding the intended snapshot count. The next 
retention cycle at a
+            // higher seqNr will cover the range of the skipped one, so no 
data is lost.
+            val inProgress = shouldSnapshotAfterPersist == 
SnapshotWithRetention && setup.isRetentionInProgress()
+            if (inProgress)
+              setup.internalLogger.debug(
+                "Skipping retention at seqNr [{}] because previous retention 
has not completed yet. " +
+                "Next retention will cover skipped retention.",
+                state.seqNr)
+            inProgress
+          }
+          if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null 
|| skipRetention()) {
             val newState = applySideEffects(sideEffects, state)
-
             onWriteDone(setup.context, p)
-
             tryUnstashOne(newState)
           } else {
+            if (shouldSnapshotAfterPersist == SnapshotWithRetention)
+              setup.retentionProgressSaveSnapshotStarted(state.seqNr)
             internalSaveSnapshot(state)
             new StoringSnapshot(state, sideEffects, shouldSnapshotAfterPersist)
           }
@@ -910,18 +924,22 @@ private[pekko] object Running {
             setup.retention match {
               case DisabledRetentionCriteria                          => // no 
further actions
               case s @ SnapshotCountRetentionCriteriaImpl(_, _, true) =>
+                setup.retentionProgressSaveSnapshotEnded(state.seqNr, success 
= true)
                 // deleteEventsOnSnapshot == true, deletion of old events
                 val deleteEventsToSeqNr = {
                   if (setup.isOnlyOneSnapshot) meta.sequenceNr // delete all 
events up to the snapshot
                   else s.deleteUpperSequenceNr(meta.sequenceNr) // 
keepNSnapshots batches of events
                 }
                 // snapshot deletion then happens on event deletion success in 
Running.onDeleteEventsJournalResponse
+                setup.retentionProgressDeleteEventsStarted(state.seqNr, 
deleteEventsToSeqNr)
                 internalDeleteEvents(meta.sequenceNr, deleteEventsToSeqNr)
               case s @ SnapshotCountRetentionCriteriaImpl(_, _, false) =>
+                setup.retentionProgressSaveSnapshotEnded(state.seqNr, success 
= true)
                 // deleteEventsOnSnapshot == false, deletion of old snapshots
                 if (!setup.isOnlyOneSnapshot) {
                   val deleteSnapshotsToSeqNr = 
s.deleteUpperSequenceNr(meta.sequenceNr)
-                  
internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr), 
deleteSnapshotsToSeqNr)
+                  
setup.retentionProgressDeleteSnapshotsStarted(deleteSnapshotsToSeqNr)
+                  internalDeleteSnapshots(deleteSnapshotsToSeqNr)
                 }
               case unexpected => throw new IllegalStateException(s"Unexpected 
retention criteria: $unexpected")
             }
@@ -930,6 +948,8 @@ private[pekko] object Running {
           Some(SnapshotCompleted(SnapshotMetadata.fromClassic(meta)))
 
         case SaveSnapshotFailure(meta, error) =>
+          if (snapshotReason == SnapshotWithRetention)
+            setup.retentionProgressSaveSnapshotEnded(state.seqNr, success = 
false)
           setup.internalLogger.warn2("Failed to save snapshot given metadata 
[{}] due to: {}", meta, error.getMessage)
           Some(SnapshotFailed(SnapshotMetadata.fromClassic(meta), error))
 
@@ -1029,20 +1049,23 @@ private[pekko] object Running {
     val signal = response match {
       case DeleteMessagesSuccess(toSequenceNr) =>
         setup.internalLogger.debug("Persistent events to sequenceNr [{}] 
deleted successfully.", toSequenceNr)
+        setup.retentionProgressDeleteEventsEnded(toSequenceNr, success = true)
         setup.retention match {
           case DisabledRetentionCriteria             => // no further actions
-          case s: SnapshotCountRetentionCriteriaImpl =>
+          case _: SnapshotCountRetentionCriteriaImpl =>
             if (!setup.isOnlyOneSnapshot) {
               // The reason for -1 is that a snapshot at the exact 
toSequenceNr is still useful and the events
               // after that can be replayed after that snapshot, but replaying 
the events after toSequenceNr without
               // starting at the snapshot at toSequenceNr would be invalid.
               val deleteSnapshotsToSeqNr = toSequenceNr - 1
-              
internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr), 
deleteSnapshotsToSeqNr)
+              
setup.retentionProgressDeleteSnapshotsStarted(deleteSnapshotsToSeqNr)
+              internalDeleteSnapshots(deleteSnapshotsToSeqNr)
             }
           case unexpected => throw new IllegalStateException(s"Unexpected 
retention criteria: $unexpected")
         }
         Some(DeleteEventsCompleted(toSequenceNr))
       case DeleteMessagesFailure(e, toSequenceNr) =>
+        setup.retentionProgressDeleteEventsEnded(toSequenceNr, success = false)
         Some(DeleteEventsFailed(toSequenceNr, e))
       case _ =>
         None
@@ -1063,8 +1086,10 @@ private[pekko] object Running {
   def onDeleteSnapshotResponse(response: SnapshotProtocol.Response, state: S): 
Behavior[InternalProtocol] = {
     val signal = response match {
       case DeleteSnapshotsSuccess(criteria) =>
+        setup.retentionProgressDeleteSnapshotsEnded(criteria.maxSequenceNr, 
success = true)
         
Some(DeleteSnapshotsCompleted(DeletionTarget.Criteria(SnapshotSelectionCriteria.fromClassic(criteria))))
       case DeleteSnapshotsFailure(criteria, error) =>
+        setup.retentionProgressDeleteSnapshotsEnded(criteria.maxSequenceNr, 
success = false)
         
Some(DeleteSnapshotsFailed(DeletionTarget.Criteria(SnapshotSelectionCriteria.fromClassic(criteria)),
 error))
       case DeleteSnapshotSuccess(meta) =>
         
Some(DeleteSnapshotsCompleted(DeletionTarget.Individual(SnapshotMetadata.fromClassic(meta))))
diff --git 
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala
 
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala
index d8a903ab4e..93207a459d 100644
--- 
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala
+++ 
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala
@@ -39,23 +39,22 @@ class RetentionCriteriaSpec extends TestSuite with Matchers 
with AnyWordSpecLike
     "have valid sequenceNr range based on keepNSnapshots" in {
       val criteria = RetentionCriteria.snapshotEvery(3, 
2).asInstanceOf[SnapshotCountRetentionCriteriaImpl]
       val expected = List(
-        1 -> (0 -> 0),
-        3 -> (0 -> 0),
-        4 -> (0 -> 0),
-        6 -> (0 -> 0),
-        7 -> (0 -> 1),
-        9 -> (0 -> 3),
-        10 -> (0 -> 4),
-        12 -> (0 -> 6),
-        13 -> (1 -> 7),
-        15 -> (3 -> 9),
-        18 -> (6 -> 12),
-        20 -> (8 -> 14))
+        1 -> 0,
+        3 -> 0,
+        4 -> 0,
+        6 -> 0,
+        7 -> 1,
+        9 -> 3,
+        10 -> 4,
+        12 -> 6,
+        13 -> 7,
+        15 -> 9,
+        18 -> 12,
+        20 -> 14)
       expected.foreach {
-        case (seqNr, (lower, upper)) =>
+        case (seqNr, upper) =>
           withClue(s"seqNr=$seqNr:") {
             criteria.deleteUpperSequenceNr(seqNr) should ===(upper)
-            criteria.deleteLowerSequenceNr(upper) should ===(lower)
           }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to