This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 523c05f7f6 feat: Add unpersistent versions of persistent behaviors
(#2456)
523c05f7f6 is described below
commit 523c05f7f6b9ab7cc8b4e91e80b877055221e405
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Nov 9 19:48:33 2025 +0800
feat: Add unpersistent versions of persistent behaviors (#2456)
---
.../typed/AccountExampleUnpersistentDocTest.java | 161 +++++++
.../typed/AccountExampleUnpersistentDocSpec.scala | 97 ++++
docs/src/main/paradox/typed/persistence-testing.md | 44 +-
docs/src/main/paradox/typed/testing-sync.md | 2 +-
.../testkit/internal/Unpersistent.scala | 488 +++++++++++++++++++++
.../testkit/javadsl/UnpersistentBehavior.scala | 162 +++++++
.../testkit/scaladsl/UnpersistentBehavior.scala | 144 ++++++
.../scaladsl/UnpersistentDurableStateSpec.scala | 264 +++++++++++
.../scaladsl/UnpersistentEventSourcedSpec.scala | 337 ++++++++++++++
9 files changed, 1696 insertions(+), 3 deletions(-)
diff --git
a/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java
b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java
new file mode 100644
index 0000000000..759408f65e
--- /dev/null
+++
b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.org.apache.pekko.cluster.sharding.typed;
+
+import org.apache.pekko.Done;
+import org.scalatestplus.junit.JUnitSuite;
+
+import static
jdocs.org.apache.pekko.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity;
+import static org.junit.Assert.*;
+
+// #test
+import java.math.BigDecimal;
+
+import org.apache.pekko.actor.testkit.typed.javadsl.BehaviorTestKit;
+import org.apache.pekko.actor.testkit.typed.javadsl.ReplyInbox;
+import org.apache.pekko.actor.testkit.typed.javadsl.StatusReplyInbox;
+import org.apache.pekko.persistence.testkit.javadsl.UnpersistentBehavior;
+import org.apache.pekko.persistence.testkit.javadsl.PersistenceEffect;
+import org.apache.pekko.persistence.typed.PersistenceId;
+import org.junit.Test;
+
+public class AccountExampleUnpersistentDocTest
+ // #test
+ extends JUnitSuite
+// #test
+{
+ @Test
+ public void createWithEmptyBalance() {
+ UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event,
AccountEntity.Account>
+ unpersistent = emptyAccount();
+
+ BehaviorTestKit<AccountEntity.Command> testkit =
unpersistent.getBehaviorTestKit();
+
+ StatusReplyInbox<Done> ackInbox =
testkit.runAskWithStatus(AccountEntity.CreateAccount::new);
+
+ ackInbox.expectValue(Done.getInstance());
+
unpersistent.getEventProbe().expectPersisted(AccountEntity.AccountCreated.INSTANCE);
+
+ // internal state is only exposed by the behavior via responses to
messages or if it happens
+ // to snapshot. This particular behavior never snapshots, so we
query within the actor's
+ // protocol
+ assertFalse(unpersistent.getSnapshotProbe().hasEffects());
+
+ ReplyInbox<AccountEntity.CurrentBalance> currentBalanceInbox =
+ testkit.runAsk(AccountEntity.GetBalance::new);
+
+ assertEquals(BigDecimal.ZERO,
currentBalanceInbox.receiveReply().balance);
+ }
+
+ @Test
+ public void handleDepositAndWithdraw() {
+ UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event,
AccountEntity.Account>
+ unpersistent = openedAccount();
+
+ BehaviorTestKit<AccountEntity.Command> testkit =
unpersistent.getBehaviorTestKit();
+ BigDecimal currentBalance;
+
+ testkit
+ .runAskWithStatus(
+ Done.class, replyTo -> new
AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo))
+ .expectValue(Done.getInstance());
+
+ assertEquals(
+ BigDecimal.valueOf(100),
+ unpersistent
+ .getEventProbe()
+ .expectPersistedClass(AccountEntity.Deposited.class)
+ .persistedObject()
+ .amount);
+
+ currentBalance =
+ testkit
+ .runAsk(AccountEntity.CurrentBalance.class,
AccountEntity.GetBalance::new)
+ .receiveReply()
+ .balance;
+
+ assertEquals(BigDecimal.valueOf(100), currentBalance);
+
+ testkit
+ .runAskWithStatus(
+ Done.class, replyTo -> new
AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo))
+ .expectValue(Done.getInstance());
+
+ // can save the persistence effect for in-depth inspection
+ PersistenceEffect<AccountEntity.Withdrawn> withdrawEffect =
+
unpersistent.getEventProbe().expectPersistedClass(AccountEntity.Withdrawn.class);
+ assertEquals(BigDecimal.valueOf(10),
withdrawEffect.persistedObject().amount);
+ assertEquals(3L, withdrawEffect.sequenceNr());
+ assertTrue(withdrawEffect.tags().isEmpty());
+
+ currentBalance =
+ testkit
+ .runAsk(AccountEntity.CurrentBalance.class,
AccountEntity.GetBalance::new)
+ .receiveReply()
+ .balance;
+
+ assertEquals(BigDecimal.valueOf(90), currentBalance);
+ }
+
+ @Test
+ public void rejectWithdrawOverdraft() {
+ UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event,
AccountEntity.Account>
+ unpersistent = accountWithBalance(BigDecimal.valueOf(100));
+
+ BehaviorTestKit<AccountEntity.Command> testkit =
unpersistent.getBehaviorTestKit();
+
+ testkit
+ .runAskWithStatus(
+ Done.class, replyTo -> new
AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo))
+ .expectErrorMessage("not enough funds to withdraw 110");
+
+ assertFalse(unpersistent.getEventProbe().hasEffects());
+ }
+
+ // #test
+ private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event,
AccountEntity.Account>
+ emptyAccount() {
+ return
+ // #unpersistent-behavior
+ UnpersistentBehavior.fromEventSourced(
+ AccountEntity.create("1", PersistenceId.of("Account", "1")),
+ null, // use the initial state
+ 0 // initial sequence number
+ );
+ // #unpersistent-behavior
+ }
+
+ private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event,
AccountEntity.Account>
+ openedAccount() {
+ return
+ // #unpersistent-behavior-provided-state
+ UnpersistentBehavior.fromEventSourced(
+ AccountEntity.create("1", PersistenceId.of("Account", "1")),
+ new AccountEntity.EmptyAccount()
+ .openedAccount(), // duplicate the event handler for
AccountCreated on an EmptyAccount
+ 1 // assume that CreateAccount was the first command
+ );
+ // #unpersistent-behavior-provided-state
+ }
+
+ private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event,
AccountEntity.Account>
+ accountWithBalance(BigDecimal balance) {
+ return UnpersistentBehavior.fromEventSourced(
+ AccountEntity.create("1", PersistenceId.of("Account", "1")),
+ new AccountEntity.OpenedAccount(balance),
+ 2);
+ }
+ // #test
+}
+// #test
diff --git
a/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala
b/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala
new file mode 100644
index 0000000000..865e936a50
--- /dev/null
+++
b/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.org.apache.pekko.cluster.sharding.typed
+
+import org.apache.pekko
+import pekko.Done
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+// #test
+import pekko.persistence.testkit.scaladsl.UnpersistentBehavior
+import pekko.persistence.typed.PersistenceId
+
+class AccountExampleUnpersistentDocSpec
+ extends AnyWordSpecLike
+ // #test
+ with Matchers
+ // #test
+ {
+ // #test
+ import AccountExampleWithEventHandlersInState.AccountEntity
+ // #test
+ "Account" must {
+ "be created with zero balance" in {
+ onAnEmptyAccount { (testkit, eventProbe, snapshotProbe) =>
+
testkit.runAskWithStatus[Done](AccountEntity.CreateAccount(_)).expectDone()
+
+ eventProbe.expectPersisted(AccountEntity.AccountCreated)
+
+ // internal state is only exposed by the behavior via responses to
messages or if it happens
+ // to snapshot. This particular behavior never snapshots, so we
query within the actor's
+ // protocol
+ snapshotProbe.hasEffects shouldBe false
+
+
testkit.runAsk[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_)).receiveReply().balance
shouldBe 0
+ }
+ }
+
+ "handle Deposit and Withdraw" in {
+ onAnOpenedAccount { (testkit, eventProbe, _) =>
+ testkit.runAskWithStatus[Done](AccountEntity.Deposit(100,
_)).expectDone()
+
+ eventProbe.expectPersisted(AccountEntity.Deposited(100))
+
+ testkit.runAskWithStatus[Done](AccountEntity.Withdraw(10,
_)).expectDone()
+
+ eventProbe.expectPersisted(AccountEntity.Withdrawn(10))
+
+
testkit.runAsk[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_)).receiveReply().balance
shouldBe 90
+ }
+ }
+
+ "reject Withdraw overdraft" in {
+ onAnAccountWithBalance(100) { (testkit, eventProbe, _) =>
+ testkit.runAskWithStatus(AccountEntity.Withdraw(110,
_)).receiveStatusReply().isError shouldBe true
+
+ eventProbe.hasEffects shouldBe false
+ }
+ }
+ }
+ // #test
+
+ // #unpersistent-behavior
+ private def onAnEmptyAccount
+ : UnpersistentBehavior.EventSourced[AccountEntity.Command,
AccountEntity.Event, AccountEntity.Account] =
+ UnpersistentBehavior.fromEventSourced(AccountEntity("1",
PersistenceId("Account", "1")))
+ // #unpersistent-behavior
+
+ // #unpersistent-behavior-provided-state
+ private def onAnOpenedAccount
+ : UnpersistentBehavior.EventSourced[AccountEntity.Command,
AccountEntity.Event, AccountEntity.Account] =
+ UnpersistentBehavior.fromEventSourced(
+ AccountEntity("1", PersistenceId("Account", "1")),
+ Some(
+ AccountEntity.EmptyAccount.applyEvent(AccountEntity.AccountCreated) ->
// reuse the event handler
+ 1L // assume that CreateAccount was the first command
+ ))
+ // #unpersistent-behavior-provided-state
+
+ private def onAnAccountWithBalance(balance: BigDecimal) =
+ UnpersistentBehavior.fromEventSourced(
+ AccountEntity("1", PersistenceId("Account", "1")),
+ Some(AccountEntity.OpenedAccount(balance) -> 2L))
+ // #test
+}
+// #test
diff --git a/docs/src/main/paradox/typed/persistence-testing.md
b/docs/src/main/paradox/typed/persistence-testing.md
index 46d828c153..12eea0336f 100644
--- a/docs/src/main/paradox/typed/persistence-testing.md
+++ b/docs/src/main/paradox/typed/persistence-testing.md
@@ -19,9 +19,49 @@ To use Pekko Persistence TestKit, add the module to your
project:
@@project-info{ projectId="persistence-testkit" }
-## Unit testing
+## Unit testing with the BehaviorTestKit
-**Note!** The `EventSourcedBehaviorTestKit` is a new feature, api may have
changes breaking source compatibility in future versions.
+**Note!** The `UnpersistentBehavior` is a new feature: the API may have
changes breaking source compatibility in future versions.
+
+Unit testing of `EventSourcedBehavior` can be performed by converting it into
an @apidoc[UnpersistentBehavior]. Instead of
+persisting events and snapshots, the `UnpersistentBehavior` exposes
@apidoc[PersistenceProbe]s for events and snapshots which
+can be asserted on.
+
+Scala
+: @@snip
[AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala)
{ #unpersistent-behavior }
+
+Java
+: @@snip
[AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java)
{ #unpersistent-behavior }
+
+The `UnpersistentBehavior` can be initialized with arbitrary states:
+
+Scala
+: @@snip
[AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala)
{ #unpersistent-behavior-provided-state }
+
+Java
+: @@snip
[AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java)
{ #unpersistent-behavior-provided-state }
+
+The `UnpersistentBehavior` is especially well-suited to the synchronous
@ref:[`BehaviorTestKit`](testing-sync.md#synchronous-behavior-testing):
+the `UnpersistentBehavior` can directly construct a `BehaviorTestKit` wrapping
the behavior. When commands are run by `BehaviorTestKit`,
+they are processed in the calling thread (viz. the test suite), so when the
run returns, the suite can be sure that the message has been
+fully processed. The internal state of the `EventSourcedBehavior` is not
exposed to the suite except to the extent that it affects how
+the behavior responds to commands or the events it persists (in addition, any
snapshots made by the behavior are available through a
+`PersistenceProbe`).
+
+A full test for the `AccountEntity`, which is shown in the @ref:[Persistence
Style Guide](persistence-style.md), might look like:
+
+Scala
+: @@snip
[AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala)
{ #test }
+
+Java
+: @@snip
[AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java)
{ #test }
+
+`UnpersistentBehavior` does not require any configuration. It therefore does
not verify the serialization of commands, events, or state.
+If using this style, it is advised to independently test serialization for
those classes.
+
+## Unit testing with the ActorTestKit and EventSourcedBehaviorTestKit
+
+**Note!** The `EventSourcedBehaviorTestKit` is a new feature: the API may have
changes breaking source compatibility in future versions.
Unit testing of `EventSourcedBehavior` can be done with the
@apidoc[EventSourcedBehaviorTestKit]. It supports running
one command at a time and you can assert that the synchronously returned
result is as expected. The result contains the
diff --git a/docs/src/main/paradox/typed/testing-sync.md
b/docs/src/main/paradox/typed/testing-sync.md
index 3e9ab222f7..a96828e66c 100644
--- a/docs/src/main/paradox/typed/testing-sync.md
+++ b/docs/src/main/paradox/typed/testing-sync.md
@@ -12,7 +12,7 @@ limitations:
* Spawning of @scala[`Future`]@java[`CompletionStage`] or other asynchronous
task and you rely on a callback to
complete before observing the effect you want to test.
* Usage of scheduler is not supported.
-* `EventSourcedBehavior` can't be tested.
+* `EventSourcedBehavior` can't be fully tested, but it is possible to
@ref:[test the core
functionality](persistence-testing.md#unit-testing-with-the-behaviortestkit).
* Interactions with other actors must be stubbed.
* Blackbox testing style.
* Supervision is not supported.
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala
new file mode 100644
index 0000000000..18204b4132
--- /dev/null
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.internal
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.annotation.tailrec
+import scala.collection.immutable
+import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+
+import org.apache.pekko
+import pekko.actor.typed.Behavior
+import pekko.actor.typed.internal.BehaviorImpl.DeferredBehavior
+import pekko.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
+import pekko.annotation.InternalApi
+import pekko.persistence.testkit.{ javadsl, scaladsl }
+import pekko.persistence.typed.internal.EventSourcedBehaviorImpl
+import pekko.persistence.typed.internal.Running.WithSeqNrAccessible
+import pekko.persistence.typed.state.internal.DurableStateBehaviorImpl
+import pekko.persistence.typed.state.internal.Running.WithRevisionAccessible
+import pekko.util.ConstantFun.{ scalaAnyToUnit => doNothing }
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] object Unpersistent {
+
+ /**
+ * Builds a non-persisting stand-in for the `EventSourcedBehaviorImpl` contained in `behavior`.
+ *
+ * `behavior` is searched by repeatedly applying any `DeferredBehavior` to the actor context until an
+ * `EventSourcedBehaviorImpl` is reached; any other kind of behavior ends the search without a result.
+ *
+ * @param behavior the behavior expected to be, or to defer to, an `EventSourcedBehaviorImpl`
+ * @param fromStateAndSequenceNr state and sequence number to start from; when empty, the found
+ *                               behavior's `emptyState` and sequence number 0 are used
+ * @param onEvent callback handed to the wrapping behavior, which calls it with (event, sequence number, tags)
+ * @param onSnapshot callback handed to the wrapping behavior, which calls it with (state, sequence number)
+ * @return a behavior that, when set up, wraps the found `EventSourcedBehaviorImpl`
+ * @throws AssertionError (during setup) if no `EventSourcedBehaviorImpl` is found
+ */
+ def eventSourced[Command, Event, State](behavior: Behavior[Command],
fromStateAndSequenceNr: Option[(State, Long)])(
+ onEvent: (Event, Long, Set[String]) => Unit)(onSnapshot: (State, Long)
=> Unit): Behavior[Command] = {
+ @tailrec
+ def findEventSourcedBehavior(
+ b: Behavior[Command],
+ context: ActorContext[Command]):
Option[EventSourcedBehaviorImpl[Command, Event, State]] = {
+ b match {
+ case es: EventSourcedBehaviorImpl[Command, _, _] =>
+ // The Event and State type arguments cannot be checked by the pattern, hence the cast
+ Some(es.asInstanceOf[EventSourcedBehaviorImpl[Command, Event,
State]])
+
+ case deferred: DeferredBehavior[Command] =>
+ // Run the deferred factory with the real context and keep looking in what it produces
+ findEventSourcedBehavior(deferred(context), context)
+
+ case _ => None
+ }
+ }
+
+ Behaviors.setup[Command] { context =>
+ findEventSourcedBehavior(behavior, context).fold {
+ throw new AssertionError("Did not find the expected
EventSourcedBehavior")
+ } { esBehavior =>
+ // Fall back to the behavior's own empty state at sequence number 0 when no starting point was given
+ val (initialState, initialSequenceNr) =
fromStateAndSequenceNr.getOrElse(esBehavior.emptyState -> 0L)
+ new WrappedEventSourcedBehavior(context, esBehavior, initialState,
initialSequenceNr, onEvent, onSnapshot)
+ }
+ }
+ }
+
+ /**
+ * Builds a non-persisting stand-in for the `DurableStateBehaviorImpl` contained in `behavior`.
+ *
+ * `behavior` is searched by repeatedly applying any `DeferredBehavior` to the actor context until a
+ * `DurableStateBehaviorImpl` is reached; any other kind of behavior ends the search without a result.
+ *
+ * @param behavior the behavior expected to be, or to defer to, a `DurableStateBehaviorImpl`
+ * @param fromState state to start from; when empty, the found behavior's `emptyState` is used
+ * @param onPersist callback handed to the wrapping behavior, which calls it with (state, revision, tag)
+ * @return a behavior that, when set up, wraps the found `DurableStateBehaviorImpl`
+ * @throws AssertionError (during setup) if no `DurableStateBehaviorImpl` is found
+ */
+ def durableState[Command, State](behavior: Behavior[Command], fromState:
Option[State])(
+ onPersist: (State, Long, String) => Unit): Behavior[Command] = {
+
+ @tailrec
+ def findDurableStateBehavior(
+ b: Behavior[Command],
+ context: ActorContext[Command]):
Option[DurableStateBehaviorImpl[Command, State]] =
+ b match {
+ case ds: DurableStateBehaviorImpl[Command, _] =>
+ // The State type argument cannot be checked by the pattern, hence the cast
+ Some(ds.asInstanceOf[DurableStateBehaviorImpl[Command, State]])
+
+ // Run the deferred factory with the real context and keep looking in what it produces
+ case deferred: DeferredBehavior[Command] =>
findDurableStateBehavior(deferred(context), context)
+ case _ => None
+ }
+
+ Behaviors.setup[Command] { context =>
+ findDurableStateBehavior(behavior, context).fold {
+ throw new AssertionError("Did not find the expected
DurableStateBehavior")
+ } { dsBehavior =>
+ // Fall back to the behavior's own empty state when no starting state was given
+ val initialState = fromState.getOrElse(dsBehavior.emptyState)
+ new WrappedDurableStateBehavior(context, dsBehavior, initialState,
onPersist)
+ }
+ }
+ }
+
+ private class WrappedEventSourcedBehavior[Command, Event, State](
+ context: ActorContext[Command],
+ esBehavior: EventSourcedBehaviorImpl[Command, Event, State],
+ initialState: State,
+ initialSequenceNr: Long,
+ onEvent: (Event, Long, Set[String]) => Unit,
+ onSnapshot: (State, Long) => Unit)
+ extends AbstractBehavior[Command](context)
+ with WithSeqNrAccessible {
+ import pekko.persistence.typed.{ EventSourcedSignal, RecoveryCompleted,
SnapshotCompleted, SnapshotMetadata }
+ import pekko.persistence.typed.internal._
+
+ override def currentSequenceNumber: Long = sequenceNr
+
+ private def commandHandler = esBehavior.commandHandler
+ private def eventHandler = esBehavior.eventHandler
+ private def tagger = esBehavior.tagger
+ private def snapshotWhen = esBehavior.snapshotWhen
+ private def retention = esBehavior.retention
+ private def signalHandler = esBehavior.signalHandler
+
+ private var sequenceNr: Long = initialSequenceNr
+ private var state: State = initialState
+ private val stashedCommands = ListBuffer.empty[Command]
+
+ private def snapshotMetadata() =
+ SnapshotMetadata(esBehavior.persistenceId.toString, sequenceNr,
System.currentTimeMillis())
+ private def sendSignal(signal: EventSourcedSignal): Unit =
+ signalHandler.applyOrElse(state -> signal, doNothing)
+
+ sendSignal(RecoveryCompleted)
+
+ override def onMessage(cmd: Command): Behavior[Command] = {
+ var shouldSnapshot = false
+ var shouldUnstash = false
+ var shouldStop = false
+
+ def snapshotRequested(evt: Event): Boolean = {
+ val snapshotFromRetention = retention match {
+ case DisabledRetentionCriteria => false
+ case s: SnapshotCountRetentionCriteriaImpl =>
s.snapshotWhen(sequenceNr)
+ case unexpected => throw new
IllegalStateException(s"Unexpected retention criteria: $unexpected")
+ }
+
+ snapshotFromRetention || snapshotWhen(state, evt, sequenceNr)
+ }
+
+      /**
+       * Interprets the effect produced by the command handler for `cmd`.
+       *
+       * Composite effects are unwrapped, with their side effects placed ahead of those already accumulated.
+       * Persisting effects advance `sequenceNr`, fold each event into `state` through the event handler and
+       * report it via `onEvent`; the accumulated side effects run only after the effect itself was applied.
+       *
+       * @param curEffect   the effect (or the inner effect of a composite) to interpret
+       * @param sideEffects side effects collected so far, run once `curEffect` has been applied
+       */
+      @tailrec
+      def applyEffects(curEffect: EffectImpl[Event, State], sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+        curEffect match {
+          case CompositeEffect(eff: EffectImpl[Event, State], se) =>
+            applyEffects(eff, se ++ sideEffects)
+
+          case Persist(event) =>
+            sequenceNr += 1
+            state = eventHandler(state, event)
+            onEvent(event, sequenceNr, tagger(event))
+            shouldSnapshot = shouldSnapshot || snapshotRequested(event)
+            sideEffect(sideEffects)
+
+          case PersistAll(events) =>
+            // All events are applied to the state first, remembering the sequence number each one received
+            val eventsWithSeqNrs =
+              events.map { event =>
+                sequenceNr += 1
+                state = eventHandler(state, event)
+                event -> sequenceNr
+              }
+
+            eventsWithSeqNrs.foreach {
+              case (event, seqNr) =>
+                // technically doesn't persist them atomically, but in tests that shouldn't matter
+                onEvent(event, seqNr, tagger(event))
+                // NOTE(review): snapshotRequested reads the current `state`/`sequenceNr`, which at this point
+                // hold the values after the last event of the batch, not the per-event ones - confirm intended
+                shouldSnapshot = shouldSnapshot || snapshotRequested(event)
+            }
+
+            sideEffect(sideEffects)
+
+          // From outside of the behavior, these are equivalent: no state update
+          case _: PersistNothing.type | _: Unhandled.type =>
+            sideEffect(sideEffects)
+
+          case _: Stash.type =>
+            stashedCommands.append(cmd)
+            sideEffect(sideEffects)
+
+          case _ =>
+            context.log.error("Unexpected effect {}, stopping", curEffect)
+            // This method returns Unit, so yielding Behaviors.stopped here would be silently discarded.
+            // Raise the flag that onMessage inspects after the effects have been applied instead.
+            shouldStop = true
+        }
+
+ def sideEffect(sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+ sideEffects.iterator.foreach { effect =>
+ effect match {
+ case _: Stop.type => shouldStop = true
+ case _: UnstashAll.type => shouldUnstash = true
+ case cb: Callback[_] => cb.sideEffect(state)
+ }
+ }
+
+ applyEffects(commandHandler(state, cmd).asInstanceOf[EffectImpl[Event,
State]], Nil)
+
+ if (shouldSnapshot) {
+ onSnapshot(state, sequenceNr)
+ sendSignal(SnapshotCompleted(snapshotMetadata()))
+ }
+
+ if (shouldStop) Behaviors.stopped
+ else if (shouldUnstash && stashedCommands.nonEmpty) {
+ val numStashed = stashedCommands.length
+ val thisWrappedBehavior = this
+
+ Behaviors.setup { _ =>
+ Behaviors.withStash(numStashed) { stash =>
+ stashedCommands.foreach { sc =>
+ stash.stash(sc)
+ ()
+ }
+
+ stashedCommands.remove(0, numStashed)
+ stash.unstashAll(thisWrappedBehavior)
+ }
+ }
+ } else this
+ }
+ }
+
+ private class WrappedDurableStateBehavior[Command, State](
+ context: ActorContext[Command],
+ dsBehavior: DurableStateBehaviorImpl[Command, State],
+ initialState: State,
+ onPersist: (State, Long, String) => Unit)
+ extends AbstractBehavior[Command](context)
+ with WithRevisionAccessible {
+
+ import pekko.persistence.typed.state.{ DurableStateSignal,
RecoveryCompleted }
+ import pekko.persistence.typed.state.internal._
+
+ override def currentRevision: Long = sequenceNr
+
+ private def commandHandler = dsBehavior.commandHandler
+ private def signalHandler = dsBehavior.signalHandler
+ private val tag = dsBehavior.tag
+
+ private var sequenceNr: Long = 0
+ private var state: State = initialState
+ private val stashedCommands = ListBuffer.empty[Command]
+
+ private def sendSignal(signal: DurableStateSignal): Unit =
+ signalHandler.applyOrElse(state -> signal, doNothing)
+
+ sendSignal(RecoveryCompleted)
+
+ override def onMessage(cmd: Command): Behavior[Command] = {
+ var shouldUnstash = false
+ var shouldStop = false
+
+ def persistState(st: State): Unit = {
+ sequenceNr += 1
+ onPersist(st, sequenceNr, tag)
+ state = st
+ }
+
+      /**
+       * Interprets the effect produced by the command handler for `cmd`.
+       *
+       * Composite effects are unwrapped, with their side effects placed ahead of those already accumulated.
+       * A persist effect is recorded through `persistState`; the accumulated side effects run only after the
+       * effect itself was applied.
+       *
+       * @param curEffect   the effect (or the inner effect of a composite) to interpret
+       * @param sideEffects side effects collected so far, run once `curEffect` has been applied
+       */
+      @tailrec
+      def applyEffects(curEffect: EffectImpl[State], sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+        curEffect match {
+          case CompositeEffect(eff: EffectImpl[_], se) =>
+            // The State type argument cannot be checked by the pattern, hence the cast
+            applyEffects(eff.asInstanceOf[EffectImpl[State]], se ++ sideEffects)
+
+          case Persist(st) =>
+            persistState(st)
+            sideEffect(sideEffects)
+
+          // From outside of the behavior, these are equivalent: no state update
+          case _: PersistNothing.type | _: Unhandled.type =>
+            sideEffect(sideEffects)
+
+          case _: Stash.type =>
+            stashedCommands.append(cmd)
+            sideEffect(sideEffects)
+
+          case _ =>
+            context.log.error("Unexpected effect {}, stopping", curEffect)
+            // This method returns Unit, so yielding Behaviors.stopped here would be silently discarded.
+            // Raise the flag that onMessage inspects after the effects have been applied instead.
+            shouldStop = true
+        }
+
+ def sideEffect(sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+ sideEffects.iterator.foreach { effect =>
+ effect match {
+ case _: Stop.type => shouldStop = true
+ case _: UnstashAll.type => shouldUnstash = true
+ case cb: Callback[_] => cb.sideEffect(state)
+ }
+ }
+
+ applyEffects(commandHandler(state, cmd).asInstanceOf[EffectImpl[State]],
Nil)
+
+ if (shouldStop) Behaviors.stopped
+ else if (shouldUnstash && stashedCommands.nonEmpty) {
+ val numStashed = stashedCommands.length
+ val thisWrappedBehavior = this
+
+ Behaviors.setup { _ =>
+ Behaviors.withStash(numStashed) { stash =>
+ stashedCommands.foreach { sc =>
+ stash.stash(sc)
+ () // explicit discard
+ }
+ stashedCommands.remove(0, numStashed)
+ stash.unstashAll(thisWrappedBehavior)
+ }
+ }
+ } else this
+ }
+ }
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] class PersistenceProbeImpl[T] {
+ type Element = (T, Long, Set[String])
+
+ val queue = new ConcurrentLinkedQueue[Element]()
+
+ def persist(elem: Element): Unit = { queue.offer(elem); () }
+
+ def rawExtract(): Element =
+ queue.poll() match {
+ case null => throw new AssertionError("No persistence effects in probe")
+ case elem => elem
+ }
+
+ def asScala: scaladsl.PersistenceProbe[T] =
+ new scaladsl.PersistenceProbe[T] {
+ import scaladsl.{ PersistenceEffect, PersistenceProbe }
+
+ def drain(): Seq[PersistenceEffect[T]] = {
+ @annotation.tailrec
+ def iter(acc: List[PersistenceEffect[T]]): List[PersistenceEffect[T]]
= {
+ val elem = queue.poll()
+ if (elem == null) acc else iter(persistenceEffect(elem) :: acc)
+ }
+
+ iter(Nil).reverse
+ }
+
+ def extract(): PersistenceEffect[T] = persistenceEffect(rawExtract())
+
+ def expectPersistedType[S <: T: ClassTag](): PersistenceEffect[S] =
+ rawExtract() match {
+ case (obj: S, sequenceNr, tags) => PersistenceEffect(obj,
sequenceNr, tags)
+ case (extracted, _, _) =>
+ throw new AssertionError(
+ s"Expected object of type
[${implicitly[ClassTag[S]].runtimeClass.getName}] to be persisted, " +
+ s"but actual was of type [${extracted.getClass.getName}]")
+ }
+
+ def hasEffects: Boolean = !queue.isEmpty
+
+ def expectPersisted(obj: T): PersistenceProbe[T] =
+ rawExtract() match {
+ case (persistedObj, _, _) if obj == persistedObj => this
+ case (persistedObj, _, _) =>
+ throw new AssertionError(s"Expected object [$obj] to be persisted,
but actual was [$persistedObj]")
+ }
+
+ def expectPersisted(obj: T, tag: String): PersistenceProbe[T] =
+ rawExtract() match {
+ case (persistedObj, _, tags) if (obj == persistedObj) && (tags(tag))
=> this
+
+ case (persistedObj, _, tags) if obj == persistedObj =>
+ throw new AssertionError(
+ s"Expected persistence with tag [$tag], but actual tags were
[${tags.mkString(",")}]")
+
+ case (persistedObj, _, tags) if tags(tag) =>
+ throw new AssertionError(s"Expected object [$obj] to be persisted,
but actual was [$persistedObj]")
+
+ case (persistedObj, _, tags) =>
+ throw new AssertionError(
+ s"Expected object [$obj] to be persisted with tag [$tag], " +
+ s"but actual object was [$persistedObj] with tags
[${tags.mkString(",")}]")
+ }
+
+      /**
+       * Removes the next persistence effect from the probe and asserts that it carries `obj` together with
+       * exactly the tags in `tags`.
+       *
+       * @param obj  the object expected to have been persisted
+       * @param tags the complete set of tags the object is expected to have been persisted with
+       * @return this probe, so that further expectations can be chained
+       * @throws AssertionError if the probe holds no effects, or if the object and/or the tags differ
+       */
+      def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, persistedTags) if (obj == persistedObj) && (tags == persistedTags) => this
+
+          // Right object, wrong tags: report the difference in both directions
+          case (persistedObj, _, persistedTags) if obj == persistedObj =>
+            val unexpected = persistedTags.diff(tags)
+            val notPersistedWith = tags.diff(persistedTags)
+
+            throw new AssertionError(
+              s"Expected persistence with [${tags.mkString(",")}], " +
+              s"but saw unexpected actual tags [${unexpected.mkString(",")}] and " +
+              s"did not see actual tags [${notPersistedWith.mkString(",")}]")
+
+          // Right tags, wrong object
+          case (persistedObj, _, persistedTags) if tags == persistedTags =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj]")
+
+          // Both object and tags differ
+          case (persistedObj, _, persistedTags) =>
+            throw new AssertionError(
+              s"Expected object [$obj] to be persisted with tags [${tags.mkString(",")}], " +
+              s"but actual object was [$persistedObj] with tags [${persistedTags.mkString(",")}]")
+        }
+
+ private def persistenceEffect(elem: Element): PersistenceEffect[T] =
+ PersistenceEffect(elem._1, elem._2, elem._3)
+ }
+
+ def asJava: javadsl.PersistenceProbe[T] =
+ new javadsl.PersistenceProbe[T] {
+ import java.util.{ List => JList, Set => JSet }
+
+ import javadsl.{ PersistenceEffect, PersistenceProbe }
+
+ def drain(): JList[PersistenceEffect[T]] = {
+ @annotation.tailrec
+ def iter(acc: List[PersistenceEffect[T]]): List[PersistenceEffect[T]]
= {
+ val elem = queue.poll()
+ if (elem == null) acc else iter(persistenceEffect(elem) :: acc)
+ }
+ iter(Nil).reverse.asJava
+ }
+
+ def extract(): PersistenceEffect[T] = persistenceEffect(rawExtract())
+
+ def expectPersistedClass[S <: T](clazz: Class[S]): PersistenceEffect[S] =
+ rawExtract() match {
+ case (obj, sequenceNr, tags) if clazz.isInstance(obj) =>
+ PersistenceEffect(clazz.cast(obj), sequenceNr, tags.asJava)
+
+ case (extracted, _, _) =>
+ throw new AssertionError(
+ s"Expected object of type [${clazz.getName}] to be persisted, " +
+ s"but actual was of type [${extracted.getClass.getName}]")
+ }
+
+ def hasEffects: Boolean = !queue.isEmpty
+
+ def expectPersisted(obj: T): PersistenceProbe[T] =
+ rawExtract() match {
+ case (persistedObj, _, _) if obj == persistedObj => this
+ case (persistedObj, _, _) =>
+ throw new AssertionError(s"Expected object [$obj] to be persisted,
but actual was [$persistedObj]")
+ }
+
+ def expectPersisted(obj: T, tag: String): PersistenceProbe[T] =
+ rawExtract() match {
+ case (persistedObj, _, tags) if (obj == persistedObj) && (tags(tag))
=> this
+
+ case (persistedObj, _, tags) if obj == persistedObj =>
+ throw new AssertionError(
+ s"Expected persistence with tag [$tag], but actual tags were
[${tags.mkString(",")}]")
+
+ case (persistedObj, _, tags) if tags(tag) =>
+ throw new AssertionError(s"Expected object [$obj] to be persisted,
but actual was [$persistedObj]")
+
+ case (persistedObj, _, tags) =>
+ throw new AssertionError(
+ s"Expected object [$obj] to be persisted with tag [$tag], " +
+ s"but actual object was [$persistedObj] with tags
[${tags.mkString(",")}]")
+ }
+
+      /**
+       * Removes the next persistence effect from the probe and asserts that it carries `obj` together with
+       * exactly the tags in `tags`.
+       *
+       * @param obj  the object expected to have been persisted
+       * @param tags the complete set of tags the object is expected to have been persisted with
+       * @return this probe, so that further expectations can be chained
+       * @throws AssertionError if the probe holds no effects, or if the object and/or the tags differ
+       */
+      def expectPersisted(obj: T, tags: JSet[String]): PersistenceProbe[T] = {
+        val sTags = tags.asScala
+
+        // Tags are compared by mutual containment instead of ==, so the result does not depend on how a
+        // converted Java Set compares with a Scala Set
+        def sameTags(persistedTags: Set[String]): Boolean =
+          sTags.forall(persistedTags) && persistedTags.forall(sTags)
+
+        rawExtract() match {
+          case (persistedObj, _, persistedTags) if (obj == persistedObj) && sameTags(persistedTags) => this
+
+          // Right object, wrong tags: report the difference in both directions
+          case (persistedObj, _, persistedTags) if obj == persistedObj =>
+            val unexpected = persistedTags.diff(sTags)
+            val notPersistedWith = sTags.diff(persistedTags)
+
+            throw new AssertionError(
+              s"Expected persistence with [${sTags.mkString(",")}], " +
+              s"but saw unexpected actual tags [${unexpected.mkString(",")}] and " +
+              s"did not see actual tags [${notPersistedWith.mkString(",")}]")
+
+          // Right tags, wrong object
+          case (persistedObj, _, persistedTags) if sameTags(persistedTags) =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj]")
+
+          // Both object and tags differ
+          case (persistedObj, _, persistedTags) =>
+            throw new AssertionError(
+              s"Expected object [$obj] to be persisted with tags [${sTags.mkString(",")}], " +
+              s"but actual object was [$persistedObj] with tags [${persistedTags.mkString(",")}]")
+        }
+      }
+
+ private def persistenceEffect(element: Element): PersistenceEffect[T] =
+ PersistenceEffect(element._1, element._2, element._3.asJava)
+ }
+}
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala
new file mode 100644
index 0000000000..71c78195fc
--- /dev/null
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.javadsl
+
+import java.util.{ List, Set }
+
+import scala.collection.immutable.{ Set => ScalaSet }
+
+import org.apache.pekko
+import pekko.actor.testkit.typed.javadsl.BehaviorTestKit
+import pekko.actor.typed.Behavior
+import pekko.annotation.DoNotInherit
+import pekko.persistence.testkit.internal.{ PersistenceProbeImpl, Unpersistent
}
+
+/**
+ * Factory methods to create UnpersistentBehavior instances for testing.
+ *
+ * @since 1.3.0
+ */
+object UnpersistentBehavior {
+
+  /**
+   * Derive a non-persistent Behavior from an EventSourcedBehavior. Events and snapshots are
+   * published synchronously to probes for inspection, state evolves as it would in the
+   * EventSourcedBehavior, and side effects run synchronously. Provided the command handling,
+   * event handling, and side effects are themselves compatible with the BehaviorTestKit, the
+   * result can be tested with the BehaviorTestKit.
+   *
+   * The returned Behavior does not intrinsically depend on configuration: it therefore does not
+   * serialize and assumes an unbounded stash for commands.
+   *
+   * @param behavior a (possibly wrapped) EventSourcedBehavior to serve as the basis for the unpersistent behavior
+   * @param initialState start the unpersistent behavior with this state; if null, behavior's initialState will be used
+   * @param initialSequenceNr start the unpersistent behavior with this sequence number; only applies if
+   *                          initialState is non-null
+   * @return an UnpersistentBehavior based on an EventSourcedBehavior
+   */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command],
+      initialState: State,
+      initialSequenceNr: Long): UnpersistentBehavior[Command, Event, State] = {
+    require(initialSequenceNr >= 0, "initialSequenceNr must be at least zero")
+
+    val events = new PersistenceProbeImpl[Event]
+    val snapshots = new PersistenceProbeImpl[State]
+
+    // Option(null) is None, so a null initialState also discards initialSequenceNr
+    val seed: Option[(State, Long)] = Option(initialState).map(state => state -> initialSequenceNr)
+
+    val unpersistent =
+      Unpersistent.eventSourced(behavior, seed) { (event: Event, sequenceNr: Long, tags: ScalaSet[String]) =>
+        events.persist((event, sequenceNr, tags))
+      } { (snapshot, sequenceNr) =>
+        // snapshots are recorded without tags
+        snapshots.persist((snapshot, sequenceNr, ScalaSet.empty))
+      }
+
+    new UnpersistentBehavior(unpersistent, events.asJava, snapshots.asJava)
+  }
+
+  /**
+   * Same as the three-argument overload, called with a null initial state and sequence number zero.
+   */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command]): UnpersistentBehavior[Command, Event, State] =
+    fromEventSourced(behavior, null.asInstanceOf[State], 0)
+
+  /**
+   * Derive a non-persistent Behavior from the given behavior using `Unpersistent.durableState`.
+   * Every persisted state is published to the state probe; the event probe of the result fails
+   * on any use.
+   *
+   * @param behavior the behavior to serve as the basis for the unpersistent behavior
+   * @param initialState state to start from; null is passed on as an absent initial state
+   */
+  def fromDurableState[Command, State](
+      behavior: Behavior[Command],
+      initialState: State): UnpersistentBehavior[Command, Void, State] = {
+    val states = new PersistenceProbeImpl[State]
+
+    val unpersistent =
+      Unpersistent.durableState(behavior, Option(initialState)) { (state, version, tag) =>
+        // an empty tag string is recorded as "no tags"
+        val tags: ScalaSet[String] = if (tag == "") ScalaSet.empty else ScalaSet(tag)
+        states.persist((state, version, tags))
+      }
+
+    new UnpersistentBehavior(unpersistent, noEventProbe, states.asJava)
+  }
+
+  /** Same as the two-argument overload, called with a null initial state. */
+  def fromDurableState[Command, State](behavior: Behavior[Command]): UnpersistentBehavior[Command, Void, State] =
+    fromDurableState(behavior, null.asInstanceOf[State])
+
+  // Event probe handed out for durable state behaviors: every operation throws
+  private val noEventProbe: PersistenceProbe[Void] =
+    new PersistenceProbe[Void] {
+      private def failNoEvents(): Nothing = throw new AssertionError("No events were persisted")
+
+      // could return an empty list, but the intent is that any use of this probe should fail the test
+      def drain(): List[PersistenceEffect[Void]] = failNoEvents()
+
+      def extract(): PersistenceEffect[Void] = failNoEvents()
+      def expectPersistedClass[S <: Void](clazz: Class[S]): PersistenceEffect[S] = failNoEvents()
+      def hasEffects: Boolean = failNoEvents()
+      def expectPersisted(obj: Void): PersistenceProbe[Void] = failNoEvents()
+      def expectPersisted(obj: Void, tag: String): PersistenceProbe[Void] = failNoEvents()
+      def expectPersisted(obj: Void, tags: Set[String]): PersistenceProbe[Void] = failNoEvents()
+    }
+}
+
+/**
+ * Holds an unpersistent Behavior together with the probes that receive its persistence effects.
+ * Instances are created through the factory methods on the companion object.
+ */
+final class UnpersistentBehavior[Command, Event, State] private (
+    behavior: Behavior[Command],
+    eventProbe: PersistenceProbe[Event],
+    stateProbe: PersistenceProbe[State]) {
+
+  // built on first access and then reused by every getBehaviorTestKit() call
+  private lazy val testKit = BehaviorTestKit.create(behavior)
+
+  /** The unpersistent behavior itself */
+  def getBehavior(): Behavior[Command] = behavior
+
+  /** A BehaviorTestKit created from the unpersistent behavior */
+  def getBehaviorTestKit(): BehaviorTestKit[Command] = testKit
+
+  /** Note: durable state behaviors will not publish events to this probe */
+  def getEventProbe(): PersistenceProbe[Event] = eventProbe
+
+  /** The probe receiving persisted states; the same probe as getSnapshotProbe() */
+  def getStateProbe(): PersistenceProbe[State] = stateProbe
+
+  /** The probe receiving snapshots; the same probe as getStateProbe() */
+  def getSnapshotProbe(): PersistenceProbe[State] = stateProbe
+}
+
+/**
+ * One persistence observed by a PersistenceProbe: the persisted object, the sequence number
+ * it was persisted with, and its tags (as a java.util.Set).
+ */
+final case class PersistenceEffect[T](persistedObject: T, sequenceNr: Long,
tags: Set[String])
+
+/**
+ * Java API for inspecting the persistence effects published by an unpersistent behavior.
+ *
+ * Not for user extension
+ */
+@DoNotInherit
+trait PersistenceProbe[T] {
+
+ /** Collect all persistence effects from the probe and empty the probe */
+ def drain(): List[PersistenceEffect[T]]
+
+ /** Get and remove the oldest persistence effect from the probe */
+ def extract(): PersistenceEffect[T]
+
+ /**
+ * Get and remove the oldest persistence effect from the probe, failing if the
+ * persisted object is not of the requested type
+ */
+ def expectPersistedClass[S <: T](clazz: Class[S]): PersistenceEffect[S]
+
+ /** Are there any persistence effects? */
+ def hasEffects: Boolean
+
+ /**
+ * Assert that the given object was persisted in the oldest persistence effect and
+ * remove that persistence effect
+ */
+ def expectPersisted(obj: T): PersistenceProbe[T]
+
+ /**
+ * Assert that the given object was persisted with the given tag in the oldest persistence
+ * effect and remove that persistence effect. If the persistence effect has multiple tags,
+ * only one of them has to match in order for the assertion to succeed.
+ */
+ def expectPersisted(obj: T, tag: String): PersistenceProbe[T]
+
+ /**
+ * Assert that the given object was persisted with the given tags in the oldest persistence
+ * effect and remove that persistence effect. If the persistence effect has tags which are
+ * not given, the assertion fails.
+ */
+ def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T]
+}
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala
new file mode 100644
index 0000000000..72405f0e27
--- /dev/null
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.scaladsl
+
+import scala.reflect.ClassTag
+
+import org.apache.pekko
+import pekko.actor.testkit.typed.scaladsl.BehaviorTestKit
+import pekko.actor.typed.Behavior
+import pekko.annotation.DoNotInherit
+import pekko.persistence.testkit.internal.{ PersistenceProbeImpl, Unpersistent
}
+
+/**
+ * Common view of an unpersistent behavior: the Behavior itself, a BehaviorTestKit for it, and
+ * the probe receiving persisted states.
+ */
+sealed trait UnpersistentBehavior[Command, State] {
+
+  /** Probe receiving the states persisted by the behavior */
+  def stateProbe: PersistenceProbe[State]
+
+  /** The unpersistent behavior */
+  val behavior: Behavior[Command]
+
+  /** BehaviorTestKit for `behavior`; lazy, so it is only built when first accessed */
+  lazy val behaviorTestKit: BehaviorTestKit[Command] = BehaviorTestKit(behavior)
+}
+
+/**
+ * Factory methods to create UnpersistentBehavior instances for testing.
+ *
+ * @since 1.3.0
+ */
+object UnpersistentBehavior {
+
+  /**
+   * Derive a non-persistent Behavior from an EventSourcedBehavior. Events and snapshots are
+   * published synchronously to probes for inspection, state evolves as it would in the
+   * EventSourcedBehavior, and side effects run synchronously. Provided the command handling,
+   * event handling, and side effects are themselves compatible with the BehaviorTestKit, the
+   * result can be tested with the BehaviorTestKit.
+   *
+   * The returned Behavior does not intrinsically depend on configuration: it therefore does not
+   * serialize and assumes an unbounded stash for commands.
+   *
+   * @param behavior the behavior to serve as the basis for the unpersistent behavior
+   * @param initialStateAndSequenceNr optional state and sequence number to start from
+   */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command],
+      initialStateAndSequenceNr: Option[(State, Long)] = None): EventSourced[Command, Event, State] = {
+    val events = new PersistenceProbeImpl[Event]
+    val snapshots = new PersistenceProbeImpl[State]
+
+    val unpersistent =
+      Unpersistent.eventSourced(behavior, initialStateAndSequenceNr) {
+        (event: Event, sequenceNr: Long, tags: Set[String]) =>
+          events.persist((event, sequenceNr, tags))
+      } { (snapshot, sequenceNr) =>
+        // snapshots are recorded without tags
+        snapshots.persist((snapshot, sequenceNr, Set.empty))
+      }
+
+    EventSourced(unpersistent, events.asScala, snapshots.asScala)
+  }
+
+  /** As above, starting from `initialState` at sequence number zero. */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command],
+      initialState: State): EventSourced[Command, Event, State] =
+    fromEventSourced(behavior, Some(initialState -> 0L))
+
+  /**
+   * Derive a non-persistent Behavior from the given behavior using `Unpersistent.durableState`.
+   * Every persisted state is published to the state probe of the result.
+   *
+   * @param behavior the behavior to serve as the basis for the unpersistent behavior
+   * @param initialState optional state to start from
+   */
+  def fromDurableState[Command, State](
+      behavior: Behavior[Command],
+      initialState: Option[State] = None): DurableState[Command, State] = {
+    val states = new PersistenceProbeImpl[State]
+
+    val unpersistent =
+      Unpersistent.durableState(behavior, initialState) { (state, version, tag) =>
+        // an empty tag string is recorded as "no tags"
+        val tags: Set[String] = if (tag.isEmpty) Set.empty else Set(tag)
+        states.persist((state, version, tags))
+      }
+
+    DurableState(unpersistent, states.asScala)
+  }
+
+  /** An unpersistent behavior with separate probes for events and for snapshots. */
+  final case class EventSourced[Command, Event, State](
+      override val behavior: Behavior[Command],
+      eventProbe: PersistenceProbe[Event],
+      override val stateProbe: PersistenceProbe[State])
+      extends UnpersistentBehavior[Command, State] {
+
+    /** The snapshot probe is the state probe under another name */
+    def snapshotProbe: PersistenceProbe[State] = stateProbe
+
+    /** Run `f` with the test kit, the event probe, and the snapshot probe. */
+    def apply(f: (BehaviorTestKit[Command], PersistenceProbe[Event], PersistenceProbe[State]) => Unit): Unit = {
+      f(behaviorTestKit, eventProbe, stateProbe)
+    }
+  }
+
+  /** An unpersistent behavior with a single probe for persisted states. */
+  final case class DurableState[Command, State](
+      override val behavior: Behavior[Command],
+      override val stateProbe: PersistenceProbe[State])
+      extends UnpersistentBehavior[Command, State] {
+
+    /** Run `f` with the test kit and the state probe. */
+    def apply(f: (BehaviorTestKit[Command], PersistenceProbe[State]) => Unit): Unit = {
+      f(behaviorTestKit, stateProbe)
+    }
+  }
+}
+
+/**
+ * One persistence observed by a PersistenceProbe: the persisted object, the sequence number
+ * it was persisted with, and its tags.
+ */
+final case class PersistenceEffect[T](persistedObject: T, sequenceNr: Long,
tags: Set[String])
+
+/**
+ * Scala API for inspecting the persistence effects published by an unpersistent behavior.
+ *
+ * Not for user extension
+ */
+@DoNotInherit
+trait PersistenceProbe[T] {
+
+ /** Collect all persistence effects from the probe and empty the probe */
+ def drain(): Seq[PersistenceEffect[T]]
+
+ /** Get and remove the oldest persistence effect from the probe */
+ def extract(): PersistenceEffect[T]
+
+ /**
+ * Get and remove the oldest persistence effect from the probe, failing if the
+ * persisted object is not of the requested type
+ */
+ def expectPersistedType[S <: T: ClassTag](): PersistenceEffect[S]
+
+ /** Are there any persistence effects? */
+ def hasEffects: Boolean
+
+ /**
+ * Assert that the given object was persisted in the oldest persistence effect and
+ * remove that persistence effect
+ */
+ def expectPersisted(obj: T): PersistenceProbe[T]
+
+ /**
+ * Assert that the given object was persisted with the given tag in the oldest
+ * persistence effect and remove that persistence effect. If the persistence
+ * effect has multiple tags, only one of them has to match in order for the
+ * assertion to succeed.
+ */
+ def expectPersisted(obj: T, tag: String): PersistenceProbe[T]
+
+ /**
+ * Assert that the given object was persisted with the given tags in the oldest
+ * persistence effect and remove that persistence effect. If the persistence
+ * effect has tags which are not given, the assertion fails.
+ */
+ def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T]
+}
diff --git
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala
new file mode 100644
index 0000000000..93aac5562b
--- /dev/null
+++
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.{ Behavior, RecipientRef }
+import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.state.RecoveryCompleted
+import pekko.persistence.typed.state.scaladsl._
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+// Fixture for UnpersistentDurableStateSpec: a DurableStateBehavior holding a counter that can be
+// added to (unconditionally, conditionally, or after stashing) and that notifies observers
+// registered for a given count.
+object UnpersistentDurableStateSpec {
+ object BehaviorUnderTest {
+ sealed trait Command
+
+ case class Add(n: Int, replyTo: RecipientRef[Done]) extends Command
+ case class AddIfLessThan(toAdd: Int, ifLessThan: Int, replyTo:
RecipientRef[Boolean]) extends Command
+ case class AddWhenAtLeast(toAdd: Int, whenAtLeast: Int, replyTo:
RecipientRef[Done]) extends Command
+ case class NotifyIfAtLeast(n: Int, notifyTo: RecipientRef[Done], replyTo:
RecipientRef[Boolean]) extends Command
+ case class GetRevisionNumber(replyTo: RecipientRef[Long]) extends Command
+
+ // count: the running total
+ // notifyAfter: observers keyed by the count they are waiting for
+ // nextNotifyAt: the lowest key of notifyAfter, or Int.MaxValue when nobody is waiting
+ case class State(count: Int, notifyAfter: Map[Int, RecipientRef[Done]],
nextNotifyAt: Int) {
+ // Add n to the count. Once the new count reaches nextNotifyAt, keep only the observers
+ // still waiting for a higher count and recompute nextNotifyAt from them.
+ def processAdd(n: Int): State = {
+ val nextCount = count + n
+
+ if (nextCount < nextNotifyAt) copy(count = nextCount)
+ else {
+ import scala.collection.mutable
+
+ val (nextNNA, nextNotifyAfter) = {
+ var lowestNotifyAt = Int.MaxValue
+ val inProgress = mutable.Map.empty[Int, RecipientRef[Done]]
+
+ notifyAfter.keysIterator.foreach { at =>
+ if (at > nextCount) {
+ lowestNotifyAt = lowestNotifyAt.min(at)
+ inProgress += (at -> notifyAfter(at))
+ }
+ }
+
+ lowestNotifyAt -> inProgress.toMap
+ }
+
+ copy(count = nextCount, notifyAfter = nextNotifyAfter, nextNotifyAt
= nextNNA)
+ }
+ }
+
+ // Register (or replace) the observer for count `at` and lower nextNotifyAt if needed.
+ def addObserver(at: Int, notifyTo: RecipientRef[Done]): State = {
+ val nextNNA = nextNotifyAt.min(at)
+ val nextNotifyAfter = notifyAfter.updated(at, notifyTo)
+
+ copy(notifyAfter = nextNotifyAfter, nextNotifyAt = nextNNA)
+ }
+ }
+
+ // Build the behavior under test: logs the recovered state and sends Done to recoveryDone on
+ // RecoveryCompleted, and tags every persisted state with "count".
+ def apply(id: String, recoveryDone: RecipientRef[Done]): Behavior[Command]
=
+ Behaviors.setup { context =>
+ context.setLoggerName(s"entity-$id")
+
+ DurableStateBehavior[Command, State](
+ persistenceId = PersistenceId.ofUniqueId(id),
+ emptyState = State(0, Map.empty, Int.MaxValue),
+ commandHandler = applyCommand(_, _, context))
+ .receiveSignal {
+ case (state, RecoveryCompleted) =>
+ context.log.debug("Recovered state for id [{}] is [{}]", id,
state)
+ recoveryDone ! Done
+ }
+ .withTag("count")
+ }
+
+ private def applyCommand(state: State, cmd: Command, context:
ActorContext[Command]): Effect[State] = {
+ // Persist the state after adding n, then send Done to the observers of the previous state
+ // that pass the filter below, send the reply, and unstash all stashed commands.
+ def persistAdd[Reply](n: Int, replyTo: RecipientRef[Reply], reply:
Reply): Effect[State] = {
+ val newState = state.processAdd(n)
+
+ Effect
+ .persist(newState)
+ .thenRun { nextState => // should be the same as newState, but...
+ state.notifyAfter.keysIterator
+ .filter { at =>
+ (at <= nextState.nextNotifyAt) &&
!nextState.notifyAfter.isDefinedAt(at)
+ }
+ .foreach { at =>
+ state.notifyAfter(at) ! Done
+ }
+
+ replyTo ! reply
+ }
+ .thenUnstashAll()
+ }
+
+ cmd match {
+ case Add(n, replyTo) => persistAdd(n, replyTo, Done)
+
+ case AddIfLessThan(toAdd, ifLessThan, replyTo) =>
+ if (state.count >= ifLessThan) {
+ context.log.info("Rejecting AddIfLessThan as count = {}",
state.count)
+ Effect.none[State].thenRun(_ => replyTo ! false)
+ } else persistAdd(toAdd, replyTo, true)
+
+ // stashed until the count has reached whenAtLeast
+ case AddWhenAtLeast(toAdd, whenAtLeast, replyTo) =>
+ if (state.count < whenAtLeast) Effect.stash()
+ else persistAdd(toAdd, replyTo, Done)
+
+ // replies false only when an observer is already registered for n
+ case NotifyIfAtLeast(n, notifyTo, replyTo) =>
+ if (state.count >= n) {
+ Effect.none[State].thenRun { _ =>
+ notifyTo ! Done
+ replyTo ! true
+ }
+ } else if (state.notifyAfter.isDefinedAt(n)) {
+ Effect.none[State].thenRun(_ => replyTo ! false)
+ } else {
+ Effect.persist(state.addObserver(n, notifyTo)).thenRun(_ =>
replyTo ! true)
+ }
+
+ case GetRevisionNumber(replyTo) =>
+ Effect.none[State].thenRun(_ => replyTo !
DurableStateBehavior.lastSequenceNumber(context))
+ }
+ }
+ }
+}
+
+// Exercises UnpersistentBehavior.fromDurableState against the BehaviorUnderTest fixture using
+// the synchronous BehaviorTestKit, TestInbox, and the state probe.
+class UnpersistentDurableStateSpec extends AnyWordSpec with Matchers {
+ import UnpersistentDurableStateSpec._
+
+ import pekko.actor.testkit.typed.scaladsl._
+
+ import org.slf4j.event.Level
+
+ "Unpersistent DurableStateBehavior" must {
+ "generate a fail-fast behavior from a non-DurableStateBehavior" in {
+ val notDurableState =
+ Behaviors.receive[Any] { (context, msg) =>
+ context.log.info("Got message {}", msg)
+ Behaviors.same
+ }
+
+ val unpersistent = UnpersistentBehavior.fromDurableState[Any,
Any](notDurableState)
+ // the failure surfaces when the lazily-created test kit is first accessed
+ an[AssertionError] shouldBe thrownBy { unpersistent.behaviorTestKit }
+ assert(!unpersistent.stateProbe.hasEffects, "should be no persistence
effects")
+ }
+
+ "generate a Behavior from a DurableStateBehavior and process
RecoveryCompleted" in {
+ import BehaviorUnderTest._
+
+ val recoveryDone = TestInbox[Done]()
+ val behavior = BehaviorUnderTest("test-1", recoveryDone.ref)
+
+ // accessor-style API
+ val unpersistent = UnpersistentBehavior.fromDurableState[Command,
State](behavior)
+ val probe = unpersistent.stateProbe
+ val testkit = unpersistent.behaviorTestKit
+
+ assert(!probe.hasEffects, "should not be persistence yet")
+ recoveryDone.expectMessage(Done)
+ val logs = testkit.logEntries()
+ logs.size shouldBe 1
+ logs.head.level shouldBe Level.DEBUG
+ logs.head.message shouldBe s"Recovered state for id [test-1] is
[${State(0, Map.empty, Int.MaxValue)}]"
+ }
+
+ "publish state changes in response to commands" in {
+ import BehaviorUnderTest._
+
+ val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+ val replyTo = TestInbox[Done]()
+
+ // and the more functional-style API
+ UnpersistentBehavior.fromDurableState[Command, State](behavior) {
(testkit, probe) =>
+ testkit.run(Add(1, replyTo.ref))
+ replyTo.expectMessage(Done)
+ probe.expectPersisted(State(1, Map.empty, Int.MaxValue), tag = "count")
+ assert(!testkit.hasEffects(), "should have no actor effects")
+ }
+ }
+
+ "allow a state to be injected" in {
+ import BehaviorUnderTest._
+
+ val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+ val notify3 = TestInbox[Done]()
+ // count is 1 with one observer waiting for the count to reach 3
+ val initialState = State(1, Map(3 -> notify3.ref), 3)
+
+ UnpersistentBehavior.fromDurableState[Command, State](behavior,
Some(initialState)) { (testkit, probe) =>
+ val logs = testkit.logEntries()
+
+ logs.size shouldBe 1
+ logs.head.level shouldBe Level.DEBUG
+ logs.head.message shouldBe s"Recovered state for id [test-1] is
[$initialState]"
+ assert(!probe.hasEffects, "should be no persistence effect")
+ assert(!notify3.hasMessages, "no messages should be sent to notify3")
+
+ // count (1) is below 2, so this command is stashed
+ val replyTo = TestInbox[Done]()
+ testkit.run(AddWhenAtLeast(2, 2, replyTo.ref))
+ assert(!replyTo.hasMessages, "no messages should be sent now")
+ assert(!notify3.hasMessages, "no messages should be sent to notify3")
+ assert(!probe.hasEffects, "should be no persistence effect")
+ assert(!testkit.hasEffects(), "should be no testkit effects")
+
+ // count becomes 4 (notifying notify3), then the unstashed command takes it to 6
+ testkit.run(Add(3, TestInbox[Done]().ref))
+ replyTo.expectMessage(Done)
+ notify3.expectMessage(Done)
+ assert(!testkit.hasEffects(), "should be no testkit effects")
+ probe.drain() should contain theSameElementsInOrderAs Seq(
+ PersistenceEffect(State(4, Map.empty, Int.MaxValue), 1,
Set("count")),
+ PersistenceEffect(State(6, Map.empty, Int.MaxValue), 2,
Set("count")))
+ }
+ }
+
+ "stash and unstash properly" in {
+ import BehaviorUnderTest._
+
+ val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+ val replyTo1 = TestInbox[Done]()
+ val add = Add(1, TestInbox[Done]().ref)
+
+ UnpersistentBehavior.fromDurableState[Command, State](behavior) {
(testkit, probe) =>
+ // stashes
+ testkit.run(AddWhenAtLeast(1, 1, replyTo1.ref))
+ assert(!probe.hasEffects, "should be no persistence effect")
+ assert(!replyTo1.hasMessages, "count is not yet 1")
+
+ // unstashes
+ testkit.run(add)
+ replyTo1.expectMessage(Done)
+ probe.drain() shouldNot be(empty)
+
+ // unstash but nothing in the stash
+ testkit.run(add)
+ assert(!replyTo1.hasMessages, "should not send again")
+ probe.drain() shouldNot be(empty)
+ }
+ }
+
+ "retrieve revision number" in {
+ import BehaviorUnderTest._
+
+ val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+
+ val replyTo = TestInbox[Long]()
+ UnpersistentBehavior.fromDurableState[Command, State](behavior) {
(testkit, probe) =>
+ testkit.run(GetRevisionNumber(replyTo.ref))
+ // nothing was persisted, so extracting from the probe fails
+ (the[AssertionError] thrownBy (probe.extract())).getMessage shouldBe
"No persistence effects in probe"
+ replyTo.expectMessage(0)
+ }
+ }
+ }
+}
diff --git
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala
new file mode 100644
index 0000000000..192d9a1da0
--- /dev/null
+++
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.{ Behavior, RecipientRef }
+import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import pekko.persistence.typed.{ PersistenceId, RecoveryCompleted }
+import pekko.persistence.typed.scaladsl._
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+// Fixture for UnpersistentEventSourcedSpec: an EventSourcedBehavior counting DomainEvents, with
+// conditional and stashed persists, observers notified at a given count, and snapshotting.
+object UnpersistentEventSourcedSpec {
+ object BehaviorUnderTest {
+ sealed trait Command
+
+ case class PersistDomainEvent(replyTo: RecipientRef[Done]) extends Command
+ case class PersistDomainEventIfBefore(count: Int, replyTo:
RecipientRef[Boolean]) extends Command
+ case class PersistDomainEventAfter(count: Int, replyTo:
RecipientRef[Done]) extends Command
+ case class NotifyAfter(count: Int, notifyTo: RecipientRef[Done], replyTo:
RecipientRef[Boolean]) extends Command
+ case class GetSequenceNumber(replyTo: RecipientRef[Long]) extends Command
+ case object SnapshotNow extends Command
+
+ sealed trait Event
+ case object DomainEvent extends Event
+ case object SnapshotMade extends Event
+ case class ObserverAdded(count: Int, notifyTo: RecipientRef[Done]) extends
Event
+
+ // domainEvtCount: number of DomainEvents applied
+ // notifyAfter: observers keyed by the domain event count they are waiting for
+ // nextNotifyAt: the lowest key of notifyAfter, or Int.MaxValue when nobody is waiting
+ case class State(domainEvtCount: Int, notifyAfter: Map[Int,
RecipientRef[Done]], nextNotifyAt: Int)
+
+ val initialState = State(0, Map.empty, Int.MaxValue)
+
+ // Build the behavior under test: logs the recovered state and sends Done to recoveryDone on
+ // RecoveryCompleted, snapshots on SnapshotMade and via the retention criteria, and tags
+ // DomainEvents with "domain".
+ def apply(id: String, recoveryDone: RecipientRef[Done]): Behavior[Command]
=
+ Behaviors.setup { context =>
+ context.setLoggerName(s"entity-$id")
+
+ EventSourcedBehavior[Command, Event, State](
+ persistenceId = PersistenceId.ofUniqueId(id),
+ emptyState = initialState,
+ commandHandler = applyCommand(_, _, context),
+ eventHandler = applyEvent(_, _))
+ .receiveSignal {
+ case (state, RecoveryCompleted) =>
+ context.log.debug("Recovered state for id [{}] is [{}]", id,
state)
+ recoveryDone ! Done
+ }
+ .snapshotWhen {
+ case (_, SnapshotMade, _) => true
+ case _ => false
+ }
+ .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3,
keepNSnapshots = 2))
+ .withTagger {
+ case DomainEvent => Set("domain")
+ case _ => Set.empty
+ }
+ }
+
+ private def applyCommand(state: State, cmd: Command, context:
ActorContext[Command]): Effect[Event, State] = {
+ // Persist a DomainEvent, then send Done to the observers of the previous state that pass
+ // the filter below, send the reply, and unstash all stashed commands.
+ def persistDomainEvent[Reply](replyTo: RecipientRef[Reply], reply:
Reply): Effect[Event, State] =
+ Effect
+ .persist[Event, State](DomainEvent)
+ .thenRun { newState =>
+ state.notifyAfter.keysIterator
+ .filter { at =>
+ (at <= newState.nextNotifyAt) &&
!newState.notifyAfter.isDefinedAt(at)
+ }
+ .foreach { at =>
+ state.notifyAfter(at) ! Done
+ }
+
+ replyTo ! reply
+ }
+ .thenUnstashAll()
+
+ cmd match {
+ case PersistDomainEvent(replyTo) => persistDomainEvent(replyTo, Done)
+
+ case PersistDomainEventIfBefore(count, replyTo) =>
+ if (state.domainEvtCount >= count) {
+ context.log.info("Rejecting PersistDomainEventIfBefore as
domainEvtCount = {}", state.domainEvtCount)
+ Effect.none.thenRun(_ => replyTo ! false)
+ } else persistDomainEvent(replyTo, true)
+
+ // stashed until domainEvtCount has reached count
+ case PersistDomainEventAfter(count, replyTo) =>
+ if (state.domainEvtCount < count) Effect.stash()
+ else persistDomainEvent(replyTo, Done)
+
+ // replies false only when an observer is already registered for count
+ case NotifyAfter(count, notifyTo, replyTo) =>
+ if (state.domainEvtCount >= count) {
+ Effect.none.thenRun { _ =>
+ notifyTo ! Done
+ replyTo ! true
+ }
+ } else if (state.notifyAfter.isDefinedAt(count)) {
+ Effect.none.thenRun(_ => replyTo ! false)
+ } else {
+ Effect.persist(ObserverAdded(count, notifyTo)).thenRun(_ =>
replyTo ! true)
+ }
+
+ case SnapshotNow => Effect.persist(SnapshotMade)
+
+ case GetSequenceNumber(replyTo) =>
+ Effect.none.thenRun(_ => replyTo !
EventSourcedBehavior.lastSequenceNumber(context))
+ }
+ }
+
+ // Event handler. A DomainEvent increments the count and, once it reaches nextNotifyAt, keeps
+ // only the observers waiting for a higher count; SnapshotMade leaves the state unchanged;
+ // ObserverAdded registers (or replaces) an observer and lowers nextNotifyAt if needed.
+ private[scaladsl] def applyEvent(state: State, evt: Event): State =
+ evt match {
+ case DomainEvent =>
+ val nextDomainEvtCount = state.domainEvtCount + 1
+
+ if (nextDomainEvtCount < state.nextNotifyAt)
state.copy(domainEvtCount = nextDomainEvtCount)
+ else {
+ import scala.collection.mutable
+
+ val (nextNNA, nextNotifyAfter) = {
+ var lowestNotifyAt = Int.MaxValue
+ val inProgress = mutable.Map.empty[Int, RecipientRef[Done]]
+
+ state.notifyAfter.keysIterator.foreach { at =>
+ if (at > nextDomainEvtCount) {
+ lowestNotifyAt = lowestNotifyAt.min(at)
+ inProgress += (at -> state.notifyAfter(at))
+ }
+ // else ()
+ }
+
+ lowestNotifyAt -> inProgress.toMap
+ }
+
+ state.copy(domainEvtCount = nextDomainEvtCount, notifyAfter =
nextNotifyAfter, nextNotifyAt = nextNNA)
+ }
+
+ case SnapshotMade => state
+ case ObserverAdded(count, notifyTo) =>
+ val nextNNA = state.nextNotifyAt.min(count)
+ val nextNotifyAfter = state.notifyAfter.updated(count, notifyTo)
+
+ state.copy(notifyAfter = nextNotifyAfter, nextNotifyAt = nextNNA)
+ }
+ }
+}
+
+class UnpersistentEventSourcedSpec extends AnyWordSpec with Matchers {
+ import UnpersistentEventSourcedSpec._
+
+ import pekko.actor.testkit.typed.scaladsl._
+
+ import org.slf4j.event.Level
+
+ "Unpersistent EventSourcedBehavior" must {
+ "generate a failing behavior from a non-EventSourcedBehavior" in {
+ val notEventSourced =
+ Behaviors.receive[Any] { (context, msg) =>
+ context.log.info("Got message {}", msg)
+ Behaviors.same
+ }
+
+ val unpersistent = UnpersistentBehavior.fromEventSourced[Any, Any,
Any](notEventSourced)
+ an[AssertionError] shouldBe thrownBy { unpersistent.behaviorTestKit }
+ an[AssertionError] shouldBe thrownBy { unpersistent.eventProbe.extract()
}
+ an[AssertionError] shouldBe thrownBy {
unpersistent.snapshotProbe.extract() }
+ }
+
+ "generate a Behavior from an EventSourcedBehavior and process
RecoveryCompleted" in {
+ import BehaviorUnderTest._
+
+ val recoveryDone = TestInbox[Done]()
+ val behavior = BehaviorUnderTest("test-1", recoveryDone.ref)
+
+ // accessor API
+ val unpersistent = UnpersistentBehavior.fromEventSourced[Command, Event,
State](behavior)
+ val testkit = unpersistent.behaviorTestKit
+ val eventProbe = unpersistent.eventProbe
+ val snapshotProbe = unpersistent.snapshotProbe
+
+ assert(!eventProbe.hasEffects, "should not be events")
+ assert(!snapshotProbe.hasEffects, "should not be snapshots")
+ recoveryDone.expectMessage(Done)
+ val logs = testkit.logEntries()
+ logs.size shouldBe 1
+ logs.head.level shouldBe Level.DEBUG
+ logs.head.message shouldBe s"Recovered state for id [test-1] is
[${State(0, Map.empty, Int.MaxValue)}]"
+ }
+
+ "publish events and evolve observed state in response to commands" in {
+ import BehaviorUnderTest._
+
+ val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+ val replyTo = TestInbox[Done]()
+
+ // resource-style API
+ UnpersistentBehavior.fromEventSourced[Command, Event, State](behavior) {
(testkit, eventProbe, snapshotProbe) =>
+ testkit.clearLog()
+
+ testkit.run(PersistDomainEvent(replyTo.ref))
+ replyTo.expectMessage(Done)
+ eventProbe.expectPersisted(DomainEvent, Set("domain"))
+ snapshotProbe.drain() shouldBe empty
+ assert(!testkit.hasEffects(), "should have no effects")
+ testkit.clearLog()
+
+ testkit.run(SnapshotNow)
+ assert(!replyTo.hasMessages, "should not be a reply")
+
+ val PersistenceEffect(_, seqNr, tags) =
eventProbe.expectPersistedType[SnapshotMade.type]()
+ seqNr shouldBe 2
+ tags shouldBe empty
+
+ snapshotProbe.expectPersisted(State(1, Map.empty, Int.MaxValue))
+ }
+ }
+
+ "not publish events if the event handler fails" in {
+ val behavior =
+ EventSourcedBehavior[Int, Int, Unit](
+ persistenceId = PersistenceId.ofUniqueId("degenerator"),
+ emptyState = (),
+ commandHandler = { (_, cmd: Int) =>
+ cmd match {
+ case nothing if nothing < 1 => Effect.none // commands below 1 persist nothing
+ case 1 => Effect.persist(1) // persists the single event 1
+ case n => Effect.persist(Range(n, 0, -1))
// persists the batch n, n-1, ..., 1 (count down to 1)
+ }
+ },
+ eventHandler = { (_, evt) =>
+ if (evt == 1) throw new RuntimeException("Kaboom!") // event 1 always fails, and every persisting command above emits it
+ })
+
+ UnpersistentBehavior.fromEventSourced[Int, Int, Unit](behavior) {
(testkit, eventProbe, _) =>
+ val oneException = the[RuntimeException] thrownBy testkit.run(1) // single event, handler fails on it
+ oneException.getMessage shouldBe "Kaboom!"
+ eventProbe.hasEffects shouldBe false // the failed event must not be published
+
+ val twoException = the[RuntimeException] thrownBy testkit.run(2) // batch 2, 1: handler fails on the second event
+ twoException.getMessage shouldBe "Kaboom!"
+ eventProbe.hasEffects shouldBe false // the whole batch is withheld, including event 2
+ }
+ }
+
+ "allow a state and starting offset to be injected" in {
+ import BehaviorUnderTest._
+
+ val recoveryDone = TestInbox[Done]()
+ val behavior = BehaviorUnderTest("test-1", recoveryDone.ref)
+
+ val notify3 = TestInbox[Done]()
+ val initialState = // built by folding four events over State(0, Map.empty, Int.MaxValue) with applyEvent
+ Seq(ObserverAdded(3, notify3.ref), SnapshotMade, DomainEvent,
DomainEvent)
+ .foldLeft(State(0, Map.empty, Int.MaxValue))(applyEvent _)
+
+ UnpersistentBehavior.fromEventSourced[Command, Event, State](behavior,
Some(initialState -> 41L)) { // start from initialState paired with offset 41
+ (testkit, eventProbe, snapshotProbe) =>
+ recoveryDone.expectMessage(Done) // recovery completion is still expected with an injected state
+ val logs = testkit.logEntries()
+ logs.size shouldBe 1
+ logs.head.level shouldBe Level.DEBUG
+ logs.head.message shouldBe s"Recovered state for id [test-1] is
[$initialState]"
+ eventProbe.drain() shouldBe empty // injecting the state must not persist any event
+ snapshotProbe.drain() shouldBe empty // ...nor any snapshot
+ assert(!notify3.hasMessages, "no messages should be sent to notify3")
+
+ val replyTo = TestInbox[Done]()
+ testkit.run(PersistDomainEventAfter(2, replyTo.ref))
+ assert(!testkit.hasEffects(), "should be no testkit effects")
+ eventProbe.extract() shouldBe PersistenceEffect(DomainEvent, 42,
Set("domain")) // sequence number 42 = injected offset 41 + 1
+ snapshotProbe.expectPersisted(State(3, Map.empty, Int.MaxValue))
+
+ notify3.expectMessage(Done) // the observer registered via ObserverAdded in initialState is expected to be notified
+ replyTo.expectMessage(Done)
+ }
+ }
+
+ "stash and unstash properly" in {
+ import BehaviorUnderTest._
+
+ val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+
+ UnpersistentBehavior.fromEventSourced[Command, Event, State](behavior,
None) { // None: no initial state/offset pair is injected
+ (testkit, eventProbe, snapshotProbe) =>
+ val replyTo1 = TestInbox[Done]()
+ val pde = PersistDomainEvent(TestInbox[Done]().ref) // its reply inbox is not kept; only replyTo1 is observed
+
+ // stashes: nothing is persisted yet and replyTo1 must not have a reply
+ testkit.run(PersistDomainEventAfter(1, replyTo1.ref))
+ eventProbe.drain() shouldBe empty
+ snapshotProbe.drain() shouldBe empty
+ assert(!replyTo1.hasMessages, "have not persisted first domain
event")
+
+ // unstashes: after this command the stashed one is expected to reply to replyTo1
+ testkit.run(pde)
+ replyTo1.expectMessage(Done)
+
+ // unstash again, but nothing is left in the stash, so replyTo1 must stay silent
+ testkit.run(pde)
+ assert(!replyTo1.hasMessages, "should not send again")
+ }
+ }
+
+ "retrieve sequence number properly" in {
+ import BehaviorUnderTest._
+
+ val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+
+ val randomStartingOffset = // a random non-negative Long
+ scala.util.Random.nextLong() match {
+ case Long.MinValue => Long.MaxValue // -Long.MinValue overflows back to itself, so it is mapped explicitly
+ case x if x < 0 => -x // other negatives are negated
+ case x => x
+ }
+
+ UnpersistentBehavior.fromEventSourced[Command, Event, State](behavior,
+ Some(initialState -> randomStartingOffset)) { // NOTE(review): initialState is not defined in this test; presumably provided by the BehaviorUnderTest._ import - confirm
+ (testkit, eventProbe, snapshotProbe) =>
+ val replyTo = TestInbox[Long]()
+
+ testkit.run(GetSequenceNumber(replyTo.ref))
+ eventProbe.drain() shouldBe empty // querying the sequence number must not persist anything
+ snapshotProbe.drain() shouldBe empty
+ replyTo.expectMessage(randomStartingOffset) // the reported sequence number is expected to equal the injected offset
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]