This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 0606b921c7 Harden EventSourcedBehaviorRetentionSpec (#2794)
0606b921c7 is described below
commit 0606b921c714fc86475fbee50b7259b815081e97
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Mar 28 15:41:05 2026 +0800
Harden EventSourcedBehaviorRetentionSpec (#2794)
Send increments one at a time instead of in bulk to avoid
non-deterministic signal ordering between snapshot and delete-snapshot
background tasks. Use separate probes for snapshot vs delete signals.
Upstream: akka/akka-core@cb7e11de49
Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
Co-authored-by: Copilot <[email protected]>
---
.../EventSourcedBehaviorRetentionSpec.scala | 35 +++++++++++++++-------
1 file changed, 25 insertions(+), 10 deletions(-)
diff --git a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
index 20c20f2279..cde99abe1b 100644
--- a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
+++ b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
@@ -473,39 +473,54 @@ class EventSourcedBehaviorRetentionSpec
// very bad idea to snapshot every event, but technically possible
val pid = nextPid()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
+ val deleteSnapshotSignalProbe = TestProbe[WrappedSignal]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](ctx =>
- counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref))
+ counter(ctx, pid,
+ snapshotSignalProbe = Some(snapshotSignalProbe.ref),
+ deleteSnapshotSignalProbe = Some(deleteSnapshotSignalProbe.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3))))
- (1 to 10).foreach(_ => persistentActor ! Increment)
+ // Send one at a time: the order of snapshot/delete signals is not guaranteed
+ // because snapshots and deletes are background tasks, and there are several
+ // future composition steps along the way that may reorder those signals
+ (1 to 4).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
- replyProbe.expectMessage(State(10, (0 until 10).toVector))
+ replyProbe.expectMessage(State(4, (0 until 4).toVector))
snapshotSignalProbe.expectSnapshotCompleted(1)
snapshotSignalProbe.expectSnapshotCompleted(2)
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(4)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
+ persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(5)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
+ persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(6)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
+ persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(7)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
+ persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(8)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
+ persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(9)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
+ persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(10)
- snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4)
+ deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4)
+
+ persistentActor ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(10, (0 until 10).toVector))
}
"be possible to snapshot every event withDeleteEventsOnSnapshot" in {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]