This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.3.x-unpersistence
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 554a13c831998f217ca159805f2ae97286d9ed41
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Nov 9 19:48:33 2025 +0800

    feat: Add unpersistent versions of persistent behaviors (#2456)
    
    (cherry picked from commit 523c05f7f6b9ab7cc8b4e91e80b877055221e405)
---
 .../typed/AccountExampleUnpersistentDocTest.java   | 161 +++++++
 .../typed/AccountExampleUnpersistentDocSpec.scala  |  97 ++++
 docs/src/main/paradox/typed/persistence-testing.md |  44 +-
 docs/src/main/paradox/typed/testing-sync.md        |   2 +-
 .../testkit/internal/Unpersistent.scala            | 488 +++++++++++++++++++++
 .../testkit/javadsl/UnpersistentBehavior.scala     | 162 +++++++
 .../testkit/scaladsl/UnpersistentBehavior.scala    | 144 ++++++
 .../scaladsl/UnpersistentDurableStateSpec.scala    | 263 +++++++++++
 .../scaladsl/UnpersistentEventSourcedSpec.scala    | 336 ++++++++++++++
 9 files changed, 1694 insertions(+), 3 deletions(-)

diff --git 
a/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java
 
b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java
new file mode 100644
index 0000000000..759408f65e
--- /dev/null
+++ 
b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.org.apache.pekko.cluster.sharding.typed;
+
+import org.apache.pekko.Done;
+import org.scalatestplus.junit.JUnitSuite;
+
+import static 
jdocs.org.apache.pekko.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity;
+import static org.junit.Assert.*;
+
+// #test
+import java.math.BigDecimal;
+
+import org.apache.pekko.actor.testkit.typed.javadsl.BehaviorTestKit;
+import org.apache.pekko.actor.testkit.typed.javadsl.ReplyInbox;
+import org.apache.pekko.actor.testkit.typed.javadsl.StatusReplyInbox;
+import org.apache.pekko.persistence.testkit.javadsl.UnpersistentBehavior;
+import org.apache.pekko.persistence.testkit.javadsl.PersistenceEffect;
+import org.apache.pekko.persistence.typed.PersistenceId;
+import org.junit.Test;
+
+public class AccountExampleUnpersistentDocTest
+    // #test
+    extends JUnitSuite
+// #test
+{
+    @Test
+    public void createWithEmptyBalance() {
+        UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+            unpersistent = emptyAccount();
+
+        BehaviorTestKit<AccountEntity.Command> testkit = 
unpersistent.getBehaviorTestKit();
+
+        StatusReplyInbox<Done> ackInbox = 
testkit.runAskWithStatus(AccountEntity.CreateAccount::new);
+
+        ackInbox.expectValue(Done.getInstance());
+        
unpersistent.getEventProbe().expectPersisted(AccountEntity.AccountCreated.INSTANCE);
+
+        // internal state is only exposed by the behavior via responses to 
messages or if it happens
+        //  to snapshot.  This particular behavior never snapshots, so we 
query within the actor's
+        //  protocol
+        assertFalse(unpersistent.getSnapshotProbe().hasEffects());
+
+        ReplyInbox<AccountEntity.CurrentBalance> currentBalanceInbox =
+            testkit.runAsk(AccountEntity.GetBalance::new);
+
+        assertEquals(BigDecimal.ZERO, 
currentBalanceInbox.receiveReply().balance);
+    }
+
+    @Test
+    public void handleDepositAndWithdraw() {
+        UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+            unpersistent = openedAccount();
+
+        BehaviorTestKit<AccountEntity.Command> testkit = 
unpersistent.getBehaviorTestKit();
+        BigDecimal currentBalance;
+
+        testkit
+            .runAskWithStatus(
+                Done.class, replyTo -> new 
AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo))
+            .expectValue(Done.getInstance());
+
+        assertEquals(
+            BigDecimal.valueOf(100),
+            unpersistent
+                .getEventProbe()
+                .expectPersistedClass(AccountEntity.Deposited.class)
+                .persistedObject()
+                .amount);
+
+        currentBalance =
+            testkit
+                .runAsk(AccountEntity.CurrentBalance.class, 
AccountEntity.GetBalance::new)
+                .receiveReply()
+                .balance;
+
+        assertEquals(BigDecimal.valueOf(100), currentBalance);
+
+        testkit
+            .runAskWithStatus(
+                Done.class, replyTo -> new 
AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo))
+            .expectValue(Done.getInstance());
+
+        // can save the persistence effect for in-depth inspection
+        PersistenceEffect<AccountEntity.Withdrawn> withdrawEffect =
+            
unpersistent.getEventProbe().expectPersistedClass(AccountEntity.Withdrawn.class);
+        assertEquals(BigDecimal.valueOf(10), 
withdrawEffect.persistedObject().amount);
+        assertEquals(3L, withdrawEffect.sequenceNr());
+        assertTrue(withdrawEffect.tags().isEmpty());
+
+        currentBalance =
+            testkit
+                .runAsk(AccountEntity.CurrentBalance.class, 
AccountEntity.GetBalance::new)
+                .receiveReply()
+                .balance;
+
+        assertEquals(BigDecimal.valueOf(90), currentBalance);
+    }
+
+    @Test
+    public void rejectWithdrawOverdraft() {
+        UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+            unpersistent = accountWithBalance(BigDecimal.valueOf(100));
+
+        BehaviorTestKit<AccountEntity.Command> testkit = 
unpersistent.getBehaviorTestKit();
+
+        testkit
+            .runAskWithStatus(
+                Done.class, replyTo -> new 
AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo))
+            .expectErrorMessage("not enough funds to withdraw 110");
+
+        assertFalse(unpersistent.getEventProbe().hasEffects());
+    }
+
+    // #test
+    private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+    emptyAccount() {
+        return
+            // #unpersistent-behavior
+            UnpersistentBehavior.fromEventSourced(
+                AccountEntity.create("1", PersistenceId.of("Account", "1")),
+                null, // use the initial state
+                0 // initial sequence number
+            );
+        // #unpersistent-behavior
+    }
+
+    private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+    openedAccount() {
+        return
+            // #unpersistent-behavior-provided-state
+            UnpersistentBehavior.fromEventSourced(
+                AccountEntity.create("1", PersistenceId.of("Account", "1")),
+                new AccountEntity.EmptyAccount()
+                    .openedAccount(), // duplicate the event handler for 
AccountCreated on an EmptyAccount
+                1 // assume that CreateAccount was the first command
+            );
+        // #unpersistent-behavior-provided-state
+    }
+
+    private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+    accountWithBalance(BigDecimal balance) {
+        return UnpersistentBehavior.fromEventSourced(
+            AccountEntity.create("1", PersistenceId.of("Account", "1")),
+            new AccountEntity.OpenedAccount(balance),
+            2);
+    }
+    // #test
+}
+// #test
diff --git 
a/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala
 
b/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala
new file mode 100644
index 0000000000..865e936a50
--- /dev/null
+++ 
b/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.org.apache.pekko.cluster.sharding.typed
+
+import org.apache.pekko
+import pekko.Done
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+// #test
+import pekko.persistence.testkit.scaladsl.UnpersistentBehavior
+import pekko.persistence.typed.PersistenceId
+
+class AccountExampleUnpersistentDocSpec
+    extends AnyWordSpecLike
+    // #test
+    with Matchers
+    // #test
+    {
+  // #test
+  import AccountExampleWithEventHandlersInState.AccountEntity
+  // #test
+  "Account" must {
+    "be created with zero balance" in {
+      onAnEmptyAccount { (testkit, eventProbe, snapshotProbe) =>
+        
testkit.runAskWithStatus[Done](AccountEntity.CreateAccount(_)).expectDone()
+
+        eventProbe.expectPersisted(AccountEntity.AccountCreated)
+
+        // internal state is only exposed by the behavior via responses to 
messages or if it happens
+        //  to snapshot.  This particular behavior never snapshots, so we 
query within the actor's
+        //  protocol
+        snapshotProbe.hasEffects shouldBe false
+
+        
testkit.runAsk[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_)).receiveReply().balance
 shouldBe 0
+      }
+    }
+
+    "handle Deposit and Withdraw" in {
+      onAnOpenedAccount { (testkit, eventProbe, _) =>
+        testkit.runAskWithStatus[Done](AccountEntity.Deposit(100, 
_)).expectDone()
+
+        eventProbe.expectPersisted(AccountEntity.Deposited(100))
+
+        testkit.runAskWithStatus[Done](AccountEntity.Withdraw(10, 
_)).expectDone()
+
+        eventProbe.expectPersisted(AccountEntity.Withdrawn(10))
+
+        
testkit.runAsk[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_)).receiveReply().balance
 shouldBe 90
+      }
+    }
+
+    "reject Withdraw overdraft" in {
+      onAnAccountWithBalance(100) { (testkit, eventProbe, _) =>
+        testkit.runAskWithStatus(AccountEntity.Withdraw(110, 
_)).receiveStatusReply().isError shouldBe true
+
+        eventProbe.hasEffects shouldBe false
+      }
+    }
+  }
+  // #test
+
+  // #unpersistent-behavior
+  private def onAnEmptyAccount
+      : UnpersistentBehavior.EventSourced[AccountEntity.Command, 
AccountEntity.Event, AccountEntity.Account] =
+    UnpersistentBehavior.fromEventSourced(AccountEntity("1", 
PersistenceId("Account", "1")))
+  // #unpersistent-behavior
+
+  // #unpersistent-behavior-provided-state
+  private def onAnOpenedAccount
+      : UnpersistentBehavior.EventSourced[AccountEntity.Command, 
AccountEntity.Event, AccountEntity.Account] =
+    UnpersistentBehavior.fromEventSourced(
+      AccountEntity("1", PersistenceId("Account", "1")),
+      Some(
+        AccountEntity.EmptyAccount.applyEvent(AccountEntity.AccountCreated) -> 
// reuse the event handler
+        1L // assume that CreateAccount was the first command
+      ))
+  // #unpersistent-behavior-provided-state
+
+  private def onAnAccountWithBalance(balance: BigDecimal) =
+    UnpersistentBehavior.fromEventSourced(
+      AccountEntity("1", PersistenceId("Account", "1")),
+      Some(AccountEntity.OpenedAccount(balance) -> 2L))
+  // #test
+}
+// #test
diff --git a/docs/src/main/paradox/typed/persistence-testing.md 
b/docs/src/main/paradox/typed/persistence-testing.md
index 46d828c153..12eea0336f 100644
--- a/docs/src/main/paradox/typed/persistence-testing.md
+++ b/docs/src/main/paradox/typed/persistence-testing.md
@@ -19,9 +19,49 @@ To use Pekko Persistence TestKit, add the module to your 
project:
 
 @@project-info{ projectId="persistence-testkit" }
 
-## Unit testing
+## Unit testing with the BehaviorTestKit
 
-**Note!** The `EventSourcedBehaviorTestKit` is a new feature, api may have 
changes breaking source compatibility in future versions.
+**Note!** The `UnpersistentBehavior` is a new feature: the API may have 
changes breaking source compatibility in future versions.
+
+Unit testing of `EventSourcedBehavior` can be performed by converting it into 
an @apidoc[UnpersistentBehavior]. Instead of
+persisting events and snapshots, the `UnpersistentBehavior` exposes 
@apidoc[PersistenceProbe]s for events and snapshots which
+can be asserted on.
+
+Scala
+: @@snip 
[AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala)
 { #unpersistent-behavior }
+
+Java
+: @@snip 
[AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java)
 { #unpersistent-behavior }
+
+The `UnpersistentBehavior` can be initialized with arbitrary states:
+
+Scala
+: @@snip 
[AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala)
 { #unpersistent-behavior-provided-state }
+
+Java
+: @@snip 
[AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java)
 { #unpersistent-behavior-provided-state }
+
+The `UnpersistentBehavior` is especially well-suited to the synchronous 
@ref:[`BehaviorTestKit`](testing-sync.md#synchronous-behavior-testing):
+the `UnpersistentBehavior` can directly construct a `BehaviorTestKit` wrapping 
the behavior.  When commands are run by `BehaviorTestKit`,
+they are processed in the calling thread (viz. the test suite), so when the 
run returns, the suite can be sure that the message has been
+fully processed.  The internal state of the `EventSourcedBehavior` is not 
exposed to the suite except to the extent that it affects how
+the behavior responds to commands or the events it persists (in addition, any 
snapshots made by the behavior are available through a
+`PersistenceProbe`).
+
+A full test for the `AccountEntity`, which is shown in the @ref:[Persistence 
Style Guide](persistence-style.md), might look like:
+
+Scala
+: @@snip 
[AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala)
 { #test }
+
+Java
+: @@snip 
[AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java)
 { #test }
+
+`UnpersistentBehavior` does not require any configuration.  It therefore does 
not verify the serialization of commands, events, or state.
+If using this style, it is advised to independently test serialization for 
those classes.
+
+## Unit testing with the ActorTestKit and EventSourcedBehaviorTestKit
+
+**Note!** The `EventSourcedBehaviorTestKit` is a new feature: the API may have 
changes breaking source compatibility in future versions.
 
 Unit testing of `EventSourcedBehavior` can be done with the 
@apidoc[EventSourcedBehaviorTestKit]. It supports running
 one command at a time and you can assert that the synchronously returned 
result is as expected. The result contains the
diff --git a/docs/src/main/paradox/typed/testing-sync.md 
b/docs/src/main/paradox/typed/testing-sync.md
index 3e9ab222f7..a96828e66c 100644
--- a/docs/src/main/paradox/typed/testing-sync.md
+++ b/docs/src/main/paradox/typed/testing-sync.md
@@ -12,7 +12,7 @@ limitations:
 * Spawning of @scala[`Future`]@java[`CompletionStage`] or other asynchronous 
task and you rely on a callback to
   complete before observing the effect you want to test.
 * Usage of scheduler is not supported.
-* `EventSourcedBehavior` can't be tested.
+* `EventSourcedBehavior` can't be fully tested, but it is possible to 
@ref:[test the core 
functionality](persistence-testing.md#unit-testing-with-the-behaviortestkit).
 * Interactions with other actors must be stubbed.
 * Blackbox testing style.
 * Supervision is not supported.
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala
new file mode 100644
index 0000000000..adecc6bcdc
--- /dev/null
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.internal
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.annotation.tailrec
+import scala.collection.immutable
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.apache.pekko
+import pekko.actor.typed.Behavior
+import pekko.actor.typed.internal.BehaviorImpl.DeferredBehavior
+import pekko.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
+import pekko.annotation.InternalApi
+import pekko.persistence.testkit.{ javadsl, scaladsl }
+import pekko.persistence.typed.internal.EventSourcedBehaviorImpl
+import pekko.persistence.typed.internal.Running.WithSeqNrAccessible
+import pekko.persistence.typed.state.internal.DurableStateBehaviorImpl
+import pekko.persistence.typed.state.internal.Running.WithRevisionAccessible
+import pekko.util.ConstantFun.{ scalaAnyToUnit => doNothing }
+import pekko.util.ccompat.JavaConverters._
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] object Unpersistent {
+
+  /**
+   * Builds a behavior which runs the command and event handlers of the `EventSourcedBehaviorImpl` found in
+   * `behavior`, handing events and snapshots to the supplied callbacks.
+   *
+   * @param behavior an `EventSourcedBehaviorImpl`, possibly nested inside `DeferredBehavior`s; if none is found,
+   *                 an `AssertionError` is thrown when the returned behavior is set up
+   * @param fromStateAndSequenceNr initial state and sequence number; when empty, the behavior's `emptyState` and
+   *                               sequence number 0 are used
+   * @param onEvent called with each event, its sequence number and its tags
+   * @param onSnapshot called with the state and the sequence number whenever a snapshot is requested
+   */
+  def eventSourced[Command, Event, State](behavior: Behavior[Command], 
fromStateAndSequenceNr: Option[(State, Long)])(
+      onEvent: (Event, Long, Set[String]) => Unit)(onSnapshot: (State, Long) 
=> Unit): Behavior[Command] = {
+    // Applies deferred behaviors to the context until an EventSourcedBehaviorImpl (or anything else) shows up
+    @tailrec
+    def findEventSourcedBehavior(
+        b: Behavior[Command],
+        context: ActorContext[Command]): 
Option[EventSourcedBehaviorImpl[Command, Event, State]] = {
+      b match {
+        case es: EventSourcedBehaviorImpl[Command, _, _] =>
+          // the pattern cannot check Event and State, so the cast to the caller's type parameters is unchecked
+          Some(es.asInstanceOf[EventSourcedBehaviorImpl[Command, Event, 
State]])
+
+        case deferred: DeferredBehavior[Command] =>
+          findEventSourcedBehavior(deferred(context), context)
+
+        case _ => None
+      }
+    }
+
+    Behaviors.setup[Command] { context =>
+      findEventSourcedBehavior(behavior, context).fold {
+        throw new AssertionError("Did not find the expected 
EventSourcedBehavior")
+      } { esBehavior =>
+        // fall back to the behavior's own empty state at sequence number 0 when no starting point was given
+        val (initialState, initialSequenceNr) = 
fromStateAndSequenceNr.getOrElse(esBehavior.emptyState -> 0L)
+        new WrappedEventSourcedBehavior(context, esBehavior, initialState, 
initialSequenceNr, onEvent, onSnapshot)
+      }
+    }
+  }
+
+  /**
+   * Builds a behavior which runs the command handler of the `DurableStateBehaviorImpl` found in `behavior`,
+   * handing every persisted state to the supplied callback.
+   *
+   * @param behavior a `DurableStateBehaviorImpl`, possibly nested inside `DeferredBehavior`s; if none is found,
+   *                 an `AssertionError` is thrown when the returned behavior is set up
+   * @param fromState initial state; when empty, the behavior's `emptyState` is used
+   * @param onPersist called with each persisted state, its revision number and the behavior's tag
+   */
+  def durableState[Command, State](behavior: Behavior[Command], fromState: 
Option[State])(
+      onPersist: (State, Long, String) => Unit): Behavior[Command] = {
+
+    // Applies deferred behaviors to the context until a DurableStateBehaviorImpl (or anything else) shows up
+    @tailrec
+    def findDurableStateBehavior(
+        b: Behavior[Command],
+        context: ActorContext[Command]): 
Option[DurableStateBehaviorImpl[Command, State]] =
+      b match {
+        case ds: DurableStateBehaviorImpl[Command, _] =>
+          // the pattern cannot check State, so the cast to the caller's type parameter is unchecked
+          Some(ds.asInstanceOf[DurableStateBehaviorImpl[Command, State]])
+
+        case deferred: DeferredBehavior[Command] => 
findDurableStateBehavior(deferred(context), context)
+        case _                                   => None
+      }
+
+    Behaviors.setup[Command] { context =>
+      findDurableStateBehavior(behavior, context).fold {
+        throw new AssertionError("Did not find the expected 
DurableStateBehavior")
+      } { dsBehavior =>
+        // fall back to the behavior's own empty state when no starting state was given
+        val initialState = fromState.getOrElse(dsBehavior.emptyState)
+        new WrappedDurableStateBehavior(context, dsBehavior, initialState, 
onPersist)
+      }
+    }
+  }
+
+  private class WrappedEventSourcedBehavior[Command, Event, State](
+      context: ActorContext[Command],
+      esBehavior: EventSourcedBehaviorImpl[Command, Event, State],
+      initialState: State,
+      initialSequenceNr: Long,
+      onEvent: (Event, Long, Set[String]) => Unit,
+      onSnapshot: (State, Long) => Unit)
+      extends AbstractBehavior[Command](context)
+      with WithSeqNrAccessible {
+    import pekko.persistence.typed.{ EventSourcedSignal, RecoveryCompleted, 
SnapshotCompleted, SnapshotMetadata }
+    import pekko.persistence.typed.internal._
+
+    override def currentSequenceNumber: Long = sequenceNr
+
+    private def commandHandler = esBehavior.commandHandler
+    private def eventHandler = esBehavior.eventHandler
+    private def tagger = esBehavior.tagger
+    private def snapshotWhen = esBehavior.snapshotWhen
+    private def retention = esBehavior.retention
+    private def signalHandler = esBehavior.signalHandler
+
+    private var sequenceNr: Long = initialSequenceNr
+    private var state: State = initialState
+    private val stashedCommands = ListBuffer.empty[Command]
+
+    private def snapshotMetadata() =
+      SnapshotMetadata(esBehavior.persistenceId.toString, sequenceNr, 
System.currentTimeMillis())
+    private def sendSignal(signal: EventSourcedSignal): Unit =
+      signalHandler.applyOrElse(state -> signal, doNothing)
+
+    sendSignal(RecoveryCompleted)
+
+    override def onMessage(cmd: Command): Behavior[Command] = {
+      var shouldSnapshot = false
+      var shouldUnstash = false
+      var shouldStop = false
+
+      // Decides whether a snapshot should be taken after `evt`: either the retention criteria ask for one at the
+      // current sequence number, or the behavior's `snapshotWhen` predicate accepts (state, evt, sequenceNr).
+      // Reads the current `state` and `sequenceNr` vars, i.e. their values at the time of the call.
+      def snapshotRequested(evt: Event): Boolean = {
+        val snapshotFromRetention = retention match {
+          case DisabledRetentionCriteria             => false
+          case s: SnapshotCountRetentionCriteriaImpl => 
s.snapshotWhen(sequenceNr)
+          case unexpected                            => throw new 
IllegalStateException(s"Unexpected retention criteria: $unexpected")
+        }
+
+        snapshotFromRetention || snapshotWhen(state, evt, sequenceNr)
+      }
+
+      /**
+       * Interprets the effect returned by the command handler for `cmd`.
+       *
+       * Composite effects are unwrapped recursively, accumulating their side effects. Persisted events bump
+       * `sequenceNr`, are folded into `state` through the event handler and reported via `onEvent`; the collected
+       * side effects are run afterwards. An effect which is not recognised is logged and makes the behavior stop.
+       *
+       * @param curEffect the effect still to be interpreted
+       * @param sideEffects side effects collected so far, run once the primary effect has been applied
+       */
+      @tailrec
+      def applyEffects(curEffect: EffectImpl[Event, State], sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+        curEffect match {
+          case CompositeEffect(eff: EffectImpl[Event, State], se) =>
+            // the composite's own side effects go ahead of those already collected
+            applyEffects(eff, se ++ sideEffects)
+
+          case Persist(event) =>
+            sequenceNr += 1
+            state = eventHandler(state, event)
+            onEvent(event, sequenceNr, tagger(event))
+            shouldSnapshot = shouldSnapshot || snapshotRequested(event)
+            sideEffect(sideEffects)
+
+          case PersistAll(events) =>
+            // first apply every event to the state, remembering the sequence number each one got
+            val eventsWithSeqNrs =
+              events.map { event =>
+                sequenceNr += 1
+                state = eventHandler(state, event)
+                event -> sequenceNr
+              }
+
+            eventsWithSeqNrs.foreach {
+              case (event, seqNr) =>
+                // technically doesn't persist them atomically, but in tests that shouldn't matter
+                onEvent(event, seqNr, tagger(event))
+                shouldSnapshot = shouldSnapshot || snapshotRequested(event)
+            }
+
+            sideEffect(sideEffects)
+
+          // From outside of the behavior, these are equivalent: no state update
+          case _: PersistNothing.type | _: Unhandled.type =>
+            sideEffect(sideEffects)
+
+          case _: Stash.type =>
+            stashedCommands.append(cmd)
+            sideEffect(sideEffects)
+
+          case _ =>
+            context.log.error("Unexpected effect {}, stopping", curEffect)
+            // this method returns Unit, so a `Behaviors.stopped` expression here would simply be discarded:
+            // raise the flag that onMessage checks instead
+            shouldStop = true
+        }
+
+      // Runs the given side effects in order: Stop and UnstashAll only raise the matching flag,
+      // callbacks are invoked with the current state.
+      def sideEffect(sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+        sideEffects.foreach {
+          case _: Stop.type       => shouldStop = true
+          case _: UnstashAll.type => shouldUnstash = true
+          case cb: Callback[_]    => cb.sideEffect(state)
+        }
+
+      applyEffects(commandHandler(state, cmd).asInstanceOf[EffectImpl[Event, 
State]], Nil)
+
+      if (shouldSnapshot) {
+        onSnapshot(state, sequenceNr)
+        sendSignal(SnapshotCompleted(snapshotMetadata()))
+      }
+
+      if (shouldStop) Behaviors.stopped
+      else if (shouldUnstash && stashedCommands.nonEmpty) {
+        val numStashed = stashedCommands.length
+        val thisWrappedBehavior = this
+
+        Behaviors.setup { _ =>
+          Behaviors.withStash(numStashed) { stash =>
+            stashedCommands.foreach { sc =>
+              stash.stash(sc)
+              ()
+            }
+
+            stashedCommands.remove(0, numStashed)
+            stash.unstashAll(thisWrappedBehavior)
+          }
+        }
+      } else this
+    }
+  }
+
+  private class WrappedDurableStateBehavior[Command, State](
+      context: ActorContext[Command],
+      dsBehavior: DurableStateBehaviorImpl[Command, State],
+      initialState: State,
+      onPersist: (State, Long, String) => Unit)
+      extends AbstractBehavior[Command](context)
+      with WithRevisionAccessible {
+
+    import pekko.persistence.typed.state.{ DurableStateSignal, 
RecoveryCompleted }
+    import pekko.persistence.typed.state.internal._
+
+    override def currentRevision: Long = sequenceNr
+
+    private def commandHandler = dsBehavior.commandHandler
+    private def signalHandler = dsBehavior.signalHandler
+    private val tag = dsBehavior.tag
+
+    private var sequenceNr: Long = 0
+    private var state: State = initialState
+    private val stashedCommands = ListBuffer.empty[Command]
+
+    private def sendSignal(signal: DurableStateSignal): Unit =
+      signalHandler.applyOrElse(state -> signal, doNothing)
+
+    sendSignal(RecoveryCompleted)
+
+    override def onMessage(cmd: Command): Behavior[Command] = {
+      var shouldUnstash = false
+      var shouldStop = false
+
+      def persistState(st: State): Unit = {
+        sequenceNr += 1
+        onPersist(st, sequenceNr, tag)
+        state = st
+      }
+
+      /**
+       * Interprets the effect returned by the command handler for `cmd`.
+       *
+       * Composite effects are unwrapped recursively, accumulating their side effects. A persisted state is
+       * recorded through `persistState`; the collected side effects are run afterwards. An effect which is not
+       * recognised is logged and makes the behavior stop.
+       *
+       * @param curEffect the effect still to be interpreted
+       * @param sideEffects side effects collected so far, run once the primary effect has been applied
+       */
+      @tailrec
+      def applyEffects(curEffect: EffectImpl[State], sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+        curEffect match {
+          case CompositeEffect(eff: EffectImpl[_], se) =>
+            // the composite's own side effects go ahead of those already collected
+            applyEffects(eff.asInstanceOf[EffectImpl[State]], se ++ sideEffects)
+
+          case Persist(st) =>
+            persistState(st)
+            sideEffect(sideEffects)
+
+          // From outside of the behavior, these are equivalent: no state update
+          case _: PersistNothing.type | _: Unhandled.type =>
+            sideEffect(sideEffects)
+
+          case _: Stash.type =>
+            stashedCommands.append(cmd)
+            sideEffect(sideEffects)
+
+          case _ =>
+            context.log.error("Unexpected effect {}, stopping", curEffect)
+            // this method returns Unit, so a `Behaviors.stopped` expression here would simply be discarded:
+            // raise the flag that onMessage checks instead
+            shouldStop = true
+        }
+
+      // Runs the given side effects in order: Stop and UnstashAll only raise the matching flag,
+      // callbacks are invoked with the current state.
+      def sideEffect(sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+        sideEffects.foreach {
+          case _: Stop.type       => shouldStop = true
+          case _: UnstashAll.type => shouldUnstash = true
+          case cb: Callback[_]    => cb.sideEffect(state)
+        }
+
+      applyEffects(commandHandler(state, cmd).asInstanceOf[EffectImpl[State]], 
Nil)
+
+      if (shouldStop) Behaviors.stopped
+      else if (shouldUnstash && stashedCommands.nonEmpty) {
+        val numStashed = stashedCommands.length
+        val thisWrappedBehavior = this
+
+        Behaviors.setup { _ =>
+          Behaviors.withStash(numStashed) { stash =>
+            stashedCommands.foreach { sc =>
+              stash.stash(sc)
+              () // explicit discard
+            }
+            stashedCommands.remove(0, numStashed)
+            stash.unstashAll(thisWrappedBehavior)
+          }
+        }
+      } else this
+    }
+  }
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] class PersistenceProbeImpl[T] {
+  type Element = (T, Long, Set[String])
+
+  val queue = new ConcurrentLinkedQueue[Element]()
+
+  def persist(elem: Element): Unit = { queue.offer(elem); () }
+
+  /**
+   * Removes and returns the oldest recorded element.
+   *
+   * Throws an `AssertionError` if nothing has been recorded.
+   */
+  def rawExtract(): Element = {
+    // poll() yields null rather than throwing when the queue is empty
+    val head = queue.poll()
+    if (head == null) throw new AssertionError("No persistence effects in probe")
+    head
+  }
+
+  def asScala: scaladsl.PersistenceProbe[T] =
+    new scaladsl.PersistenceProbe[T] {
+      import scaladsl.{ PersistenceEffect, PersistenceProbe }
+
+      /** Removes every recorded effect from the probe and returns them, oldest first. */
+      def drain(): Seq[PersistenceEffect[T]] =
+        // keep polling until the queue reports empty (null), converting each element on the way
+        Iterator.continually(queue.poll()).takeWhile(_ != null).map(persistenceEffect).toList
+
+      def extract(): PersistenceEffect[T] = persistenceEffect(rawExtract())
+
+      def expectPersistedType[S <: T: ClassTag](): PersistenceEffect[S] =
+        rawExtract() match {
+          case (obj: S, sequenceNr, tags) => PersistenceEffect(obj, 
sequenceNr, tags)
+          case (extracted, _, _)          =>
+            throw new AssertionError(
+              s"Expected object of type 
[${implicitly[ClassTag[S]].runtimeClass.getName}] to be persisted, " +
+              s"but actual was of type [${extracted.getClass.getName}]")
+        }
+
+      def hasEffects: Boolean = !queue.isEmpty
+
+      def expectPersisted(obj: T): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, _) if obj == persistedObj => this
+          case (persistedObj, _, _)                        =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, 
but actual was [$persistedObj]")
+        }
+
+      def expectPersisted(obj: T, tag: String): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, tags) if (obj == persistedObj) && (tags(tag)) 
=> this
+
+          case (persistedObj, _, tags) if obj == persistedObj =>
+            throw new AssertionError(
+              s"Expected persistence with tag [$tag], but actual tags were 
[${tags.mkString(",")}]")
+
+          case (persistedObj, _, tags) if tags(tag) =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, 
but actual was [$persistedObj]")
+
+          case (persistedObj, _, tags) =>
+            throw new AssertionError(
+              s"Expected object [$obj] to be persisted with tag [$tag], " +
+              s"but actual object was [$persistedObj] with tags 
[${tags.mkString(",")}]")
+        }
+
+      /**
+       * Extracts the oldest recorded effect and checks that it persisted `obj` with exactly the tags in `tags`.
+       *
+       * @return this probe, so that expectations can be chained
+       *
+       * Throws an `AssertionError` if the probe is empty, or if the object and/or the tags differ; the message
+       * names whichever of the two did not match.
+       */
+      def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, persistedTags) if (obj == persistedObj) && (tags == persistedTags) => this
+
+          // right object, wrong tags: report both directions of the difference
+          case (persistedObj, _, persistedTags) if obj == persistedObj =>
+            val unexpected = persistedTags.diff(tags)
+            val notPersistedWith = tags.diff(persistedTags)
+
+            throw new AssertionError(
+              s"Expected persistence with [${tags.mkString(",")}], " +
+              s"but saw unexpected actual tags [${unexpected.mkString(",")}] and " +
+              s"did not see actual tags [${notPersistedWith.mkString(",")}]")
+
+          // right tags, wrong object
+          case (persistedObj, _, persistedTags) if tags == persistedTags =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj]")
+
+          case (persistedObj, _, persistedTags) =>
+            throw new AssertionError(
+              s"Expected object [$obj] to be persisted with tags [${tags.mkString(",")}], " +
+              s"but actual object was [$persistedObj] with tags [${persistedTags.mkString(",")}]")
+        }
+
+      // Repackages a raw (object, sequence number, tags) element as a PersistenceEffect.
+      private def persistenceEffect(elem: Element): PersistenceEffect[T] = {
+        val (persistedObject, sequenceNr, tags) = elem
+        PersistenceEffect(persistedObject, sequenceNr, tags)
+      }
+    }
+
+  /**
+   * A javadsl view of this probe.  The returned probe reads from the same queue, so
+   * effects removed through it are no longer in the queue afterwards.
+   */
+  def asJava: javadsl.PersistenceProbe[T] =
+    new javadsl.PersistenceProbe[T] {
+      import java.util.{ List => JList, Set => JSet }
+
+      import javadsl.{ PersistenceEffect, PersistenceProbe }
+
+      /** Empties the queue and returns its effects in the order they were polled. */
+      def drain(): JList[PersistenceEffect[T]] = {
+        @annotation.tailrec
+        def iter(acc: List[PersistenceEffect[T]]): List[PersistenceEffect[T]] = {
+          val elem = queue.poll()
+          // poll() yields null once the queue is exhausted
+          if (elem == null) acc else iter(persistenceEffect(elem) :: acc)
+        }
+        // effects were prepended while polling, so restore polling order before converting
+        iter(Nil).reverse.asJava
+      }
+
+      /** Removes the next effect from the probe and returns it. */
+      def extract(): PersistenceEffect[T] = persistenceEffect(rawExtract())
+
+      /**
+       * Removes the next effect and returns it typed as `S`, throwing an AssertionError
+       * if the persisted object is not an instance of `clazz`.
+       */
+      def expectPersistedClass[S <: T](clazz: Class[S]): PersistenceEffect[S] =
+        rawExtract() match {
+          case (obj, sequenceNr, tags) if clazz.isInstance(obj) =>
+            PersistenceEffect(clazz.cast(obj), sequenceNr, tags.asJava)
+
+          case (extracted, _, _) =>
+            throw new AssertionError(
+              s"Expected object of type [${clazz.getName}] to be persisted, " +
+              s"but actual was of type [${extracted.getClass.getName}]")
+        }
+
+      /** True while the queue still holds at least one effect. */
+      def hasEffects: Boolean = !queue.isEmpty
+
+      /** Removes the next effect and asserts that it persisted `obj`. */
+      def expectPersisted(obj: T): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, _) if obj == persistedObj => this
+          case (persistedObj, _, _)                        =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj]")
+        }
+
+      /** Removes the next effect and asserts that it persisted `obj` with `tag` among its tags. */
+      def expectPersisted(obj: T, tag: String): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, tags) if (obj == persistedObj) && (tags(tag)) => this
+
+          case (persistedObj, _, tags) if obj == persistedObj =>
+            throw new AssertionError(
+              s"Expected persistence with tag [$tag], but actual tags were [${tags.mkString(",")}]")
+
+          case (persistedObj, _, tags) if tags(tag) =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj]")
+
+          case (persistedObj, _, tags) =>
+            throw new AssertionError(
+              s"Expected object [$obj] to be persisted with tag [$tag], " +
+              s"but actual object was [$persistedObj] with tags [${tags.mkString(",")}]")
+        }
+
+      /** Removes the next effect and asserts that it persisted `obj` with exactly the given tags. */
+      def expectPersisted(obj: T, tags: JSet[String]): PersistenceProbe[T] = {
+        val sTags = tags.asScala
+
+        // Not sure if a Java Set after asScala-ing will compare equal to a Scala Set...
+        // so compare membership in both directions instead of relying on ==
+        def sameTags(persistedTags: Set[String]): Boolean =
+          sTags.forall(persistedTags) && persistedTags.forall(sTags)
+
+        rawExtract() match {
+          case (persistedObj, _, persistedTags) if (obj == persistedObj) && sameTags(persistedTags) => this
+
+          case (persistedObj, _, persistedTags) if obj == persistedObj =>
+            val unexpected = persistedTags.diff(sTags)
+            val notPersistedWith = sTags.diff(persistedTags)
+
+            throw new AssertionError(
+              s"Expected persistence with [${sTags.mkString(",")}], " +
+              s"but saw unexpected actual tags [${unexpected.mkString(",")}] and " +
+              s"did not see actual tags [${notPersistedWith.mkString(",")}]")
+
+          case (persistedObj, _, persistedTags) if sameTags(persistedTags) =>
+            // the interpolation previously ended in a stray '}' that leaked into the message
+            throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj]")
+
+          case (persistedObj, _, persistedTags) =>
+            throw new AssertionError(
+              s"Expected object [$obj] to be persisted with tags [${sTags.mkString(",")}], " +
+              s"but actual object was [$persistedObj] with tags [${persistedTags.mkString(",")}]")
+        }
+      }
+
+      // Repackages a raw element as a javadsl PersistenceEffect, converting the tags to a Java Set.
+      private def persistenceEffect(element: Element): PersistenceEffect[T] =
+        PersistenceEffect(element._1, element._2, element._3.asJava)
+    }
+}
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala
new file mode 100644
index 0000000000..71c78195fc
--- /dev/null
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.javadsl
+
+import java.util.{ List, Set }
+
+import scala.collection.immutable.{ Set => ScalaSet }
+
+import org.apache.pekko
+import pekko.actor.testkit.typed.javadsl.BehaviorTestKit
+import pekko.actor.typed.Behavior
+import pekko.annotation.DoNotInherit
+import pekko.persistence.testkit.internal.{ PersistenceProbeImpl, Unpersistent 
}
+
+/**
+ * Factory methods to create UnpersistentBehavior instances for testing.
+ *
+ * @since 1.3.0
+ */
+object UnpersistentBehavior {
+
+  /**
+   * Given an EventSourcedBehavior, produce a non-persistent Behavior which synchronously publishes events and
+   *  snapshots for inspection.  State is updated as in the EventSourcedBehavior, and side effects are performed
+   *  synchronously.  The resulting Behavior is, contingent on the command handling, event handling, and side
+   *  effects being compatible with the BehaviorTestKit, testable with the BehaviorTestKit.
+   *
+   *  The returned Behavior does not intrinsically depend on configuration: it therefore does not serialize and
+   *  assumes an unbounded stash for commands.
+   *
+   *  @param behavior a (possibly wrapped) EventSourcedBehavior to serve as the basis for the unpersistent behavior
+   *  @param initialState start the unpersistent behavior with this state; if null, behavior's initialState will
+   *    be used
+   *  @param initialSequenceNr start the unpersistent behavior with this sequence number; only applies if
+   *    initialState is non-null.  Must not be negative.
+   *  @return an UnpersistentBehavior based on an EventSourcedBehavior
+   */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command],
+      initialState: State,
+      initialSequenceNr: Long): UnpersistentBehavior[Command, Event, State] = {
+    require(initialSequenceNr >= 0, "initialSequenceNr must be at least zero")
+
+    val events = new PersistenceProbeImpl[Event]
+    val snapshots = new PersistenceProbeImpl[State]
+    // Option(null) is None, in which case no starting state/sequence number is injected
+    val startingPoint = Option(initialState).map(state => (state, initialSequenceNr))
+
+    val unpersistent =
+      Unpersistent.eventSourced(behavior, startingPoint) {
+        (event: Event, sequenceNr: Long, tags: ScalaSet[String]) =>
+          events.persist((event, sequenceNr, tags))
+      } { (snapshot, sequenceNr) =>
+        // snapshots are recorded without tags
+        snapshots.persist((snapshot, sequenceNr, ScalaSet.empty))
+      }
+
+    new UnpersistentBehavior(unpersistent, events.asJava, snapshots.asJava)
+  }
+
+  /** As the three-argument `fromEventSourced`, passing a null initial state and sequence number zero. */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command]): UnpersistentBehavior[Command, Event, State] =
+    fromEventSourced(behavior, null.asInstanceOf[State], 0)
+
+  /**
+   * Builds an UnpersistentBehavior from a behavior via `Unpersistent.durableState`.  Every persisted state is
+   * recorded in the state probe; the event probe of the result fails on any use.
+   *
+   *  @param initialState state to start from; null means no initial state is passed on
+   */
+  def fromDurableState[Command, State](
+      behavior: Behavior[Command],
+      initialState: State): UnpersistentBehavior[Command, Void, State] = {
+    val states = new PersistenceProbeImpl[State]
+
+    val unpersistent =
+      Unpersistent.durableState(behavior, Option(initialState)) { (state, version, tag) =>
+        // an empty tag string is recorded as "no tags"
+        val tags: ScalaSet[String] = if (tag == "") ScalaSet.empty else ScalaSet(tag)
+        states.persist((state, version, tags))
+      }
+
+    new UnpersistentBehavior(unpersistent, noEventProbe, states.asJava)
+  }
+
+  /** As the two-argument `fromDurableState`, passing a null initial state. */
+  def fromDurableState[Command, State](behavior: Behavior[Command]): UnpersistentBehavior[Command, Void, State] =
+    fromDurableState(behavior, null.asInstanceOf[State])
+
+  // Event probe handed out for durable state behaviors: every operation throws an AssertionError.
+  private val noEventProbe: PersistenceProbe[Void] =
+    new PersistenceProbe[Void] {
+      // could return an empty list, but the intent is that any use of this probe should fail the test
+      def drain(): List[PersistenceEffect[Void]] = noEvents()
+
+      def extract(): PersistenceEffect[Void] = noEvents()
+      def expectPersistedClass[S <: Void](clazz: Class[S]): PersistenceEffect[S] = noEvents()
+      def hasEffects: Boolean = noEvents()
+      def expectPersisted(obj: Void): PersistenceProbe[Void] = noEvents()
+      def expectPersisted(obj: Void, tag: String): PersistenceProbe[Void] = noEvents()
+      def expectPersisted(obj: Void, tags: Set[String]): PersistenceProbe[Void] = noEvents()
+
+      private def noEvents(): Nothing = throw new AssertionError("No events were persisted")
+    }
+}
+
+/**
+ * Holds an unpersistent behavior together with the probes that record what it persists.
+ * Instances are created through the factory methods of the companion object.
+ */
+final class UnpersistentBehavior[Command, Event, State] private (
+    behavior: Behavior[Command],
+    eventProbe: PersistenceProbe[Event],
+    stateProbe: PersistenceProbe[State]) {
+
+  // created on first access, so the behavior is only handed to a BehaviorTestKit if one is requested
+  private lazy val btk = BehaviorTestKit.create(behavior)
+
+  /** The unpersistent behavior itself */
+  def getBehavior(): Behavior[Command] = behavior
+
+  /** A BehaviorTestKit for the behavior; the same instance is returned on every call */
+  def getBehaviorTestKit(): BehaviorTestKit[Command] = btk
+
+  /** Note: durable state behaviors will not publish events to this probe */
+  def getEventProbe(): PersistenceProbe[Event] = eventProbe
+
+  /** Probe recording persisted states */
+  def getStateProbe(): PersistenceProbe[State] = stateProbe
+
+  /** The same probe as `getStateProbe()` */
+  def getSnapshotProbe(): PersistenceProbe[State] = getStateProbe()
+}
+
+/**
+ * One recorded persistence: the object that was persisted, the sequence number it was
+ * persisted with, and the tags attached to it (as a `java.util.Set`).
+ */
+final case class PersistenceEffect[T](persistedObject: T, sequenceNr: Long, 
tags: Set[String])
+
+/**
+ * Read side of a probe that records persistence effects.
+ *
+ * Not for user extension
+ */
+@DoNotInherit
+trait PersistenceProbe[T] {
+
+  /** Collect all persistence effects from the probe and empty the probe */
+  def drain(): List[PersistenceEffect[T]]
+
+  /** Get and remove the oldest persistence effect from the probe */
+  def extract(): PersistenceEffect[T]
+
+  /**
+   * Get and remove the oldest persistence effect from the probe, failing if the
+   *  persisted object is not of the requested type
+   */
+  def expectPersistedClass[S <: T](clazz: Class[S]): PersistenceEffect[S]
+
+  /** Are there any persistence effects? */
+  def hasEffects: Boolean
+
+  /**
+   * Assert that the given object was persisted in the oldest persistence effect and
+   *  remove that persistence effect
+   */
+  def expectPersisted(obj: T): PersistenceProbe[T]
+
+  /**
+   * Assert that the given object was persisted with the given tag in the oldest persistence
+   *  effect and remove that persistence effect.  If the persistence effect has multiple tags,
+   *  only one of them has to match in order for the assertion to succeed.
+   */
+  def expectPersisted(obj: T, tag: String): PersistenceProbe[T]
+
+  /**
+   * Assert that the given object was persisted with the given tags in the oldest persistence
+   *  effect and remove that persistence effect.  If the persistence effect has tags which are
+   *  not given, the assertion fails.
+   */
+  def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T]
+}
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala
new file mode 100644
index 0000000000..72405f0e27
--- /dev/null
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.scaladsl
+
+import scala.reflect.ClassTag
+
+import org.apache.pekko
+import pekko.actor.testkit.typed.scaladsl.BehaviorTestKit
+import pekko.actor.typed.Behavior
+import pekko.annotation.DoNotInherit
+import pekko.persistence.testkit.internal.{ PersistenceProbeImpl, Unpersistent 
}
+
+/**
+ * An unpersistent behavior together with the probe that records the states it persists.
+ */
+sealed trait UnpersistentBehavior[Command, State] {
+
+  /** The unpersistent behavior itself */
+  val behavior: Behavior[Command]
+
+  /** Probe recording persisted states */
+  def stateProbe: PersistenceProbe[State]
+
+  /** A BehaviorTestKit for `behavior`, created on first access */
+  lazy val behaviorTestKit: BehaviorTestKit[Command] = BehaviorTestKit(behavior)
+}
+
+/**
+ * Factory methods to create UnpersistentBehavior instances for testing.
+ *
+ * @since 1.3.0
+ */
+object UnpersistentBehavior {
+
+  /**
+   * Given an EventSourcedBehavior, produce a non-persistent Behavior which synchronously publishes events and
+   *  snapshots for inspection.  State is updated as in the EventSourcedBehavior, and side effects are performed
+   *  synchronously.  The resulting Behavior is, contingent on the command handling, event handling, and side
+   *  effects being compatible with the BehaviorTestKit, testable with the BehaviorTestKit.
+   *
+   *  The returned Behavior does not intrinsically depend on configuration: it therefore does not serialize and
+   *  assumes an unbounded stash for commands.
+   *
+   *  @param initialStateAndSequenceNr optional state and sequence number to start from
+   */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command],
+      initialStateAndSequenceNr: Option[(State, Long)] = None): EventSourced[Command, Event, State] = {
+    val events = new PersistenceProbeImpl[Event]
+    val snapshots = new PersistenceProbeImpl[State]
+
+    val unpersistent =
+      Unpersistent.eventSourced(behavior, initialStateAndSequenceNr) {
+        (event: Event, sequenceNr: Long, tags: Set[String]) =>
+          events.persist((event, sequenceNr, tags))
+      } { (snapshot, sequenceNr) =>
+        // snapshots are recorded without tags
+        snapshots.persist((snapshot, sequenceNr, Set.empty))
+      }
+
+    EventSourced(unpersistent, events.asScala, snapshots.asScala)
+  }
+
+  /** As `fromEventSourced` above, starting from `initialState` at sequence number zero. */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command],
+      initialState: State): EventSourced[Command, Event, State] =
+    fromEventSourced(behavior, Some((initialState, 0L)))
+
+  /**
+   * Builds an unpersistent behavior via `Unpersistent.durableState`; every persisted state is recorded
+   * in the state probe of the result.
+   *
+   *  @param initialState optional state to start from
+   */
+  def fromDurableState[Command, State](
+      behavior: Behavior[Command],
+      initialState: Option[State] = None): DurableState[Command, State] = {
+    val states = new PersistenceProbeImpl[State]
+
+    val unpersistent =
+      Unpersistent.durableState(behavior, initialState) { (state, version, tag) =>
+        // an empty tag string is recorded as "no tags"
+        val tags: Set[String] = if (tag.isEmpty) Set.empty else Set(tag)
+        states.persist((state, version, tags))
+      }
+
+    DurableState(unpersistent, states.asScala)
+  }
+
+  /** Result of `fromEventSourced`: the behavior plus probes for events and for snapshots. */
+  final case class EventSourced[Command, Event, State](
+      override val behavior: Behavior[Command],
+      eventProbe: PersistenceProbe[Event],
+      override val stateProbe: PersistenceProbe[State])
+      extends UnpersistentBehavior[Command, State] {
+
+    /** The same probe as `stateProbe` */
+    def snapshotProbe: PersistenceProbe[State] = stateProbe
+
+    /** Runs `f` with the BehaviorTestKit, the event probe and the state probe. */
+    def apply(f: (BehaviorTestKit[Command], PersistenceProbe[Event], PersistenceProbe[State]) => Unit): Unit =
+      f(behaviorTestKit, eventProbe, stateProbe)
+  }
+
+  /** Result of `fromDurableState`: the behavior plus the probe for persisted states. */
+  final case class DurableState[Command, State](
+      override val behavior: Behavior[Command],
+      override val stateProbe: PersistenceProbe[State])
+      extends UnpersistentBehavior[Command, State] {
+
+    /** Runs `f` with the BehaviorTestKit and the state probe. */
+    def apply(f: (BehaviorTestKit[Command], PersistenceProbe[State]) => Unit): Unit =
+      f(behaviorTestKit, stateProbe)
+  }
+}
+
+/**
+ * One recorded persistence: the object that was persisted, the sequence number it was
+ * persisted with, and the tags attached to it.
+ */
+final case class PersistenceEffect[T](persistedObject: T, sequenceNr: Long, 
tags: Set[String])
+
+/**
+ * Read side of a probe that records persistence effects.
+ *
+ * Not for user extension
+ */
+@DoNotInherit
+trait PersistenceProbe[T] {
+
+  /** Collect all persistence effects from the probe and empty the probe */
+  def drain(): Seq[PersistenceEffect[T]]
+
+  /** Get and remove the oldest persistence effect from the probe */
+  def extract(): PersistenceEffect[T]
+
+  /**
+   * Get and remove the oldest persistence effect from the probe, failing if the
+   *  persisted object is not of the requested type
+   */
+  def expectPersistedType[S <: T: ClassTag](): PersistenceEffect[S]
+
+  /** Are there any persistence effects? */
+  def hasEffects: Boolean
+
+  /**
+   * Assert that the given object was persisted in the oldest persistence effect and
+   *  remove that persistence effect
+   */
+  def expectPersisted(obj: T): PersistenceProbe[T]
+
+  /**
+   * Assert that the given object was persisted with the given tag in the oldest
+   *  persistence effect and remove that persistence effect.  If the persistence
+   *  effect has multiple tags, only one of them has to match in order for the
+   *  assertion to succeed.
+   */
+  def expectPersisted(obj: T, tag: String): PersistenceProbe[T]
+
+  /**
+   * Assert that the given object was persisted with the given tags in the oldest
+   *  persistence effect and remove that persistence effect.  If the persistence
+   *  effect has tags which are not given, the assertion fails.
+   */
+  def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T]
+}
diff --git 
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala
 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala
new file mode 100644
index 0000000000..33efc52814
--- /dev/null
+++ 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.{ Behavior, RecipientRef }
+import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.state.RecoveryCompleted
+import pekko.persistence.typed.state.scaladsl._
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+// Fixture for the spec below: a counter implemented as a DurableStateBehavior.
+object UnpersistentDurableStateSpec {
+  object BehaviorUnderTest {
+    sealed trait Command
+
+    // Add n to the count and reply Done
+    case class Add(n: Int, replyTo: RecipientRef[Done]) extends Command
+    // Add only while the count is below ifLessThan; the reply says whether the add happened
+    case class AddIfLessThan(toAdd: Int, ifLessThan: Int, replyTo: 
RecipientRef[Boolean]) extends Command
+    // Stashed until the count has reached whenAtLeast, then behaves like Add
+    case class AddWhenAtLeast(toAdd: Int, whenAtLeast: Int, replyTo: 
RecipientRef[Done]) extends Command
+    // Send Done to notifyTo once the count is at least n
+    case class NotifyIfAtLeast(n: Int, notifyTo: RecipientRef[Done], replyTo: 
RecipientRef[Boolean]) extends Command
+    // Reply with DurableStateBehavior.lastSequenceNumber
+    case class GetRevisionNumber(replyTo: RecipientRef[Long]) extends Command
+
+    // count: running total; notifyAfter: observers keyed by the count they wait for;
+    // nextNotifyAt: lowest key of notifyAfter, or Int.MaxValue when there are no observers
+    case class State(count: Int, notifyAfter: Map[Int, RecipientRef[Done]], 
nextNotifyAt: Int) {
+      // Returns the state after adding n.  Observers whose threshold has been reached are
+      // dropped from the result; sending their notification is left to the caller.
+      def processAdd(n: Int): State = {
+        val nextCount = count + n
+
+        if (nextCount < nextNotifyAt) copy(count = nextCount)
+        else {
+          import scala.collection.mutable
+
+          val (nextNNA, nextNotifyAfter) = {
+            var lowestNotifyAt = Int.MaxValue
+            val inProgress = mutable.Map.empty[Int, RecipientRef[Done]]
+
+            // keep only the observers still waiting, tracking the lowest remaining threshold
+            notifyAfter.keysIterator.foreach { at =>
+              if (at > nextCount) {
+                lowestNotifyAt = lowestNotifyAt.min(at)
+                inProgress += (at -> notifyAfter(at))
+              }
+            }
+
+            lowestNotifyAt -> inProgress.toMap
+          }
+
+          copy(count = nextCount, notifyAfter = nextNotifyAfter, nextNotifyAt 
= nextNNA)
+        }
+      }
+
+      // Registers notifyTo for threshold `at` (replacing any observer already stored for it)
+      def addObserver(at: Int, notifyTo: RecipientRef[Done]): State = {
+        val nextNNA = nextNotifyAt.min(at)
+        val nextNotifyAfter = notifyAfter.updated(at, notifyTo)
+
+        copy(notifyAfter = nextNotifyAfter, nextNotifyAt = nextNNA)
+      }
+    }
+
+    // Builds the behavior: state is tagged "count" and recoveryDone receives Done on RecoveryCompleted
+    def apply(id: String, recoveryDone: RecipientRef[Done]): Behavior[Command] 
=
+      Behaviors.setup { context =>
+        context.setLoggerName(s"entity-$id")
+
+        DurableStateBehavior[Command, State](
+          persistenceId = PersistenceId.ofUniqueId(id),
+          emptyState = State(0, Map.empty, Int.MaxValue),
+          commandHandler = applyCommand(_, _, context))
+          .receiveSignal {
+            case (_, RecoveryCompleted) =>
+              recoveryDone ! Done
+          }
+          .withTag("count")
+      }
+
+    // Command handler
+    private def applyCommand(state: State, cmd: Command, context: 
ActorContext[Command]): Effect[State] = {
+      // Persists the state after adding n, then notifies the observers that are no longer
+      // present in the new state, sends the reply, and unstashes all stashed commands
+      def persistAdd[Reply](n: Int, replyTo: RecipientRef[Reply], reply: 
Reply): Effect[State] = {
+        val newState = state.processAdd(n)
+
+        Effect
+          .persist(newState)
+          .thenRun { nextState => // should be the same as newState, but...
+            state.notifyAfter.keysIterator
+              .filter { at =>
+                (at <= nextState.nextNotifyAt) && 
!nextState.notifyAfter.isDefinedAt(at)
+              }
+              .foreach { at =>
+                state.notifyAfter(at) ! Done
+              }
+
+            replyTo ! reply
+          }
+          .thenUnstashAll()
+      }
+
+      cmd match {
+        case Add(n, replyTo) => persistAdd(n, replyTo, Done)
+
+        case AddIfLessThan(toAdd, ifLessThan, replyTo) =>
+          if (state.count >= ifLessThan) {
+            context.log.info("Rejecting AddIfLessThan as count = {}", 
state.count)
+            Effect.none[State].thenRun(_ => replyTo ! false)
+          } else persistAdd(toAdd, replyTo, true)
+
+        case AddWhenAtLeast(toAdd, whenAtLeast, replyTo) =>
+          if (state.count < whenAtLeast) Effect.stash()
+          else persistAdd(toAdd, replyTo, Done)
+
+        case NotifyIfAtLeast(n, notifyTo, replyTo) =>
+          if (state.count >= n) {
+            // threshold already reached: notify right away, nothing to persist
+            Effect.none[State].thenRun { _ =>
+              notifyTo ! Done
+              replyTo ! true
+            }
+          } else if (state.notifyAfter.isDefinedAt(n)) {
+            // an observer is already registered for this threshold
+            Effect.none[State].thenRun(_ => replyTo ! false)
+          } else {
+            Effect.persist(state.addObserver(n, notifyTo)).thenRun(_ => 
replyTo ! true)
+          }
+
+        case GetRevisionNumber(replyTo) =>
+          Effect.none[State].thenRun(_ => replyTo ! 
DurableStateBehavior.lastSequenceNumber(context))
+      }
+    }
+  }
+}
+
+// Exercises UnpersistentBehavior.fromDurableState against BehaviorUnderTest using the BehaviorTestKit.
+class UnpersistentDurableStateSpec extends AnyWordSpec with Matchers {
+  import UnpersistentDurableStateSpec._
+
+  import pekko.actor.testkit.typed.scaladsl._
+
+  import org.slf4j.event.Level
+
+  "Unpersistent DurableStateBehavior" must {
+    "generate a fail-fast behavior from a non-DurableStateBehavior" in {
+      val notDurableState =
+        Behaviors.receive[Any] { (context, msg) =>
+          context.log.info("Got message {}", msg)
+          Behaviors.same
+        }
+
+      val unpersistent = UnpersistentBehavior.fromDurableState[Any, 
Any](notDurableState)
+      // the AssertionError is expected when the test kit is first accessed, not from the factory call
+      an[AssertionError] shouldBe thrownBy { unpersistent.behaviorTestKit }
+      assert(!unpersistent.stateProbe.hasEffects, "should be no persistence 
effects")
+    }
+
+    "generate a Behavior from a DurableStateBehavior and process 
RecoveryCompleted" in {
+      import BehaviorUnderTest._
+
+      val recoveryDone = TestInbox[Done]()
+      val behavior = BehaviorUnderTest("test-1", recoveryDone.ref)
+
+      // accessor-style API
+      val unpersistent = UnpersistentBehavior.fromDurableState[Command, 
State](behavior)
+      val probe = unpersistent.stateProbe
+      val testkit = unpersistent.behaviorTestKit
+
+      assert(!probe.hasEffects, "should not be persistence yet")
+      recoveryDone.expectMessage(Done)
+      val logs = testkit.logEntries()
+      logs.size shouldBe 1
+      logs.head.level shouldBe Level.DEBUG
+      logs.head.message shouldBe s"Recovered state for id [test-1] is 
[${State(0, Map.empty, Int.MaxValue)}]"
+    }
+
+    "publish state changes in response to commands" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val replyTo = TestInbox[Done]()
+
+      // and the more functional-style API
+      UnpersistentBehavior.fromDurableState[Command, State](behavior) { 
(testkit, probe) =>
+        testkit.run(Add(1, replyTo.ref))
+        replyTo.expectMessage(Done)
+        probe.expectPersisted(State(1, Map.empty, Int.MaxValue), tag = "count")
+        assert(!testkit.hasEffects(), "should have no actor effects")
+      }
+    }
+
+    "allow a state to be injected" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val notify3 = TestInbox[Done]()
+      val initialState = State(1, Map(3 -> notify3.ref), 3)
+
+      UnpersistentBehavior.fromDurableState[Command, State](behavior, 
Some(initialState)) { (testkit, probe) =>
+        val logs = testkit.logEntries()
+
+        logs.size shouldBe 1
+        logs.head.level shouldBe Level.DEBUG
+        logs.head.message shouldBe s"Recovered state for id [test-1] is 
[$initialState]"
+        assert(!probe.hasEffects, "should be no persistence effect")
+        assert(!notify3.hasMessages, "no messages should be sent to notify3")
+
+        // count is 1, so this command is stashed
+        val replyTo = TestInbox[Done]()
+        testkit.run(AddWhenAtLeast(2, 2, replyTo.ref))
+        assert(!replyTo.hasMessages, "no messages should be sent now")
+        assert(!notify3.hasMessages, "no messages should be sent to notify3")
+        assert(!probe.hasEffects, "should be no persistence effect")
+        assert(!testkit.hasEffects(), "should be no testkit effects")
+
+        // count becomes 4: notify3 fires and the stashed command then adds 2 more
+        testkit.run(Add(3, TestInbox[Done]().ref))
+        replyTo.expectMessage(Done)
+        notify3.expectMessage(Done)
+        assert(!testkit.hasEffects(), "should be no testkit effects")
+        probe.drain() should contain theSameElementsInOrderAs Seq(
+          PersistenceEffect(State(4, Map.empty, Int.MaxValue), 1, 
Set("count")),
+          PersistenceEffect(State(6, Map.empty, Int.MaxValue), 2, 
Set("count")))
+      }
+    }
+
+    "stash and unstash properly" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val replyTo1 = TestInbox[Done]()
+      val add = Add(1, TestInbox[Done]().ref)
+
+      UnpersistentBehavior.fromDurableState[Command, State](behavior) { 
(testkit, probe) =>
+        // stashes
+        testkit.run(AddWhenAtLeast(1, 1, replyTo1.ref))
+        assert(!probe.hasEffects, "should be no persistence effect")
+        assert(!replyTo1.hasMessages, "count is not yet 1")
+
+        // unstashes
+        testkit.run(add)
+        replyTo1.expectMessage(Done)
+        probe.drain() shouldNot be(empty)
+
+        // unstash but nothing in the stash
+        testkit.run(add)
+        assert(!replyTo1.hasMessages, "should not send again")
+        probe.drain() shouldNot be(empty)
+      }
+    }
+
+    "retrieve revision number" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+
+      val replyTo = TestInbox[Long]()
+      UnpersistentBehavior.fromDurableState[Command, State](behavior) { 
(testkit, probe) =>
+        testkit.run(GetRevisionNumber(replyTo.ref))
+        // nothing has been persisted, so extracting from the probe fails and the revision is 0
+        (the[AssertionError] thrownBy (probe.extract())).getMessage shouldBe 
"No persistence effects in probe"
+        replyTo.expectMessage(0)
+      }
+    }
+  }
+}
diff --git 
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala
 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala
new file mode 100644
index 0000000000..900aea4b4c
--- /dev/null
+++ 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.{ Behavior, RecipientRef }
+import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import pekko.persistence.typed.{ PersistenceId, RecoveryCompleted }
+import pekko.persistence.typed.scaladsl._
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+object UnpersistentEventSourcedSpec {
+  object BehaviorUnderTest {
+    // Protocol of the behavior under test; applyCommand defines how each command is handled.
+    sealed trait Command
+
+    // Persists one DomainEvent, then replies Done.
+    case class PersistDomainEvent(replyTo: RecipientRef[Done]) extends Command
+    // Persists a DomainEvent and replies true only while domainEvtCount < count;
+    // otherwise replies false without persisting anything.
+    case class PersistDomainEventIfBefore(count: Int, replyTo: 
RecipientRef[Boolean]) extends Command
+    // Stashed while domainEvtCount < count; otherwise handled like PersistDomainEvent.
+    case class PersistDomainEventAfter(count: Int, replyTo: 
RecipientRef[Done]) extends Command
+    // Registers notifyTo to receive Done once `count` domain events have been applied.
+    // replyTo receives false when an observer already exists for `count`, true otherwise.
+    case class NotifyAfter(count: Int, notifyTo: RecipientRef[Done], replyTo: 
RecipientRef[Boolean]) extends Command
+    // Replies with EventSourcedBehavior.lastSequenceNumber(context); persists nothing.
+    case class GetSequenceNumber(replyTo: RecipientRef[Long]) extends Command
+    // Persists a SnapshotMade event, which the behavior's snapshotWhen predicate accepts.
+    case object SnapshotNow extends Command
+
+    // Events persisted by the behavior under test.
+    sealed trait Event
+    // Increments State.domainEvtCount; the only event tagged "domain".
+    case object DomainEvent extends Event
+    // Leaves the state unchanged.
+    case object SnapshotMade extends Event
+    // Records an observer to be notified at `count` domain events.
+    case class ObserverAdded(count: Int, notifyTo: RecipientRef[Done]) extends 
Event
+
+    // domainEvtCount: number of DomainEvents applied so far.
+    // notifyAfter:    observers keyed by the domain-event count they wait for.
+    // nextNotifyAt:   smallest key in notifyAfter, or Int.MaxValue when there is
+    //                 none (as maintained by applyEvent).
+    case class State(domainEvtCount: Int, notifyAfter: Map[Int, 
RecipientRef[Done]], nextNotifyAt: Int)
+
+    // State of a new entity: no events applied, no observers registered.
+    val initialState = State(0, Map.empty, Int.MaxValue)
+
+    /**
+     * Builds the event-sourced behavior used as the subject of these tests.
+     *
+     * @param id           used as the unique persistence id and as the logger name suffix
+     * @param recoveryDone receives `Done` when the `RecoveryCompleted` signal arrives
+     */
+    def apply(id: String, recoveryDone: RecipientRef[Done]): Behavior[Command] =
+      Behaviors.setup { ctx =>
+        ctx.setLoggerName(s"entity-$id")
+
+        val core =
+          EventSourcedBehavior[Command, Event, State](
+            persistenceId = PersistenceId.ofUniqueId(id),
+            emptyState = initialState,
+            commandHandler = (state, cmd) => applyCommand(state, cmd, ctx),
+            eventHandler = (state, evt) => applyEvent(state, evt))
+
+        // Tell the test when recovery has finished.
+        val withSignals = core.receiveSignal {
+          case (_, RecoveryCompleted) => recoveryDone ! Done
+        }
+
+        // Snapshot on every SnapshotMade event, in addition to the periodic retention snapshots.
+        val withSnapshots = withSignals
+          .snapshotWhen((_, evt, _) => evt == SnapshotMade)
+          .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2))
+
+        // Only domain events are tagged.
+        withSnapshots.withTagger {
+          case DomainEvent => Set("domain")
+          case _           => Set.empty
+        }
+      }
+
+    // Command handler of the behavior under test: maps the current state and an
+    // incoming command to the persistence effect (and side effects) to perform.
+    // `context` is used for logging and for looking up the last sequence number.
+    private def applyCommand(state: State, cmd: Command, context: 
ActorContext[Command]): Effect[Event, State] = {
+      // Persists a single DomainEvent, then notifies the observers that became due,
+      // sends `reply` to `replyTo`, and finally unstashes all stashed commands.
+      def persistDomainEvent[Reply](replyTo: RecipientRef[Reply], reply: 
Reply): Effect[Event, State] =
+        Effect
+          .persist[Event, State](DomainEvent)
+          .thenRun { newState =>
+            // Observers registered before the event but absent from the new state
+            // are the ones applyEvent dropped because their count was reached.
+            state.notifyAfter.keysIterator
+              .filter { at =>
+                (at <= newState.nextNotifyAt) && 
!newState.notifyAfter.isDefinedAt(at)
+              }
+              .foreach { at =>
+                state.notifyAfter(at) ! Done
+              }
+
+            replyTo ! reply
+          }
+          .thenUnstashAll()
+
+      cmd match {
+        case PersistDomainEvent(replyTo) => persistDomainEvent(replyTo, Done)
+
+        case PersistDomainEventIfBefore(count, replyTo) =>
+          // Reject (reply false, persist nothing) once `count` events have been applied.
+          if (state.domainEvtCount >= count) {
+            context.log.info("Rejecting PersistDomainEventIfBefore as 
domainEvtCount = {}", state.domainEvtCount)
+            Effect.none.thenRun(_ => replyTo ! false)
+          } else persistDomainEvent(replyTo, true)
+
+        case PersistDomainEventAfter(count, replyTo) =>
+          // Too early: keep the command stashed until persistDomainEvent unstashes it.
+          if (state.domainEvtCount < count) Effect.stash()
+          else persistDomainEvent(replyTo, Done)
+
+        case NotifyAfter(count, notifyTo, replyTo) =>
+          if (state.domainEvtCount >= count) {
+            // The threshold has already been reached: notify immediately, persist nothing.
+            Effect.none.thenRun { _ =>
+              notifyTo ! Done
+              replyTo ! true
+            }
+          } else if (state.notifyAfter.isDefinedAt(count)) {
+            // At most one observer per count; a second registration is refused.
+            Effect.none.thenRun(_ => replyTo ! false)
+          } else {
+            Effect.persist(ObserverAdded(count, notifyTo)).thenRun(_ => 
replyTo ! true)
+          }
+
+        case SnapshotNow => Effect.persist(SnapshotMade)
+
+        case GetSequenceNumber(replyTo) =>
+          Effect.none.thenRun(_ => replyTo ! 
EventSourcedBehavior.lastSequenceNumber(context))
+      }
+    }
+
+    /**
+     * Event handler of the behavior under test: folds one event into the state.
+     *
+     *  - `SnapshotMade` leaves the state untouched.
+     *  - `ObserverAdded` registers the observer and lowers `nextNotifyAt` if needed.
+     *  - `DomainEvent` increments the counter; once the counter reaches `nextNotifyAt`,
+     *    observers whose count has been reached are dropped and `nextNotifyAt` is
+     *    recomputed from the remaining ones (`Int.MaxValue` when none remain).
+     */
+    private[scaladsl] def applyEvent(state: State, evt: Event): State =
+      evt match {
+        case SnapshotMade => state
+
+        case ObserverAdded(count, notifyTo) =>
+          state.copy(
+            notifyAfter = state.notifyAfter.updated(count, notifyTo),
+            nextNotifyAt = math.min(state.nextNotifyAt, count))
+
+        case DomainEvent =>
+          val applied = state.domainEvtCount + 1
+
+          if (applied >= state.nextNotifyAt) {
+            // Keep only the observers that are still waiting for a later count.
+            val pending = state.notifyAfter.filter { case (at, _) => at > applied }
+            val lowestPending = pending.keysIterator.foldLeft(Int.MaxValue)(_ min _)
+
+            state.copy(domainEvtCount = applied, notifyAfter = pending, nextNotifyAt = lowestPending)
+          } else state.copy(domainEvtCount = applied)
+      }
+}
+
+class UnpersistentEventSourcedSpec extends AnyWordSpec with Matchers {
+  import UnpersistentEventSourcedSpec._
+
+  import pekko.actor.testkit.typed.scaladsl._
+
+  import org.slf4j.event.Level
+
+  "Unpersistent EventSourcedBehavior" must {
+    // Wrapping a behavior that is not an EventSourcedBehavior is expected to yield
+    // a wrapper whose test kit and probes all fail with AssertionError on access.
+    "generate a failing behavior from a non-EventSourcedBehavior" in {
+      val plainBehavior = Behaviors.receive[Any] { (ctx, received) =>
+        ctx.log.info("Got message {}", received)
+        Behaviors.same
+      }
+      val wrapped = UnpersistentBehavior.fromEventSourced[Any, Any, Any](plainBehavior)
+
+      an[AssertionError] shouldBe thrownBy(wrapped.behaviorTestKit)
+      an[AssertionError] shouldBe thrownBy(wrapped.eventProbe.extract())
+      an[AssertionError] shouldBe thrownBy(wrapped.snapshotProbe.extract())
+    }
+
+    // Starts the behavior with no injected state and checks what recovery alone
+    // produces: no events, no snapshots, one Done from the RecoveryCompleted
+    // handler installed by BehaviorUnderTest, and a single DEBUG log line.
+    "generate a Behavior from an EventSourcedBehavior and process 
RecoveryCompleted" in {
+      import BehaviorUnderTest._
+
+      val recoveryDone = TestInbox[Done]()
+      val behavior = BehaviorUnderTest("test-1", recoveryDone.ref)
+
+      // accessor API: the test kit and both probes are read off the returned object
+      val unpersistent = UnpersistentBehavior.fromEventSourced[Command, Event, 
State](behavior)
+      val testkit = unpersistent.behaviorTestKit
+      val eventProbe = unpersistent.eventProbe
+      val snapshotProbe = unpersistent.snapshotProbe
+
+      assert(!eventProbe.hasEffects, "should not be events")
+      assert(!snapshotProbe.hasEffects, "should not be snapshots")
+      recoveryDone.expectMessage(Done)
+      // The only log entry expected is the recovered-state message for the empty state.
+      val logs = testkit.logEntries()
+      logs.size shouldBe 1
+      logs.head.level shouldBe Level.DEBUG
+      logs.head.message shouldBe s"Recovered state for id [test-1] is 
[${State(0, Map.empty, Int.MaxValue)}]"
+    }
+
+    // Runs two commands and checks the recorded persistence effects: a tagged
+    // DomainEvent without a snapshot, then an untagged SnapshotMade event at
+    // sequence number 2 together with a snapshot of the state after one domain event.
+    "publish events and evolve observed state in response to commands" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val replyTo = TestInbox[Done]()
+
+      // resource-style API: the test kit and probes are passed to the supplied function
+      UnpersistentBehavior.fromEventSourced[Command, Event, State](behavior) { 
(testkit, eventProbe, snapshotProbe) =>
+        testkit.clearLog()
+
+        testkit.run(PersistDomainEvent(replyTo.ref))
+        replyTo.expectMessage(Done)
+        eventProbe.expectPersisted(DomainEvent, Set("domain"))
+        snapshotProbe.drain() shouldBe empty
+        assert(!testkit.hasEffects(), "should have no effects")
+        testkit.clearLog()
+
+        // SnapshotNow carries no replyTo, so the inbox must stay empty.
+        testkit.run(SnapshotNow)
+        assert(!replyTo.hasMessages, "should not be a reply")
+
+        val PersistenceEffect(_, seqNr, tags) = 
eventProbe.expectPersistedType[SnapshotMade.type]()
+        seqNr shouldBe 2
+        tags shouldBe empty
+
+        // SnapshotMade leaves the state unchanged, so the snapshot still shows one domain event.
+        snapshotProbe.expectPersisted(State(1, Map.empty, Int.MaxValue))
+      }
+    }
+
+    // Uses a minimal behavior whose event handler throws on event 1 to check that
+    // the event probe records nothing when applying any event of a command fails.
+    "not publish events if the event handler fails" in {
+      val behavior =
+        EventSourcedBehavior[Int, Int, Unit](
+          persistenceId = PersistenceId.ofUniqueId("degenerator"),
+          emptyState = (),
+          commandHandler = { (_, cmd: Int) =>
+            cmd match {
+              case nothing if nothing < 1 => Effect.none
+              case 1                      => Effect.persist(1)
+              case n                      => Effect.persist(Range(n, 0, -1)) 
// count down to 1
+            }
+          },
+          eventHandler = { (_, evt) =>
+            if (evt == 1) throw new RuntimeException("Kaboom!")
+          })
+
+      UnpersistentBehavior.fromEventSourced[Int, Int, Unit](behavior) { 
(testkit, eventProbe, _) =>
+        // Single event that fails immediately.
+        val oneException = the[RuntimeException] thrownBy testkit.run(1)
+        oneException.getMessage shouldBe "Kaboom!"
+        eventProbe.hasEffects shouldBe false
+
+        // Command 2 persists events 2 and 1; the failure on 1 must also keep 2 unpublished.
+        val twoException = the[RuntimeException] thrownBy testkit.run(2)
+        twoException.getMessage shouldBe "Kaboom!"
+        eventProbe.hasEffects shouldBe false
+      }
+    }
+
+    // Injects a pre-built state together with sequence number 41 and checks that
+    // the behavior continues from there: the next event gets sequence number 42
+    // and the pending observer for count 3 is notified.
+    "allow a state and starting offset to be injected" in {
+      import BehaviorUnderTest._
+
+      val recoveryDone = TestInbox[Done]()
+      val behavior = BehaviorUnderTest("test-1", recoveryDone.ref)
+
+      val notify3 = TestInbox[Done]()
+      // Folding these events through applyEvent gives two applied domain events
+      // with notify3 still waiting for the third. This local val shadows the
+      // imported BehaviorUnderTest.initialState.
+      val initialState =
+        Seq(ObserverAdded(3, notify3.ref), SnapshotMade, DomainEvent, 
DomainEvent)
+          .foldLeft(State(0, Map.empty, Int.MaxValue))(applyEvent _)
+
+      UnpersistentBehavior.fromEventSourced[Command, Event, State](behavior, 
Some(initialState -> 41L)) {
+        (testkit, eventProbe, snapshotProbe) =>
+          recoveryDone.expectMessage(Done)
+          val logs = testkit.logEntries()
+          logs.size shouldBe 1
+          logs.head.level shouldBe Level.DEBUG
+          logs.head.message shouldBe s"Recovered state for id [test-1] is 
[$initialState]"
+          // Injecting state must not itself be reported as persisted events or snapshots.
+          eventProbe.drain() shouldBe empty
+          snapshotProbe.drain() shouldBe empty
+          assert(!notify3.hasMessages, "no messages should be sent to notify3")
+
+          // domainEvtCount is already 2, so this command persists instead of stashing.
+          val replyTo = TestInbox[Done]()
+          testkit.run(PersistDomainEventAfter(2, replyTo.ref))
+          assert(!testkit.hasEffects(), "should be no testkit effects")
+          eventProbe.extract() shouldBe PersistenceEffect(DomainEvent, 42, 
Set("domain"))
+          // NOTE(review): presumably snapshotted because 42 is a multiple of the
+          // snapshotEvery(numberOfEvents = 3) retention setting - confirm in Unpersistent.scala.
+          snapshotProbe.expectPersisted(State(3, Map.empty, Int.MaxValue))
+
+          notify3.expectMessage(Done)
+          replyTo.expectMessage(Done)
+      }
+    }
+
+    // Checks stash handling: PersistDomainEventAfter(1, ...) is stashed while no
+    // domain event has been applied, gets its reply once another command has
+    // persisted one, and is not replayed a second time afterwards.
+    "stash and unstash properly" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+
+      UnpersistentBehavior.fromEventSourced[Command, Event, State](behavior, 
None) {
+        (testkit, eventProbe, snapshotProbe) =>
+          val replyTo1 = TestInbox[Done]()
+          // Reused for both runs below; its own replies go to a throwaway inbox.
+          val pde = PersistDomainEvent(TestInbox[Done]().ref)
+
+          // stashes
+          testkit.run(PersistDomainEventAfter(1, replyTo1.ref))
+          eventProbe.drain() shouldBe empty
+          snapshotProbe.drain() shouldBe empty
+          assert(!replyTo1.hasMessages, "have not persisted first domain 
event")
+
+          // unstashes
+          testkit.run(pde)
+          replyTo1.expectMessage(Done)
+
+          // unstash, but nothing in the stash
+          testkit.run(pde)
+          assert(!replyTo1.hasMessages, "should not send again")
+      }
+    }
+
+    // The sequence number injected alongside the initial state must be the one
+    // the behavior reports, and asking for it must not persist anything.
+    "retrieve sequence number properly" in {
+      import BehaviorUnderTest._
+
+      // Any non-negative Long; Long.MinValue has no positive counterpart, so it maps to MaxValue.
+      val randomStartingOffset = {
+        val raw = scala.util.Random.nextLong()
+        if (raw == Long.MinValue) Long.MaxValue else math.abs(raw)
+      }
+
+      val underTest = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val injected = Some(initialState -> randomStartingOffset)
+
+      UnpersistentBehavior.fromEventSourced[Command, Event, State](underTest, injected) {
+        (testkit, eventProbe, snapshotProbe) =>
+          val seqNrInbox = TestInbox[Long]()
+          testkit.run(GetSequenceNumber(seqNrInbox.ref))
+
+          eventProbe.drain() shouldBe empty
+          snapshotProbe.drain() shouldBe empty
+          seqNrInbox.expectMessage(randomStartingOffset)
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to