This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new d89f2eec8b Fix typed persistence stack overflow with many read only 
commands (#1919)
d89f2eec8b is described below

commit d89f2eec8b1cf778869a0e19bf7c77a8e0720565
Author: Tomassino-ibm <tomassino.ferra...@ibm.com>
AuthorDate: Fri Jul 11 11:32:43 2025 +0200

    Fix typed persistence stack overflow with many read only commands (#1919)
    
    * Update unstash stack overflow test to have it actually fail
    
    Taken changes from https://github.com/apache/pekko/pull/1336 to have a test 
that fails
    
    * Fix possible stack overflow in persistence-typed
    
    This commit adds code to break recurrent calls in persistence-typed while 
unstashing read-only commands that could lead to a stack overflow, fixing issue 
#1327 (limited to EventSourcedBehavior)
    
    The fix can be enabled using a feature flag, by default it is disabled
    
    * bin compat exclude and scalafmt
    
    * Also fix the same stack overflow issue in DurableStateBehavior
    
    The fix is enabled by the same feature flag used by the fix of 
EventSourcedBehavior
    
    * Enable by default the fix for the stack overflow
    
    Also rename parameter
    
    * Refactor code to make it more explicit that the old code path is unchanged
    
    This commit changes how `onMessage` and `onCommand` are implemented to make 
it clearer that, when the `recurse-when-unstashing-read-only-commands` flag is 
set to true, the old code path is used.
    
    Moreover, the while loop in onCommand has been changed into a tail 
recursive function
    
    These changes have been applied to both EventSourcedBehavior and 
DurableStateBehavior
    
    ---------
    
    Co-authored-by: PJ Fanning <pjfann...@users.noreply.github.com>
---
 .../scaladsl/EventSourcedStashOverflowSpec.scala   |   8 +-
 .../DurableStateBehaviorStashOverflowSpec.scala    | 133 +++++++++++++++++++++
 .../eventsourcedbehavior-stackoverflow.excludes    |  22 ++++
 .../src/main/resources/reference.conf              |   7 ++
 .../typed/internal/EventSourcedSettings.scala      |   9 +-
 .../typed/internal/ExternalInteractions.scala      |  10 +-
 .../typed/internal/ReplayingEvents.scala           |   2 +-
 .../pekko/persistence/typed/internal/Running.scala | 124 ++++++++++++++-----
 .../state/internal/DurableStateSettings.scala      |   9 +-
 .../internal/DurableStateStoreInteractions.scala   |   8 +-
 .../typed/state/internal/Recovering.scala          |   2 +-
 .../persistence/typed/state/internal/Running.scala | 104 +++++++++++++---
 .../typed/internal/StashStateSpec.scala            |   3 +-
 13 files changed, 375 insertions(+), 66 deletions(-)

diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala
index 316015ca75..34f15089e6 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala
@@ -55,10 +55,12 @@ object EventSourcedStashOverflowSpec {
     
SteppingInmemJournal.config("EventSourcedStashOverflow").withFallback(ConfigFactory.parseString(s"""
        pekko.persistence {
          typed {
-           stash-capacity = 1000 # enough to fail on stack size
+           stash-capacity = 20000 # enough to fail on stack size
            stash-overflow-strategy = "drop"
+           recurse-when-unstashing-read-only-commands = false
          }
        }
+       pekko.jvm-exit-on-fatal-error = off
    """))
 }
 
@@ -85,8 +87,8 @@ class EventSourcedStashOverflowSpec
       for (_ <- 0 to (stashCapacity * 2)) {
         es.tell(EventSourcedStringList.DoNothing(probe.ref))
       }
-      // capacity + 1 should mean that we get a dropped last message when all 
stash is filled
-      // while the actor is stuck in replay because journal isn't responding
+      // capacity * 2 should mean that we get many dropped messages when all 
stash is filled
+      // while the actor is stuck in replay because journal isn't responding 
(checking only one)
       droppedMessageProbe.receiveMessage()
       implicit val classicSystem: pekko.actor.ActorSystem =
         testKit.system.toClassic
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
new file mode 100644
index 0000000000..ada975be5b
--- /dev/null
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.typed.state.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.Behavior
+import pekko.persistence.state.DurableStateStoreProvider
+import pekko.persistence.state.scaladsl.{ DurableStateStore, 
DurableStateUpdateStore, GetObjectResult }
+import pekko.persistence.state.javadsl.{ DurableStateStore => 
JDurableStateStore }
+import pekko.persistence.typed.PersistenceId
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.concurrent.{ Future, Promise }
+import scala.concurrent.duration._
+
+object DurableStateBehaviorStashOverflowSpec {
+
+  class TestDurableStateStoreProvider extends DurableStateStoreProvider {
+    private val store = new TestDurableStateStorePlugin[Any]
+
+    override def scaladslDurableStateStore(): DurableStateStore[Any] = store
+
+    // Not used here
+    override def javadslDurableStateStore(): JDurableStateStore[AnyRef] = null
+  }
+
+  object TestDurableStateStorePlugin {
+    @volatile var instance: Option[TestDurableStateStorePlugin[_]] = None
+    def getInstance(): TestDurableStateStorePlugin[_] = instance.get
+  }
+
+  class TestDurableStateStorePlugin[A] extends DurableStateUpdateStore[A] {
+    TestDurableStateStorePlugin.instance = Some(this)
+
+    private val promise: Promise[Done] = Promise()
+
+    override def getObject(persistenceId: String): Future[GetObjectResult[A]] =
+      Future.successful(GetObjectResult[A](None, 0L))
+
+    override def upsertObject(persistenceId: String, revision: Long, value: A, 
tag: String): Future[Done] =
+      promise.future
+
+    def completeUpsertFuture(): Unit = promise.success(Done)
+
+    override def deleteObject(persistenceId: String): Future[Done] = 
Future.successful(Done)
+
+    override def deleteObject(persistenceId: String, revision: Long): 
Future[Done] = Future.successful(Done)
+  }
+
+  object DurableStateString {
+    sealed trait Command
+    case class DoNothing(replyTo: ActorRef[Done]) extends Command
+    case class DoPersist(replyTo: ActorRef[Done]) extends Command
+
+    def apply(persistenceId: PersistenceId): Behavior[Command] =
+      DurableStateBehavior[Command, String](
+        persistenceId,
+        "",
+        { (_, command) =>
+          command match {
+            case DoNothing(replyTo) =>
+              Effect.reply(replyTo)(Done)
+
+            case DoPersist(replyTo) =>
+              Effect.persist("Initial persist").thenRun(_ => replyTo ! Done)
+          }
+        })
+  }
+
+  def conf = ConfigFactory.parseString(s"""
+       pekko.persistence {
+         state.plugin = "my-state-plugin"
+         typed {
+           stash-capacity = 20000 # enough to fail on stack size
+           stash-overflow-strategy = "drop"
+           recurse-when-unstashing-read-only-commands = false
+         }
+       }
+       pekko.jvm-exit-on-fatal-error = off
+       my-state-plugin.class = 
"${classOf[TestDurableStateStoreProvider].getName}"
+   """)
+}
+
+class DurableStateBehaviorStashOverflowSpec
+    extends 
ScalaTestWithActorTestKit(DurableStateBehaviorStashOverflowSpec.conf)
+    with AnyWordSpecLike
+    with LogCapturing {
+
+  import DurableStateBehaviorStashOverflowSpec._
+
+  "Stashing in a busy durable state behavior" must {
+
+    "not cause stack overflow" in {
+      val es = spawn(DurableStateString(PersistenceId.ofUniqueId("id-1")))
+
+      // wait for journal to start
+      val probe = testKit.createTestProbe[Done]()
+      val journal = 
probe.awaitAssert(TestDurableStateStorePlugin.getInstance(), 3.seconds)
+
+      val droppedMessageProbe = testKit.createDroppedMessageProbe()
+      val stashCapacity = 
testKit.config.getInt("pekko.persistence.typed.stash-capacity")
+
+      es.tell(DurableStateString.DoPersist(probe.ref))
+
+      for (_ <- 0 to (stashCapacity * 2)) {
+        es.tell(DurableStateString.DoNothing(probe.ref))
+      }
+      // capacity * 2 should mean that we get many dropped messages when all 
stash is filled
+      // while the actor is stuck in replay because journal isn't responding 
(checking only one)
+      droppedMessageProbe.receiveMessage()
+      journal.completeUpsertFuture()
+
+      // exactly how many is racy but at least the first stash buffer full 
should complete
+      probe.receiveMessages(stashCapacity)
+    }
+  }
+}
diff --git 
a/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior-stackoverflow.excludes
 
b/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior-stackoverflow.excludes
new file mode 100644
index 0000000000..10838cf792
--- /dev/null
+++ 
b/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior-stackoverflow.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# EventSourcedBehavior/DurableState Read-Only Events Stackoverflow
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.typed.internal.Running#*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.typed.internal.Running#*")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.typed.state.internal.Running#*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.typed.state.internal.Running#*")
diff --git a/persistence-typed/src/main/resources/reference.conf 
b/persistence-typed/src/main/resources/reference.conf
index 1d0b7c4237..2513dac159 100644
--- a/persistence-typed/src/main/resources/reference.conf
+++ b/persistence-typed/src/main/resources/reference.conf
@@ -47,6 +47,13 @@ pekko.persistence.typed {
   # this can be changed by setting this to 'true' in which case the internal 
logging is sent to
   # the actor context logger.
   use-context-logger-for-internal-logging = false
+
+  # Commands get stashed during persistence and un-stashed afterward. 
Normally, when many read-only
+  # (i.e. that do not cause any persistence) commands are present in the 
stash, the functions are called
+  # in a loop to unstash all of them. Setting this to true, will cause 
functions to be called
+  # recursively (that was the default before this setting was introduced). 
That might cause a stack
+  # overflow in case there are many messages to unstash.
+  recurse-when-unstashing-read-only-commands = false
 }
 
 pekko.reliable-delivery {
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
index 8992743640..6ab4dcf661 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
@@ -63,6 +63,9 @@ import pekko.annotation.InternalApi
 
     val useContextLoggerForInternalLogging = 
typedConfig.getBoolean("use-context-logger-for-internal-logging")
 
+    val recurseWhenUnstashingReadOnlyCommands =
+      typedConfig.getBoolean("recurse-when-unstashing-read-only-commands")
+
     EventSourcedSettings(
       stashCapacity = stashCapacity,
       stashOverflowStrategy,
@@ -71,7 +74,8 @@ import pekko.annotation.InternalApi
       snapshotPluginId,
       journalPluginConfig,
       snapshotPluginConfig,
-      useContextLoggerForInternalLogging)
+      useContextLoggerForInternalLogging,
+      recurseWhenUnstashingReadOnlyCommands)
   }
 }
 
@@ -87,7 +91,8 @@ private[pekko] final case class EventSourcedSettings(
     snapshotPluginId: String,
     journalPluginConfig: Option[Config],
     snapshotPluginConfig: Option[Config],
-    useContextLoggerForInternalLogging: Boolean) {
+    useContextLoggerForInternalLogging: Boolean,
+    recurseWhenUnstashingReadOnlyCommands: Boolean) {
 
   require(journalPluginId != null, "journal plugin id must not be null; use 
empty string for 'default' journal")
   require(
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
index 21747792d2..304e3a0d2b 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
@@ -54,10 +54,10 @@ private[pekko] trait JournalInteractions[C, E, S] {
   protected def internalPersist(
       ctx: ActorContext[_],
       cmd: Any,
-      state: Running.RunningState[S],
+      state: Running.RunningState[S, C],
       event: EventOrTaggedOrReplicated,
       eventAdapterManifest: String,
-      metadata: OptionVal[Any]): Running.RunningState[S] = {
+      metadata: OptionVal[Any]): Running.RunningState[S, C] = {
 
     val newRunningState = state.nextSequenceNr()
 
@@ -93,8 +93,8 @@ private[pekko] trait JournalInteractions[C, E, S] {
   protected def internalPersistAll(
       ctx: ActorContext[_],
       cmd: Any,
-      state: Running.RunningState[S],
-      events: immutable.Seq[EventToPersist]): Running.RunningState[S] = {
+      state: Running.RunningState[S, C],
+      events: immutable.Seq[EventToPersist]): Running.RunningState[S, C] = {
     if (events.nonEmpty) {
       var newState = state
 
@@ -202,7 +202,7 @@ private[pekko] trait SnapshotInteractions[C, E, S] {
     setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId.id, criteria, 
toSequenceNr), setup.selfClassic)
   }
 
-  protected def internalSaveSnapshot(state: Running.RunningState[S]): Unit = {
+  protected def internalSaveSnapshot(state: Running.RunningState[S, C]): Unit 
= {
     setup.internalLogger.debug("Saving snapshot sequenceNr [{}]", state.seqNr)
     if (state.state == null)
       throw new IllegalStateException("A snapshot must not be a null state.")
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
index 4e3a3c3a50..8407bb4f5c 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
@@ -294,7 +294,7 @@ private[pekko] final class ReplayingEvents[C, E, S](
       if (state.receivedPoisonPill && isInternalStashEmpty && 
!isUnstashAllInProgress)
         Behaviors.stopped
       else {
-        val runningState = Running.RunningState[S](
+        val runningState = Running.RunningState[S, C](
           seqNr = state.seqNr,
           state = state.state,
           receivedPoisonPill = state.receivedPoisonPill,
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
index f4bf501209..bae282c033 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
@@ -97,21 +97,35 @@ private[pekko] object Running {
     def currentSequenceNumber: Long
   }
 
-  final case class RunningState[State](
+  // This is part of the fix for https://github.com/apache/pekko/issues/1327 
and it's necessary to break
+  // recursion between the `onCommand` and `onMessage` functions in Running[C, 
E, S]#HandlingCommands. We
+  // set the flag `inOnCommandCall` to true in `onCommand` before calling 
functions that will in turn call
+  // `onMessage`. In `onMessage`, if the flag is set and we would recursively 
call `onCommand`, we instead
+  // save the parameters we would use to call `onCommand` and return. When 
back in `onCommand`, we check if
+  // `recOnCommandParams` is not empty and, if not, act as if `onMessage` 
called us directly. In function
+  // `onCommand` we have a while loop replacing recursive calls so that we 
don't use all the stack space in
+  // case a lot of read-only message are stashed
+  final class UnstashRecurrenceState[State, Command] {
+    var inOnCommandCall: Boolean = false
+    var recOnCommandParams: Option[(RunningState[State, Command], Command)] = 
None
+  }
+
+  final case class RunningState[State, Command](
       seqNr: Long,
       state: State,
       receivedPoisonPill: Boolean,
       version: VersionVector,
       seenPerReplica: Map[ReplicaId, Long],
-      replicationControl: Map[ReplicaId, ReplicationStreamControl]) {
+      replicationControl: Map[ReplicaId, ReplicationStreamControl],
+      unstashRecurrenceState: UnstashRecurrenceState[State, Command] = new 
UnstashRecurrenceState[State, Command]) {
 
-    def nextSequenceNr(): RunningState[State] =
+    def nextSequenceNr(): RunningState[State, Command] =
       copy(seqNr = seqNr + 1)
 
-    def updateLastSequenceNr(persistent: PersistentRepr): RunningState[State] =
+    def updateLastSequenceNr(persistent: PersistentRepr): RunningState[State, 
Command] =
       if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) 
else this
 
-    def applyEvent[C, E](setup: BehaviorSetup[C, E, State], event: E): 
RunningState[State] = {
+    def applyEvent[E](setup: BehaviorSetup[Command, E, State], event: E): 
RunningState[State, Command] = {
       val updated = setup.eventHandler(state, event)
       copy(state = updated)
     }
@@ -119,8 +133,8 @@ private[pekko] object Running {
 
   def startReplicationStream[C, E, S](
       setup: BehaviorSetup[C, E, S],
-      state: RunningState[S],
-      replicationSetup: ReplicationSetup): RunningState[S] = {
+      state: RunningState[S, C],
+      replicationSetup: ReplicationSetup): RunningState[S, C] = {
     import scala.concurrent.duration._
     val system = setup.context.system
     val ref = setup.context.self
@@ -238,7 +252,7 @@ private[pekko] object Running {
   // Needed for WithSeqNrAccessible, when unstashing
   private var _currentSequenceNumber = 0L
 
-  final class HandlingCommands(state: RunningState[S])
+  final class HandlingCommands(state: RunningState[S, C])
       extends AbstractBehavior[InternalProtocol](setup.context)
       with WithSeqNrAccessible {
 
@@ -249,7 +263,15 @@ private[pekko] object Running {
     }
 
     def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg 
match {
-      case IncomingCommand(c: C @unchecked)          => onCommand(state, c)
+      case IncomingCommand(c: C @unchecked) if 
setup.settings.recurseWhenUnstashingReadOnlyCommands =>
+        onCommand(state, c)
+      case IncomingCommand(c: C @unchecked) =>
+        if (state.unstashRecurrenceState.inOnCommandCall) {
+          state.unstashRecurrenceState.recOnCommandParams = Some((state, c))
+          this // This will be ignored in onCommand
+        } else {
+          onCommand(state, c)
+        }
       case re: ReplicatedEventEnvelope[E @unchecked] => 
onReplicatedEvent(state, re, setup.replication.get)
       case pe: PublishedEventImpl                    => 
onPublishedEvent(state, pe)
       case JournalResponse(r)                        => 
onDeleteEventsJournalResponse(r, state.state)
@@ -268,15 +290,60 @@ private[pekko] object Running {
         else Behaviors.unhandled
     }
 
-    def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] 
= {
-      val effect = setup.commandHandler(state.state, cmd)
-      val (next, doUnstash) = applyEffects(cmd, state, 
effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
-      if (doUnstash) tryUnstashOne(next)
-      else next
+    def onCommand(state: RunningState[S, C], cmd: C): 
Behavior[InternalProtocol] = {
+      def callApplyEffects(rs: RunningState[S, C], c: C): 
(Behavior[InternalProtocol], Boolean) = {
+        val effect = setup.commandHandler(rs.state, c)
+
+        applyEffects(c, rs, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can 
we avoid the cast?
+      }
+
+      def recursiveUnstashImpl(
+          applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
+      ): Behavior[InternalProtocol] = {
+        val (next, doUnstash) = applyEffectsRetval
+
+        if (doUnstash) tryUnstashOne(next)
+        else next
+      }
+
+      def nonRecursiveUnstashImpl(
+          applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
+      ): Behavior[InternalProtocol] = {
+        @tailrec
+        def loop(applyEffectsRetval: (Behavior[InternalProtocol], Boolean)): 
Behavior[InternalProtocol] = {
+          val (next, doUnstash) = applyEffectsRetval
+
+          if (doUnstash) {
+            state.unstashRecurrenceState.inOnCommandCall = true
+            val r = tryUnstashOne(next)
+            state.unstashRecurrenceState.inOnCommandCall = false
+
+            val recOnCommandParams = 
state.unstashRecurrenceState.recOnCommandParams
+            state.unstashRecurrenceState.recOnCommandParams = None
+
+            recOnCommandParams match {
+              case None          => r
+              case Some((rs, c)) => loop(callApplyEffects(rs, c))
+            }
+          } else {
+            next
+          }
+        }
+
+        loop(applyEffectsRetval)
+      }
+
+      val r = callApplyEffects(state, cmd)
+
+      if (setup.settings.recurseWhenUnstashingReadOnlyCommands) {
+        recursiveUnstashImpl(r)
+      } else {
+        nonRecursiveUnstashImpl(r)
+      }
     }
 
     def onReplicatedEvent(
-        state: Running.RunningState[S],
+        state: Running.RunningState[S, C],
         envelope: ReplicatedEventEnvelope[E],
         replication: ReplicationSetup): Behavior[InternalProtocol] = {
       setup.internalLogger.debugN(
@@ -300,7 +367,7 @@ private[pekko] object Running {
       }
     }
 
-    def onPublishedEvent(state: Running.RunningState[S], event: 
PublishedEventImpl): Behavior[InternalProtocol] = {
+    def onPublishedEvent(state: Running.RunningState[S, C], event: 
PublishedEventImpl): Behavior[InternalProtocol] = {
       val newBehavior: Behavior[InternalProtocol] = setup.replication match {
         case None =>
           setup.internalLogger.warn(
@@ -321,7 +388,7 @@ private[pekko] object Running {
     }
 
     private def onPublishedEvent(
-        state: Running.RunningState[S],
+        state: Running.RunningState[S, C],
         replication: ReplicationSetup,
         replicatedMetadata: ReplicatedPublishedEventMetaData,
         event: PublishedEventImpl): Behavior[InternalProtocol] = {
@@ -428,7 +495,7 @@ private[pekko] object Running {
 
       replication.clearContext()
 
-      val newState2: RunningState[S] = internalPersist(
+      val newState2: RunningState[S, C] = internalPersist(
         setup.context,
         null,
         stateAfterApply,
@@ -464,10 +531,10 @@ private[pekko] object Running {
         val eventToPersist = adaptEvent(event)
         val eventAdapterManifest = setup.eventAdapter.manifest(event)
 
-        val newState2 = setup.replication match {
+        val newState2: RunningState[S, C] = setup.replication match {
           case Some(replication) =>
             val updatedVersion = 
stateAfterApply.version.updated(replication.replicaId.id, 
_currentSequenceNumber)
-            val r = internalPersist(
+            val r: RunningState[S, C] = internalPersist(
               setup.context,
               cmd,
               stateAfterApply,
@@ -572,9 +639,10 @@ private[pekko] object Running {
         (applySideEffects(sideEffects, state), true)
       }
     }
+
     @tailrec def applyEffects(
         msg: Any,
-        state: RunningState[S],
+        state: RunningState[S, C],
         effect: Effect[E, S],
         sideEffects: immutable.Seq[SideEffect[S]] = Nil): 
(Behavior[InternalProtocol], Boolean) = {
       if (setup.internalLogger.isDebugEnabled && 
!effect.isInstanceOf[CompositeEffect[_, _]])
@@ -630,8 +698,8 @@ private[pekko] object Running {
   // ===============================================
 
   def persistingEvents(
-      state: RunningState[S],
-      visibleState: RunningState[S], // previous state until write success
+      state: RunningState[S, C],
+      visibleState: RunningState[S, C], // previous state until write success
       numberOfEvents: Int,
       shouldSnapshotAfterPersist: SnapshotAfterPersist,
       shouldPublish: Boolean,
@@ -642,8 +710,8 @@ private[pekko] object Running {
 
   /** INTERNAL API */
   @InternalApi private[pekko] class PersistingEvents(
-      var state: RunningState[S],
-      var visibleState: RunningState[S], // previous state until write success
+      var state: RunningState[S, C],
+      var visibleState: RunningState[S, C], // previous state until write 
success
       numberOfEvents: Int,
       shouldSnapshotAfterPersist: SnapshotAfterPersist,
       shouldPublish: Boolean,
@@ -789,7 +857,7 @@ private[pekko] object Running {
 
   /** INTERNAL API */
   @InternalApi private[pekko] class StoringSnapshot(
-      state: RunningState[S],
+      state: RunningState[S, C],
       sideEffects: immutable.Seq[SideEffect[S]],
       snapshotReason: SnapshotAfterPersist)
       extends AbstractBehavior[InternalProtocol](setup.context)
@@ -886,7 +954,7 @@ private[pekko] object Running {
 
   // --------------------------
 
-  def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: 
RunningState[S]): Behavior[InternalProtocol] = {
+  def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: 
RunningState[S, C]): Behavior[InternalProtocol] = {
     var behavior: Behavior[InternalProtocol] = new HandlingCommands(state)
     val it = effects.iterator
 
@@ -905,7 +973,7 @@ private[pekko] object Running {
 
   def applySideEffect(
       effect: SideEffect[S],
-      state: RunningState[S],
+      state: RunningState[S, C],
       behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
     effect match {
       case _: Stop.type @unchecked =>
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
index adaf59e0a6..4d1d472794 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
@@ -54,13 +54,17 @@ import pekko.persistence.Persistence
 
     val useContextLoggerForInternalLogging = 
typedConfig.getBoolean("use-context-logger-for-internal-logging")
 
+    val recurseWhenUnstashingReadOnlyCommands =
+      typedConfig.getBoolean("recurse-when-unstashing-read-only-commands")
+
     DurableStateSettings(
       stashCapacity = stashCapacity,
       stashOverflowStrategy,
       logOnStashing = logOnStashing,
       recoveryTimeout,
       durableStateStorePluginId,
-      useContextLoggerForInternalLogging)
+      useContextLoggerForInternalLogging,
+      recurseWhenUnstashingReadOnlyCommands)
   }
 
   private def durableStateStoreConfigFor(config: Config, pluginId: String): 
Config = {
@@ -87,7 +91,8 @@ private[pekko] final case class DurableStateSettings(
     logOnStashing: Boolean,
     recoveryTimeout: FiniteDuration,
     durableStateStorePluginId: String,
-    useContextLoggerForInternalLogging: Boolean) {
+    useContextLoggerForInternalLogging: Boolean,
+    recurseWhenUnstashingReadOnlyCommands: Boolean) {
 
   require(
     durableStateStorePluginId != null,
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
index a2505e5fae..0063e88fc2 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
@@ -49,8 +49,8 @@ private[pekko] trait DurableStateStoreInteractions[C, S] {
   protected def internalUpsert(
       ctx: ActorContext[InternalProtocol],
       cmd: Any,
-      state: Running.RunningState[S],
-      value: Any): Running.RunningState[S] = {
+      state: Running.RunningState[S, C],
+      value: Any): Running.RunningState[S, C] = {
 
     val newRunningState = state.nextRevision()
     val persistenceId = setup.persistenceId.id
@@ -69,9 +69,9 @@ private[pekko] trait DurableStateStoreInteractions[C, S] {
   protected def internalDelete(
       ctx: ActorContext[InternalProtocol],
       cmd: Any,
-      state: Running.RunningState[S]): Running.RunningState[S] = {
+      state: Running.RunningState[S, C]): Running.RunningState[S, C] = {
 
-    val newRunningState = state.nextRevision().copy(state = setup.emptyState)
+    val newRunningState: Running.RunningState[S, C] = 
state.nextRevision().copy(state = setup.emptyState)
     val persistenceId = setup.persistenceId.id
 
     onDeleteInitiated(ctx, cmd)
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala
index 88926675cb..1431a0c1da 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala
@@ -183,7 +183,7 @@ private[pekko] class Recovering[C, S](
       if (state.receivedPoisonPill && isInternalStashEmpty && 
!isUnstashAllInProgress)
         Behaviors.stopped
       else {
-        val runningState = Running.RunningState[S](
+        val runningState = Running.RunningState[S, C](
           revision = state.revision,
           state = state.state,
           receivedPoisonPill = state.receivedPoisonPill)
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
index d4a0a3fcba..549946e851 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
@@ -55,12 +55,25 @@ private[pekko] object Running {
     def currentRevision: Long
   }
 
-  final case class RunningState[State](revision: Long, state: State, 
receivedPoisonPill: Boolean) {
+  // This is part of the fix for https://github.com/apache/pekko/issues/1327 
and it's necessary to break
+  // recursion between the `onCommand` and `onMessage` functions in Running[C, 
E, S]#HandlingCommands.
+  // See comment in 
org.apache.pekko.persistence.typed.internal.Running#UnstashRecurrenceState for 
more
+  // information
+  final class UnstashRecurrenceState[State, Command] {
+    var inOnCommandCall: Boolean = false
+    var recOnCommandParams: Option[(RunningState[State, Command], Command)] = 
None
+  }
+
+  final case class RunningState[State, Command](
+      revision: Long,
+      state: State,
+      receivedPoisonPill: Boolean,
+      unstashRecurrenceState: UnstashRecurrenceState[State, Command] = new 
UnstashRecurrenceState[State, Command]) {
 
-    def nextRevision(): RunningState[State] =
+    def nextRevision(): RunningState[State, Command] =
       copy(revision = revision + 1)
 
-    def applyState[C, E](@unused setup: BehaviorSetup[C, State], updated: 
State): RunningState[State] = {
+    def applyState(@unused setup: BehaviorSetup[Command, State], updated: 
State): RunningState[State, Command] = {
       copy(state = updated)
     }
   }
@@ -79,16 +92,24 @@ private[pekko] object Running {
   // Needed for WithSeqNrAccessible, when unstashing
   private var _currentRevision = 0L
 
-  final class HandlingCommands(state: RunningState[S])
+  final class HandlingCommands(state: RunningState[S, C])
       extends AbstractBehavior[InternalProtocol](setup.context)
       with WithRevisionAccessible {
 
     _currentRevision = state.revision
 
     def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg 
match {
-      case IncomingCommand(c: C @unchecked) => onCommand(state, c)
-      case get: GetState[S @unchecked]      => onGetState(get)
-      case _                                => Behaviors.unhandled
+      case IncomingCommand(c: C @unchecked) if 
setup.settings.recurseWhenUnstashingReadOnlyCommands =>
+        onCommand(state, c)
+      case IncomingCommand(c: C @unchecked) =>
+        if (state.unstashRecurrenceState.inOnCommandCall) {
+          state.unstashRecurrenceState.recOnCommandParams = Some((state, c))
+          this // This will be ignored in onCommand
+        } else {
+          onCommand(state, c)
+        }
+      case get: GetState[S @unchecked] => onGetState(get)
+      case _                           => Behaviors.unhandled
     }
 
     override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] 
= {
@@ -100,11 +121,56 @@ private[pekko] object Running {
         else Behaviors.unhandled
     }
 
-    def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] 
= {
-      val effect = setup.commandHandler(state.state, cmd)
-      val (next, doUnstash) = applyEffects(cmd, state, 
effect.asInstanceOf[EffectImpl[S]]) // TODO can we avoid the cast?
-      if (doUnstash) tryUnstashOne(next)
-      else next
+    def onCommand(state: RunningState[S, C], cmd: C): 
Behavior[InternalProtocol] = {
+      def callApplyEffects(rs: RunningState[S, C], c: C): 
(Behavior[InternalProtocol], Boolean) = {
+        val effect = setup.commandHandler(rs.state, c)
+
+        applyEffects(c, rs, effect.asInstanceOf[EffectImpl[S]]) // TODO can we 
avoid the cast?
+      }
+
+      def recursiveUnstashImpl(
+          applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
+      ): Behavior[InternalProtocol] = {
+        val (next, doUnstash) = applyEffectsRetval
+
+        if (doUnstash) tryUnstashOne(next)
+        else next
+      }
+
+      def nonRecursiveUnstashImpl(
+          applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
+      ): Behavior[InternalProtocol] = {
+        @tailrec
+        def loop(applyEffectsRetval: (Behavior[InternalProtocol], Boolean)): 
Behavior[InternalProtocol] = {
+          val (next, doUnstash) = applyEffectsRetval
+
+          if (doUnstash) {
+            state.unstashRecurrenceState.inOnCommandCall = true
+            val r = tryUnstashOne(next)
+            state.unstashRecurrenceState.inOnCommandCall = false
+
+            val recOnCommandParams = 
state.unstashRecurrenceState.recOnCommandParams
+            state.unstashRecurrenceState.recOnCommandParams = None
+
+            recOnCommandParams match {
+              case None          => r
+              case Some((rs, c)) => loop(callApplyEffects(rs, c))
+            }
+          } else {
+            next
+          }
+        }
+
+        loop(applyEffectsRetval)
+      }
+
+      val r = callApplyEffects(state, cmd)
+
+      if (setup.settings.recurseWhenUnstashingReadOnlyCommands) {
+        recursiveUnstashImpl(r)
+      } else {
+        nonRecursiveUnstashImpl(r)
+      }
     }
 
     // Used by DurableStateBehaviorTestKit to retrieve the state.
@@ -130,7 +196,7 @@ private[pekko] object Running {
 
     @tailrec def applyEffects(
         msg: Any,
-        state: RunningState[S],
+        state: RunningState[S, C],
         effect: Effect[S],
         sideEffects: immutable.Seq[SideEffect[S]] = Nil): 
(Behavior[InternalProtocol], Boolean) = {
       if (setup.internalLogger.isDebugEnabled && 
!effect.isInstanceOf[CompositeEffect[_]])
@@ -182,8 +248,8 @@ private[pekko] object Running {
   // ===============================================
 
   def persistingState(
-      state: RunningState[S],
-      visibleState: RunningState[S], // previous state until write success
+      state: RunningState[S, C],
+      visibleState: RunningState[S, C], // previous state until write success
       sideEffects: immutable.Seq[SideEffect[S]]): Behavior[InternalProtocol] = 
{
     setup.setMdcPhase(PersistenceMdc.PersistingState)
     new PersistingState(state, visibleState, sideEffects)
@@ -191,8 +257,8 @@ private[pekko] object Running {
 
   /** INTERNAL API */
   @InternalApi private[pekko] class PersistingState(
-      var state: RunningState[S],
-      var visibleState: RunningState[S], // previous state until write success
+      var state: RunningState[S, C],
+      var visibleState: RunningState[S, C], // previous state until write 
success
       var sideEffects: immutable.Seq[SideEffect[S]],
       persistStartTime: Long = System.nanoTime())
       extends AbstractBehavior[InternalProtocol](setup.context)
@@ -258,7 +324,7 @@ private[pekko] object Running {
 
   // ===============================================
 
-  def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: 
RunningState[S]): Behavior[InternalProtocol] = {
+  def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: 
RunningState[S, C]): Behavior[InternalProtocol] = {
     var behavior: Behavior[InternalProtocol] = new HandlingCommands(state)
     val it = effects.iterator
 
@@ -277,7 +343,7 @@ private[pekko] object Running {
 
   def applySideEffect(
       effect: SideEffect[S],
-      state: RunningState[S],
+      state: RunningState[S, C],
       behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
     effect match {
       case _: Stop.type @unchecked =>
diff --git 
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala
 
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala
index b7180fb78d..80b710a8dd 100644
--- 
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala
+++ 
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala
@@ -78,6 +78,7 @@ class StashStateSpec extends ScalaTestWithActorTestKit with 
AnyWordSpecLike with
       snapshotPluginId = "",
       journalPluginConfig = None,
       snapshotPluginConfig = None,
-      useContextLoggerForInternalLogging = false)
+      useContextLoggerForInternalLogging = false,
+      recurseWhenUnstashingReadOnlyCommands = false)
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pekko.apache.org
For additional commands, e-mail: commits-h...@pekko.apache.org


Reply via email to