This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new d89f2eec8b Fix typed persistence stack overflow with many read only
commands (#1919)
d89f2eec8b is described below
commit d89f2eec8b1cf778869a0e19bf7c77a8e0720565
Author: Tomassino-ibm <[email protected]>
AuthorDate: Fri Jul 11 11:32:43 2025 +0200
Fix typed persistence stack overflow with many read only commands (#1919)
* Update unstash stack overflow test to have it actually fail
Taken changes from https://github.com/apache/pekko/pull/1336 to have a test
that fails
* Fix possible stack overflow in persistence-typed
This commit adds code to break recurrent calls in persistence-typed while
unstashing read-only commands that could lead to a stack overflow, fixing issue
#1327 (limited to EventSourcedBehavior)
The fix can be enabled using a feature flag, by default it is disabled
* bin compat exclude and scalafmt
* Also fix the same stack overflow issue in DurableStateBehavior
The fix is enabled by the same feature flag used by the fix of
EventSourcedBehavior
* Enable by default the fix for the stack overflow
Also rename parameter
* Refactor code to make it more explicit that the old code path is unchanged
This commit changes how `onMessage` and `onCommand` are implemented to make
it clearer that, when the `recurse-when-unstashing-read-only-commands` flag is
set to true, the old code path is used.
Moreover, the while loop in onCommand has been changed into a tail
recursive function
These changes have been applied to both EventSourcedBehavior and
DurableStateBehavior
---------
Co-authored-by: PJ Fanning <[email protected]>
---
.../scaladsl/EventSourcedStashOverflowSpec.scala | 8 +-
.../DurableStateBehaviorStashOverflowSpec.scala | 133 +++++++++++++++++++++
.../eventsourcedbehavior-stackoverflow.excludes | 22 ++++
.../src/main/resources/reference.conf | 7 ++
.../typed/internal/EventSourcedSettings.scala | 9 +-
.../typed/internal/ExternalInteractions.scala | 10 +-
.../typed/internal/ReplayingEvents.scala | 2 +-
.../pekko/persistence/typed/internal/Running.scala | 124 ++++++++++++++-----
.../state/internal/DurableStateSettings.scala | 9 +-
.../internal/DurableStateStoreInteractions.scala | 8 +-
.../typed/state/internal/Recovering.scala | 2 +-
.../persistence/typed/state/internal/Running.scala | 104 +++++++++++++---
.../typed/internal/StashStateSpec.scala | 3 +-
13 files changed, 375 insertions(+), 66 deletions(-)
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala
index 316015ca75..34f15089e6 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala
@@ -55,10 +55,12 @@ object EventSourcedStashOverflowSpec {
SteppingInmemJournal.config("EventSourcedStashOverflow").withFallback(ConfigFactory.parseString(s"""
pekko.persistence {
typed {
- stash-capacity = 1000 # enough to fail on stack size
+ stash-capacity = 20000 # enough to fail on stack size
stash-overflow-strategy = "drop"
+ recurse-when-unstashing-read-only-commands = false
}
}
+ pekko.jvm-exit-on-fatal-error = off
"""))
}
@@ -85,8 +87,8 @@ class EventSourcedStashOverflowSpec
for (_ <- 0 to (stashCapacity * 2)) {
es.tell(EventSourcedStringList.DoNothing(probe.ref))
}
- // capacity + 1 should mean that we get a dropped last message when all
stash is filled
- // while the actor is stuck in replay because journal isn't responding
+ // capacity * 2 should mean that we get many dropped messages when all
stash is filled
+ // while the actor is stuck in replay because journal isn't responding
(checking only one)
droppedMessageProbe.receiveMessage()
implicit val classicSystem: pekko.actor.ActorSystem =
testKit.system.toClassic
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
new file mode 100644
index 0000000000..ada975be5b
--- /dev/null
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.typed.state.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.Behavior
+import pekko.persistence.state.DurableStateStoreProvider
+import pekko.persistence.state.scaladsl.{ DurableStateStore,
DurableStateUpdateStore, GetObjectResult }
+import pekko.persistence.state.javadsl.{ DurableStateStore =>
JDurableStateStore }
+import pekko.persistence.typed.PersistenceId
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.concurrent.{ Future, Promise }
+import scala.concurrent.duration._
+
+object DurableStateBehaviorStashOverflowSpec {
+
+ class TestDurableStateStoreProvider extends DurableStateStoreProvider {
+ private val store = new TestDurableStateStorePlugin[Any]
+
+ override def scaladslDurableStateStore(): DurableStateStore[Any] = store
+
+ // Not used here
+ override def javadslDurableStateStore(): JDurableStateStore[AnyRef] = null
+ }
+
+ object TestDurableStateStorePlugin {
+ @volatile var instance: Option[TestDurableStateStorePlugin[_]] = None
+ def getInstance(): TestDurableStateStorePlugin[_] = instance.get
+ }
+
+ class TestDurableStateStorePlugin[A] extends DurableStateUpdateStore[A] {
+ TestDurableStateStorePlugin.instance = Some(this)
+
+ private val promise: Promise[Done] = Promise()
+
+ override def getObject(persistenceId: String): Future[GetObjectResult[A]] =
+ Future.successful(GetObjectResult[A](None, 0L))
+
+ override def upsertObject(persistenceId: String, revision: Long, value: A,
tag: String): Future[Done] =
+ promise.future
+
+ def completeUpsertFuture(): Unit = promise.success(Done)
+
+ override def deleteObject(persistenceId: String): Future[Done] =
Future.successful(Done)
+
+ override def deleteObject(persistenceId: String, revision: Long):
Future[Done] = Future.successful(Done)
+ }
+
+ object DurableStateString {
+ sealed trait Command
+ case class DoNothing(replyTo: ActorRef[Done]) extends Command
+ case class DoPersist(replyTo: ActorRef[Done]) extends Command
+
+ def apply(persistenceId: PersistenceId): Behavior[Command] =
+ DurableStateBehavior[Command, String](
+ persistenceId,
+ "",
+ { (_, command) =>
+ command match {
+ case DoNothing(replyTo) =>
+ Effect.reply(replyTo)(Done)
+
+ case DoPersist(replyTo) =>
+ Effect.persist("Initial persist").thenRun(_ => replyTo ! Done)
+ }
+ })
+ }
+
+ def conf = ConfigFactory.parseString(s"""
+ pekko.persistence {
+ state.plugin = "my-state-plugin"
+ typed {
+ stash-capacity = 20000 # enough to fail on stack size
+ stash-overflow-strategy = "drop"
+ recurse-when-unstashing-read-only-commands = false
+ }
+ }
+ pekko.jvm-exit-on-fatal-error = off
+ my-state-plugin.class =
"${classOf[TestDurableStateStoreProvider].getName}"
+ """)
+}
+
+class DurableStateBehaviorStashOverflowSpec
+ extends
ScalaTestWithActorTestKit(DurableStateBehaviorStashOverflowSpec.conf)
+ with AnyWordSpecLike
+ with LogCapturing {
+
+ import DurableStateBehaviorStashOverflowSpec._
+
+ "Stashing in a busy durable state behavior" must {
+
+ "not cause stack overflow" in {
+ val es = spawn(DurableStateString(PersistenceId.ofUniqueId("id-1")))
+
+ // wait for journal to start
+ val probe = testKit.createTestProbe[Done]()
+ val journal =
probe.awaitAssert(TestDurableStateStorePlugin.getInstance(), 3.seconds)
+
+ val droppedMessageProbe = testKit.createDroppedMessageProbe()
+ val stashCapacity =
testKit.config.getInt("pekko.persistence.typed.stash-capacity")
+
+ es.tell(DurableStateString.DoPersist(probe.ref))
+
+ for (_ <- 0 to (stashCapacity * 2)) {
+ es.tell(DurableStateString.DoNothing(probe.ref))
+ }
+ // capacity * 2 should mean that we get many dropped messages when all
stash is filled
+ // while the actor is stuck in replay because journal isn't responding
(checking only one)
+ droppedMessageProbe.receiveMessage()
+ journal.completeUpsertFuture()
+
+ // exactly how many is racy but at least the first stash buffer full
should complete
+ probe.receiveMessages(stashCapacity)
+ }
+ }
+}
diff --git
a/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior-stackoverflow.excludes
b/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior-stackoverflow.excludes
new file mode 100644
index 0000000000..10838cf792
--- /dev/null
+++
b/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior-stackoverflow.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# EventSourcedBehavior/DurableState Read-Only Events Stackoverflow
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.typed.internal.Running#*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.typed.internal.Running#*")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.typed.state.internal.Running#*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.typed.state.internal.Running#*")
diff --git a/persistence-typed/src/main/resources/reference.conf
b/persistence-typed/src/main/resources/reference.conf
index 1d0b7c4237..2513dac159 100644
--- a/persistence-typed/src/main/resources/reference.conf
+++ b/persistence-typed/src/main/resources/reference.conf
@@ -47,6 +47,13 @@ pekko.persistence.typed {
# this can be changed by setting this to 'true' in which case the internal
logging is sent to
# the actor context logger.
use-context-logger-for-internal-logging = false
+
+ # Commands get stashed during persistence and un-stashed afterward.
Normally, when many read-only
+ # (i.e. that do not cause any persistence) commands are present in the
stash, the functions are called
+ # in a loop to unstash all of them. Setting this to true, will cause
functions to be called
+ # recursively (that was the default before this setting was introduced).
That might cause a stack
+ # overflow in case there are many messages to unstash.
+ recurse-when-unstashing-read-only-commands = false
}
pekko.reliable-delivery {
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
index 8992743640..6ab4dcf661 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
@@ -63,6 +63,9 @@ import pekko.annotation.InternalApi
val useContextLoggerForInternalLogging =
typedConfig.getBoolean("use-context-logger-for-internal-logging")
+ val recurseWhenUnstashingReadOnlyCommands =
+ typedConfig.getBoolean("recurse-when-unstashing-read-only-commands")
+
EventSourcedSettings(
stashCapacity = stashCapacity,
stashOverflowStrategy,
@@ -71,7 +74,8 @@ import pekko.annotation.InternalApi
snapshotPluginId,
journalPluginConfig,
snapshotPluginConfig,
- useContextLoggerForInternalLogging)
+ useContextLoggerForInternalLogging,
+ recurseWhenUnstashingReadOnlyCommands)
}
}
@@ -87,7 +91,8 @@ private[pekko] final case class EventSourcedSettings(
snapshotPluginId: String,
journalPluginConfig: Option[Config],
snapshotPluginConfig: Option[Config],
- useContextLoggerForInternalLogging: Boolean) {
+ useContextLoggerForInternalLogging: Boolean,
+ recurseWhenUnstashingReadOnlyCommands: Boolean) {
require(journalPluginId != null, "journal plugin id must not be null; use
empty string for 'default' journal")
require(
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
index 21747792d2..304e3a0d2b 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala
@@ -54,10 +54,10 @@ private[pekko] trait JournalInteractions[C, E, S] {
protected def internalPersist(
ctx: ActorContext[_],
cmd: Any,
- state: Running.RunningState[S],
+ state: Running.RunningState[S, C],
event: EventOrTaggedOrReplicated,
eventAdapterManifest: String,
- metadata: OptionVal[Any]): Running.RunningState[S] = {
+ metadata: OptionVal[Any]): Running.RunningState[S, C] = {
val newRunningState = state.nextSequenceNr()
@@ -93,8 +93,8 @@ private[pekko] trait JournalInteractions[C, E, S] {
protected def internalPersistAll(
ctx: ActorContext[_],
cmd: Any,
- state: Running.RunningState[S],
- events: immutable.Seq[EventToPersist]): Running.RunningState[S] = {
+ state: Running.RunningState[S, C],
+ events: immutable.Seq[EventToPersist]): Running.RunningState[S, C] = {
if (events.nonEmpty) {
var newState = state
@@ -202,7 +202,7 @@ private[pekko] trait SnapshotInteractions[C, E, S] {
setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId.id, criteria,
toSequenceNr), setup.selfClassic)
}
- protected def internalSaveSnapshot(state: Running.RunningState[S]): Unit = {
+ protected def internalSaveSnapshot(state: Running.RunningState[S, C]): Unit
= {
setup.internalLogger.debug("Saving snapshot sequenceNr [{}]", state.seqNr)
if (state.state == null)
throw new IllegalStateException("A snapshot must not be a null state.")
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
index 4e3a3c3a50..8407bb4f5c 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
@@ -294,7 +294,7 @@ private[pekko] final class ReplayingEvents[C, E, S](
if (state.receivedPoisonPill && isInternalStashEmpty &&
!isUnstashAllInProgress)
Behaviors.stopped
else {
- val runningState = Running.RunningState[S](
+ val runningState = Running.RunningState[S, C](
seqNr = state.seqNr,
state = state.state,
receivedPoisonPill = state.receivedPoisonPill,
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
index f4bf501209..bae282c033 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
@@ -97,21 +97,35 @@ private[pekko] object Running {
def currentSequenceNumber: Long
}
- final case class RunningState[State](
+ // This is part of the fix for https://github.com/apache/pekko/issues/1327
and it's necessary to break
+ // recursion between the `onCommand` and `onMessage` functions in Running[C,
E, S]#HandlingCommands. We
+ // set the flag `inOnCommandCall` to true in `onCommand` before calling
functions that will in turn call
+ // `onMessage`. In `onMessage`, if the flag is set and we would recursively
call `onCommand`, we instead
+ // save the parameters we would use to call `onCommand` and return. When
back in `onCommand`, we check if
+ // `recOnCommandParams` is not empty and, if not, act as if `onMessage`
called us directly. In function
+ // `onCommand` we have a while loop replacing recursive calls so that we
don't use all the stack space in
+ // case a lot of read-only message are stashed
+ final class UnstashRecurrenceState[State, Command] {
+ var inOnCommandCall: Boolean = false
+ var recOnCommandParams: Option[(RunningState[State, Command], Command)] =
None
+ }
+
+ final case class RunningState[State, Command](
seqNr: Long,
state: State,
receivedPoisonPill: Boolean,
version: VersionVector,
seenPerReplica: Map[ReplicaId, Long],
- replicationControl: Map[ReplicaId, ReplicationStreamControl]) {
+ replicationControl: Map[ReplicaId, ReplicationStreamControl],
+ unstashRecurrenceState: UnstashRecurrenceState[State, Command] = new
UnstashRecurrenceState[State, Command]) {
- def nextSequenceNr(): RunningState[State] =
+ def nextSequenceNr(): RunningState[State, Command] =
copy(seqNr = seqNr + 1)
- def updateLastSequenceNr(persistent: PersistentRepr): RunningState[State] =
+ def updateLastSequenceNr(persistent: PersistentRepr): RunningState[State,
Command] =
if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr)
else this
- def applyEvent[C, E](setup: BehaviorSetup[C, E, State], event: E):
RunningState[State] = {
+ def applyEvent[E](setup: BehaviorSetup[Command, E, State], event: E):
RunningState[State, Command] = {
val updated = setup.eventHandler(state, event)
copy(state = updated)
}
@@ -119,8 +133,8 @@ private[pekko] object Running {
def startReplicationStream[C, E, S](
setup: BehaviorSetup[C, E, S],
- state: RunningState[S],
- replicationSetup: ReplicationSetup): RunningState[S] = {
+ state: RunningState[S, C],
+ replicationSetup: ReplicationSetup): RunningState[S, C] = {
import scala.concurrent.duration._
val system = setup.context.system
val ref = setup.context.self
@@ -238,7 +252,7 @@ private[pekko] object Running {
// Needed for WithSeqNrAccessible, when unstashing
private var _currentSequenceNumber = 0L
- final class HandlingCommands(state: RunningState[S])
+ final class HandlingCommands(state: RunningState[S, C])
extends AbstractBehavior[InternalProtocol](setup.context)
with WithSeqNrAccessible {
@@ -249,7 +263,15 @@ private[pekko] object Running {
}
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg
match {
- case IncomingCommand(c: C @unchecked) => onCommand(state, c)
+ case IncomingCommand(c: C @unchecked) if
setup.settings.recurseWhenUnstashingReadOnlyCommands =>
+ onCommand(state, c)
+ case IncomingCommand(c: C @unchecked) =>
+ if (state.unstashRecurrenceState.inOnCommandCall) {
+ state.unstashRecurrenceState.recOnCommandParams = Some((state, c))
+ this // This will be ignored in onCommand
+ } else {
+ onCommand(state, c)
+ }
case re: ReplicatedEventEnvelope[E @unchecked] =>
onReplicatedEvent(state, re, setup.replication.get)
case pe: PublishedEventImpl =>
onPublishedEvent(state, pe)
case JournalResponse(r) =>
onDeleteEventsJournalResponse(r, state.state)
@@ -268,15 +290,60 @@ private[pekko] object Running {
else Behaviors.unhandled
}
- def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol]
= {
- val effect = setup.commandHandler(state.state, cmd)
- val (next, doUnstash) = applyEffects(cmd, state,
effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
- if (doUnstash) tryUnstashOne(next)
- else next
+ def onCommand(state: RunningState[S, C], cmd: C):
Behavior[InternalProtocol] = {
+ def callApplyEffects(rs: RunningState[S, C], c: C):
(Behavior[InternalProtocol], Boolean) = {
+ val effect = setup.commandHandler(rs.state, c)
+
+ applyEffects(c, rs, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can
we avoid the cast?
+ }
+
+ def recursiveUnstashImpl(
+ applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
+ ): Behavior[InternalProtocol] = {
+ val (next, doUnstash) = applyEffectsRetval
+
+ if (doUnstash) tryUnstashOne(next)
+ else next
+ }
+
+ def nonRecursiveUnstashImpl(
+ applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
+ ): Behavior[InternalProtocol] = {
+ @tailrec
+ def loop(applyEffectsRetval: (Behavior[InternalProtocol], Boolean)):
Behavior[InternalProtocol] = {
+ val (next, doUnstash) = applyEffectsRetval
+
+ if (doUnstash) {
+ state.unstashRecurrenceState.inOnCommandCall = true
+ val r = tryUnstashOne(next)
+ state.unstashRecurrenceState.inOnCommandCall = false
+
+ val recOnCommandParams =
state.unstashRecurrenceState.recOnCommandParams
+ state.unstashRecurrenceState.recOnCommandParams = None
+
+ recOnCommandParams match {
+ case None => r
+ case Some((rs, c)) => loop(callApplyEffects(rs, c))
+ }
+ } else {
+ next
+ }
+ }
+
+ loop(applyEffectsRetval)
+ }
+
+ val r = callApplyEffects(state, cmd)
+
+ if (setup.settings.recurseWhenUnstashingReadOnlyCommands) {
+ recursiveUnstashImpl(r)
+ } else {
+ nonRecursiveUnstashImpl(r)
+ }
}
def onReplicatedEvent(
- state: Running.RunningState[S],
+ state: Running.RunningState[S, C],
envelope: ReplicatedEventEnvelope[E],
replication: ReplicationSetup): Behavior[InternalProtocol] = {
setup.internalLogger.debugN(
@@ -300,7 +367,7 @@ private[pekko] object Running {
}
}
- def onPublishedEvent(state: Running.RunningState[S], event:
PublishedEventImpl): Behavior[InternalProtocol] = {
+ def onPublishedEvent(state: Running.RunningState[S, C], event:
PublishedEventImpl): Behavior[InternalProtocol] = {
val newBehavior: Behavior[InternalProtocol] = setup.replication match {
case None =>
setup.internalLogger.warn(
@@ -321,7 +388,7 @@ private[pekko] object Running {
}
private def onPublishedEvent(
- state: Running.RunningState[S],
+ state: Running.RunningState[S, C],
replication: ReplicationSetup,
replicatedMetadata: ReplicatedPublishedEventMetaData,
event: PublishedEventImpl): Behavior[InternalProtocol] = {
@@ -428,7 +495,7 @@ private[pekko] object Running {
replication.clearContext()
- val newState2: RunningState[S] = internalPersist(
+ val newState2: RunningState[S, C] = internalPersist(
setup.context,
null,
stateAfterApply,
@@ -464,10 +531,10 @@ private[pekko] object Running {
val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)
- val newState2 = setup.replication match {
+ val newState2: RunningState[S, C] = setup.replication match {
case Some(replication) =>
val updatedVersion =
stateAfterApply.version.updated(replication.replicaId.id,
_currentSequenceNumber)
- val r = internalPersist(
+ val r: RunningState[S, C] = internalPersist(
setup.context,
cmd,
stateAfterApply,
@@ -572,9 +639,10 @@ private[pekko] object Running {
(applySideEffects(sideEffects, state), true)
}
}
+
@tailrec def applyEffects(
msg: Any,
- state: RunningState[S],
+ state: RunningState[S, C],
effect: Effect[E, S],
sideEffects: immutable.Seq[SideEffect[S]] = Nil):
(Behavior[InternalProtocol], Boolean) = {
if (setup.internalLogger.isDebugEnabled &&
!effect.isInstanceOf[CompositeEffect[_, _]])
@@ -630,8 +698,8 @@ private[pekko] object Running {
// ===============================================
def persistingEvents(
- state: RunningState[S],
- visibleState: RunningState[S], // previous state until write success
+ state: RunningState[S, C],
+ visibleState: RunningState[S, C], // previous state until write success
numberOfEvents: Int,
shouldSnapshotAfterPersist: SnapshotAfterPersist,
shouldPublish: Boolean,
@@ -642,8 +710,8 @@ private[pekko] object Running {
/** INTERNAL API */
@InternalApi private[pekko] class PersistingEvents(
- var state: RunningState[S],
- var visibleState: RunningState[S], // previous state until write success
+ var state: RunningState[S, C],
+ var visibleState: RunningState[S, C], // previous state until write
success
numberOfEvents: Int,
shouldSnapshotAfterPersist: SnapshotAfterPersist,
shouldPublish: Boolean,
@@ -789,7 +857,7 @@ private[pekko] object Running {
/** INTERNAL API */
@InternalApi private[pekko] class StoringSnapshot(
- state: RunningState[S],
+ state: RunningState[S, C],
sideEffects: immutable.Seq[SideEffect[S]],
snapshotReason: SnapshotAfterPersist)
extends AbstractBehavior[InternalProtocol](setup.context)
@@ -886,7 +954,7 @@ private[pekko] object Running {
// --------------------------
- def applySideEffects(effects: immutable.Seq[SideEffect[S]], state:
RunningState[S]): Behavior[InternalProtocol] = {
+ def applySideEffects(effects: immutable.Seq[SideEffect[S]], state:
RunningState[S, C]): Behavior[InternalProtocol] = {
var behavior: Behavior[InternalProtocol] = new HandlingCommands(state)
val it = effects.iterator
@@ -905,7 +973,7 @@ private[pekko] object Running {
def applySideEffect(
effect: SideEffect[S],
- state: RunningState[S],
+ state: RunningState[S, C],
behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
effect match {
case _: Stop.type @unchecked =>
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
index adaf59e0a6..4d1d472794 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
@@ -54,13 +54,17 @@ import pekko.persistence.Persistence
val useContextLoggerForInternalLogging =
typedConfig.getBoolean("use-context-logger-for-internal-logging")
+ val recurseWhenUnstashingReadOnlyCommands =
+ typedConfig.getBoolean("recurse-when-unstashing-read-only-commands")
+
DurableStateSettings(
stashCapacity = stashCapacity,
stashOverflowStrategy,
logOnStashing = logOnStashing,
recoveryTimeout,
durableStateStorePluginId,
- useContextLoggerForInternalLogging)
+ useContextLoggerForInternalLogging,
+ recurseWhenUnstashingReadOnlyCommands)
}
private def durableStateStoreConfigFor(config: Config, pluginId: String):
Config = {
@@ -87,7 +91,8 @@ private[pekko] final case class DurableStateSettings(
logOnStashing: Boolean,
recoveryTimeout: FiniteDuration,
durableStateStorePluginId: String,
- useContextLoggerForInternalLogging: Boolean) {
+ useContextLoggerForInternalLogging: Boolean,
+ recurseWhenUnstashingReadOnlyCommands: Boolean) {
require(
durableStateStorePluginId != null,
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
index a2505e5fae..0063e88fc2 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
@@ -49,8 +49,8 @@ private[pekko] trait DurableStateStoreInteractions[C, S] {
protected def internalUpsert(
ctx: ActorContext[InternalProtocol],
cmd: Any,
- state: Running.RunningState[S],
- value: Any): Running.RunningState[S] = {
+ state: Running.RunningState[S, C],
+ value: Any): Running.RunningState[S, C] = {
val newRunningState = state.nextRevision()
val persistenceId = setup.persistenceId.id
@@ -69,9 +69,9 @@ private[pekko] trait DurableStateStoreInteractions[C, S] {
protected def internalDelete(
ctx: ActorContext[InternalProtocol],
cmd: Any,
- state: Running.RunningState[S]): Running.RunningState[S] = {
+ state: Running.RunningState[S, C]): Running.RunningState[S, C] = {
- val newRunningState = state.nextRevision().copy(state = setup.emptyState)
+ val newRunningState: Running.RunningState[S, C] =
state.nextRevision().copy(state = setup.emptyState)
val persistenceId = setup.persistenceId.id
onDeleteInitiated(ctx, cmd)
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala
index 88926675cb..1431a0c1da 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala
@@ -183,7 +183,7 @@ private[pekko] class Recovering[C, S](
if (state.receivedPoisonPill && isInternalStashEmpty &&
!isUnstashAllInProgress)
Behaviors.stopped
else {
- val runningState = Running.RunningState[S](
+ val runningState = Running.RunningState[S, C](
revision = state.revision,
state = state.state,
receivedPoisonPill = state.receivedPoisonPill)
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
index d4a0a3fcba..549946e851 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
@@ -55,12 +55,25 @@ private[pekko] object Running {
def currentRevision: Long
}
- final case class RunningState[State](revision: Long, state: State,
receivedPoisonPill: Boolean) {
+ // This is part of the fix for https://github.com/apache/pekko/issues/1327
and it's necessary to break
+ // recursion between the `onCommand` and `onMessage` functions in Running[C,
E, S]#HandlingCommands.
+ // See comment in
org.apache.pekko.persistence.typed.internal.Running#UnstashRecurrenceState for
more
+ // information
+ final class UnstashRecurrenceState[State, Command] {
+ var inOnCommandCall: Boolean = false
+ var recOnCommandParams: Option[(RunningState[State, Command], Command)] =
None
+ }
+
+ final case class RunningState[State, Command](
+ revision: Long,
+ state: State,
+ receivedPoisonPill: Boolean,
+ unstashRecurrenceState: UnstashRecurrenceState[State, Command] = new
UnstashRecurrenceState[State, Command]) {
- def nextRevision(): RunningState[State] =
+ def nextRevision(): RunningState[State, Command] =
copy(revision = revision + 1)
- def applyState[C, E](@unused setup: BehaviorSetup[C, State], updated:
State): RunningState[State] = {
+ def applyState(@unused setup: BehaviorSetup[Command, State], updated:
State): RunningState[State, Command] = {
copy(state = updated)
}
}
@@ -79,16 +92,24 @@ private[pekko] object Running {
// Needed for WithSeqNrAccessible, when unstashing
private var _currentRevision = 0L
- final class HandlingCommands(state: RunningState[S])
+ final class HandlingCommands(state: RunningState[S, C])
extends AbstractBehavior[InternalProtocol](setup.context)
with WithRevisionAccessible {
_currentRevision = state.revision
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg
match {
- case IncomingCommand(c: C @unchecked) => onCommand(state, c)
- case get: GetState[S @unchecked] => onGetState(get)
- case _ => Behaviors.unhandled
+ case IncomingCommand(c: C @unchecked) if
setup.settings.recurseWhenUnstashingReadOnlyCommands =>
+ onCommand(state, c)
+ case IncomingCommand(c: C @unchecked) =>
+ if (state.unstashRecurrenceState.inOnCommandCall) {
+ state.unstashRecurrenceState.recOnCommandParams = Some((state, c))
+ this // This will be ignored in onCommand
+ } else {
+ onCommand(state, c)
+ }
+ case get: GetState[S @unchecked] => onGetState(get)
+ case _ => Behaviors.unhandled
}
override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]]
= {
@@ -100,11 +121,56 @@ private[pekko] object Running {
else Behaviors.unhandled
}
- def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol]
= {
- val effect = setup.commandHandler(state.state, cmd)
- val (next, doUnstash) = applyEffects(cmd, state,
effect.asInstanceOf[EffectImpl[S]]) // TODO can we avoid the cast?
- if (doUnstash) tryUnstashOne(next)
- else next
+ def onCommand(state: RunningState[S, C], cmd: C):
Behavior[InternalProtocol] = {
+ def callApplyEffects(rs: RunningState[S, C], c: C):
(Behavior[InternalProtocol], Boolean) = {
+ val effect = setup.commandHandler(rs.state, c)
+
+ applyEffects(c, rs, effect.asInstanceOf[EffectImpl[S]]) // TODO can we
avoid the cast?
+ }
+
+ def recursiveUnstashImpl(
+ applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
+ ): Behavior[InternalProtocol] = {
+ val (next, doUnstash) = applyEffectsRetval
+
+ if (doUnstash) tryUnstashOne(next)
+ else next
+ }
+
+ def nonRecursiveUnstashImpl(
+ applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
+ ): Behavior[InternalProtocol] = {
+ @tailrec
+ def loop(applyEffectsRetval: (Behavior[InternalProtocol], Boolean)):
Behavior[InternalProtocol] = {
+ val (next, doUnstash) = applyEffectsRetval
+
+ if (doUnstash) {
+ state.unstashRecurrenceState.inOnCommandCall = true
+ val r = tryUnstashOne(next)
+ state.unstashRecurrenceState.inOnCommandCall = false
+
+ val recOnCommandParams =
state.unstashRecurrenceState.recOnCommandParams
+ state.unstashRecurrenceState.recOnCommandParams = None
+
+ recOnCommandParams match {
+ case None => r
+ case Some((rs, c)) => loop(callApplyEffects(rs, c))
+ }
+ } else {
+ next
+ }
+ }
+
+ loop(applyEffectsRetval)
+ }
+
+ val r = callApplyEffects(state, cmd)
+
+ if (setup.settings.recurseWhenUnstashingReadOnlyCommands) {
+ recursiveUnstashImpl(r)
+ } else {
+ nonRecursiveUnstashImpl(r)
+ }
}
// Used by DurableStateBehaviorTestKit to retrieve the state.
@@ -130,7 +196,7 @@ private[pekko] object Running {
@tailrec def applyEffects(
msg: Any,
- state: RunningState[S],
+ state: RunningState[S, C],
effect: Effect[S],
sideEffects: immutable.Seq[SideEffect[S]] = Nil):
(Behavior[InternalProtocol], Boolean) = {
if (setup.internalLogger.isDebugEnabled &&
!effect.isInstanceOf[CompositeEffect[_]])
@@ -182,8 +248,8 @@ private[pekko] object Running {
// ===============================================
def persistingState(
- state: RunningState[S],
- visibleState: RunningState[S], // previous state until write success
+ state: RunningState[S, C],
+ visibleState: RunningState[S, C], // previous state until write success
sideEffects: immutable.Seq[SideEffect[S]]): Behavior[InternalProtocol] =
{
setup.setMdcPhase(PersistenceMdc.PersistingState)
new PersistingState(state, visibleState, sideEffects)
@@ -191,8 +257,8 @@ private[pekko] object Running {
/** INTERNAL API */
@InternalApi private[pekko] class PersistingState(
- var state: RunningState[S],
- var visibleState: RunningState[S], // previous state until write success
+ var state: RunningState[S, C],
+ var visibleState: RunningState[S, C], // previous state until write
success
var sideEffects: immutable.Seq[SideEffect[S]],
persistStartTime: Long = System.nanoTime())
extends AbstractBehavior[InternalProtocol](setup.context)
@@ -258,7 +324,7 @@ private[pekko] object Running {
// ===============================================
- def applySideEffects(effects: immutable.Seq[SideEffect[S]], state:
RunningState[S]): Behavior[InternalProtocol] = {
+ def applySideEffects(effects: immutable.Seq[SideEffect[S]], state:
RunningState[S, C]): Behavior[InternalProtocol] = {
var behavior: Behavior[InternalProtocol] = new HandlingCommands(state)
val it = effects.iterator
@@ -277,7 +343,7 @@ private[pekko] object Running {
def applySideEffect(
effect: SideEffect[S],
- state: RunningState[S],
+ state: RunningState[S, C],
behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
effect match {
case _: Stop.type @unchecked =>
diff --git
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala
index b7180fb78d..80b710a8dd 100644
---
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala
+++
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala
@@ -78,6 +78,7 @@ class StashStateSpec extends ScalaTestWithActorTestKit with
AnyWordSpecLike with
snapshotPluginId = "",
journalPluginConfig = None,
snapshotPluginConfig = None,
- useContextLoggerForInternalLogging = false)
+ useContextLoggerForInternalLogging = false,
+ recurseWhenUnstashingReadOnlyCommands = false)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]