This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push: new d89f2eec8b Fix typed persistence stack overflow with many read only commands (#1919) d89f2eec8b is described below commit d89f2eec8b1cf778869a0e19bf7c77a8e0720565 Author: Tomassino-ibm <tomassino.ferra...@ibm.com> AuthorDate: Fri Jul 11 11:32:43 2025 +0200 Fix typed persistence stack overflow with many read only commands (#1919) * Update unstash stack overflow test to have it actually fail Taken changes from https://github.com/apache/pekko/pull/1336 to have a test that fails * Fix possible stack overflow in persistence-typed This commit adds code to break recurrent calls in persistence-typed while unstashing read-only commands that could lead to a stack overflow, fixing issue #1327 (limited to EventSourcedBehavior) The fix can be enabled using a feature flag, by default it is disabled * bin compat exclude and scalafmt * Also fix the same stack overflow issue in DurableStateBehavior The fix is enabled by the same feature flag used by the fix of EventSourcedBehavior * Enable by default the fix for the stack overflow Also rename parameter * Refactor code to make it more explicit that the old code path is unchanged This commit changes how `onMessage` and `onCommand` are implemented to make it clearer that, when the `recurse-when-unstashing-read-only-commands` flag is set to true, the old code path is used. 
Moreover, the while loop in onCommand has been changed into a tail recursive function These changes have been applied to both EventSourcedBehavior and DurableStateBehavior --------- Co-authored-by: PJ Fanning <pjfann...@users.noreply.github.com> --- .../scaladsl/EventSourcedStashOverflowSpec.scala | 8 +- .../DurableStateBehaviorStashOverflowSpec.scala | 133 +++++++++++++++++++++ .../eventsourcedbehavior-stackoverflow.excludes | 22 ++++ .../src/main/resources/reference.conf | 7 ++ .../typed/internal/EventSourcedSettings.scala | 9 +- .../typed/internal/ExternalInteractions.scala | 10 +- .../typed/internal/ReplayingEvents.scala | 2 +- .../pekko/persistence/typed/internal/Running.scala | 124 ++++++++++++++----- .../state/internal/DurableStateSettings.scala | 9 +- .../internal/DurableStateStoreInteractions.scala | 8 +- .../typed/state/internal/Recovering.scala | 2 +- .../persistence/typed/state/internal/Running.scala | 104 +++++++++++++--- .../typed/internal/StashStateSpec.scala | 3 +- 13 files changed, 375 insertions(+), 66 deletions(-) diff --git a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala index 316015ca75..34f15089e6 100644 --- a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala +++ b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala @@ -55,10 +55,12 @@ object EventSourcedStashOverflowSpec { SteppingInmemJournal.config("EventSourcedStashOverflow").withFallback(ConfigFactory.parseString(s""" pekko.persistence { typed { - stash-capacity = 1000 # enough to fail on stack size + stash-capacity = 20000 # enough to fail on stack size stash-overflow-strategy = "drop" + recurse-when-unstashing-read-only-commands = false } } + 
pekko.jvm-exit-on-fatal-error = off """)) } @@ -85,8 +87,8 @@ class EventSourcedStashOverflowSpec for (_ <- 0 to (stashCapacity * 2)) { es.tell(EventSourcedStringList.DoNothing(probe.ref)) } - // capacity + 1 should mean that we get a dropped last message when all stash is filled - // while the actor is stuck in replay because journal isn't responding + // capacity * 2 should mean that we get many dropped messages when all stash is filled + // while the actor is stuck in replay because journal isn't responding (checking only one) droppedMessageProbe.receiveMessage() implicit val classicSystem: pekko.actor.ActorSystem = testKit.system.toClassic diff --git a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala new file mode 100644 index 0000000000..ada975be5b --- /dev/null +++ b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2020-2022 Lightbend Inc. 
<https://www.lightbend.com> + */ + +package org.apache.pekko.persistence.typed.state.scaladsl + +import org.apache.pekko +import pekko.Done +import pekko.actor.testkit.typed.scaladsl.LogCapturing +import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import pekko.actor.typed.ActorRef +import pekko.actor.typed.Behavior +import pekko.persistence.state.DurableStateStoreProvider +import pekko.persistence.state.scaladsl.{ DurableStateStore, DurableStateUpdateStore, GetObjectResult } +import pekko.persistence.state.javadsl.{ DurableStateStore => JDurableStateStore } +import pekko.persistence.typed.PersistenceId +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.{ Future, Promise } +import scala.concurrent.duration._ + +object DurableStateBehaviorStashOverflowSpec { + + class TestDurableStateStoreProvider extends DurableStateStoreProvider { + private val store = new TestDurableStateStorePlugin[Any] + + override def scaladslDurableStateStore(): DurableStateStore[Any] = store + + // Not used here + override def javadslDurableStateStore(): JDurableStateStore[AnyRef] = null + } + + object TestDurableStateStorePlugin { + @volatile var instance: Option[TestDurableStateStorePlugin[_]] = None + def getInstance(): TestDurableStateStorePlugin[_] = instance.get + } + + class TestDurableStateStorePlugin[A] extends DurableStateUpdateStore[A] { + TestDurableStateStorePlugin.instance = Some(this) + + private val promise: Promise[Done] = Promise() + + override def getObject(persistenceId: String): Future[GetObjectResult[A]] = + Future.successful(GetObjectResult[A](None, 0L)) + + override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = + promise.future + + def completeUpsertFuture(): Unit = promise.success(Done) + + override def deleteObject(persistenceId: String): Future[Done] = Future.successful(Done) + + override def deleteObject(persistenceId: String, 
revision: Long): Future[Done] = Future.successful(Done) + } + + object DurableStateString { + sealed trait Command + case class DoNothing(replyTo: ActorRef[Done]) extends Command + case class DoPersist(replyTo: ActorRef[Done]) extends Command + + def apply(persistenceId: PersistenceId): Behavior[Command] = + DurableStateBehavior[Command, String]( + persistenceId, + "", + { (_, command) => + command match { + case DoNothing(replyTo) => + Effect.reply(replyTo)(Done) + + case DoPersist(replyTo) => + Effect.persist("Initial persist").thenRun(_ => replyTo ! Done) + } + }) + } + + def conf = ConfigFactory.parseString(s""" + pekko.persistence { + state.plugin = "my-state-plugin" + typed { + stash-capacity = 20000 # enough to fail on stack size + stash-overflow-strategy = "drop" + recurse-when-unstashing-read-only-commands = false + } + } + pekko.jvm-exit-on-fatal-error = off + my-state-plugin.class = "${classOf[TestDurableStateStoreProvider].getName}" + """) +} + +class DurableStateBehaviorStashOverflowSpec + extends ScalaTestWithActorTestKit(DurableStateBehaviorStashOverflowSpec.conf) + with AnyWordSpecLike + with LogCapturing { + + import DurableStateBehaviorStashOverflowSpec._ + + "Stashing in a busy durable state behavior" must { + + "not cause stack overflow" in { + val es = spawn(DurableStateString(PersistenceId.ofUniqueId("id-1"))) + + // wait for journal to start + val probe = testKit.createTestProbe[Done]() + val journal = probe.awaitAssert(TestDurableStateStorePlugin.getInstance(), 3.seconds) + + val droppedMessageProbe = testKit.createDroppedMessageProbe() + val stashCapacity = testKit.config.getInt("pekko.persistence.typed.stash-capacity") + + es.tell(DurableStateString.DoPersist(probe.ref)) + + for (_ <- 0 to (stashCapacity * 2)) { + es.tell(DurableStateString.DoNothing(probe.ref)) + } + // capacity * 2 should mean that we get many dropped messages when all stash is filled + // while the actor is stuck in replay because journal isn't responding (checking only 
one) + droppedMessageProbe.receiveMessage() + journal.completeUpsertFuture() + + // exactly how many is racy but at least the first stash buffer full should complete + probe.receiveMessages(stashCapacity) + } + } +} diff --git a/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior-stackoverflow.excludes b/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior-stackoverflow.excludes new file mode 100644 index 0000000000..10838cf792 --- /dev/null +++ b/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior-stackoverflow.excludes @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# EventSourcedBehavior/DurableState Read-Only Commands Stack Overflow +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.typed.internal.Running#*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.typed.internal.Running#*") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.typed.state.internal.Running#*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.typed.state.internal.Running#*") diff --git a/persistence-typed/src/main/resources/reference.conf b/persistence-typed/src/main/resources/reference.conf index 1d0b7c4237..2513dac159 100644 --- a/persistence-typed/src/main/resources/reference.conf +++ b/persistence-typed/src/main/resources/reference.conf @@ -47,6 +47,13 @@ pekko.persistence.typed { # this can be changed by setting this to 'true' in which case the internal logging is sent to # the actor context logger. use-context-logger-for-internal-logging = false + + # Commands get stashed during persistence and un-stashed afterward. Normally, when many read-only + # (i.e. that do not cause any persistence) commands are present in the stash, the functions are called + # in a loop to unstash all of them. Setting this to true will cause the functions to be called + # recursively (that was the default before this setting was introduced). That might cause a stack + # overflow in case there are many messages to unstash.
+ recurse-when-unstashing-read-only-commands = false } pekko.reliable-delivery { diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala index 8992743640..6ab4dcf661 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala @@ -63,6 +63,9 @@ import pekko.annotation.InternalApi val useContextLoggerForInternalLogging = typedConfig.getBoolean("use-context-logger-for-internal-logging") + val recurseWhenUnstashingReadOnlyCommands = + typedConfig.getBoolean("recurse-when-unstashing-read-only-commands") + EventSourcedSettings( stashCapacity = stashCapacity, stashOverflowStrategy, @@ -71,7 +74,8 @@ import pekko.annotation.InternalApi snapshotPluginId, journalPluginConfig, snapshotPluginConfig, - useContextLoggerForInternalLogging) + useContextLoggerForInternalLogging, + recurseWhenUnstashingReadOnlyCommands) } } @@ -87,7 +91,8 @@ private[pekko] final case class EventSourcedSettings( snapshotPluginId: String, journalPluginConfig: Option[Config], snapshotPluginConfig: Option[Config], - useContextLoggerForInternalLogging: Boolean) { + useContextLoggerForInternalLogging: Boolean, + recurseWhenUnstashingReadOnlyCommands: Boolean) { require(journalPluginId != null, "journal plugin id must not be null; use empty string for 'default' journal") require( diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala index 21747792d2..304e3a0d2b 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala +++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala @@ -54,10 +54,10 @@ private[pekko] trait JournalInteractions[C, E, S] { protected def internalPersist( ctx: ActorContext[_], cmd: Any, - state: Running.RunningState[S], + state: Running.RunningState[S, C], event: EventOrTaggedOrReplicated, eventAdapterManifest: String, - metadata: OptionVal[Any]): Running.RunningState[S] = { + metadata: OptionVal[Any]): Running.RunningState[S, C] = { val newRunningState = state.nextSequenceNr() @@ -93,8 +93,8 @@ private[pekko] trait JournalInteractions[C, E, S] { protected def internalPersistAll( ctx: ActorContext[_], cmd: Any, - state: Running.RunningState[S], - events: immutable.Seq[EventToPersist]): Running.RunningState[S] = { + state: Running.RunningState[S, C], + events: immutable.Seq[EventToPersist]): Running.RunningState[S, C] = { if (events.nonEmpty) { var newState = state @@ -202,7 +202,7 @@ private[pekko] trait SnapshotInteractions[C, E, S] { setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId.id, criteria, toSequenceNr), setup.selfClassic) } - protected def internalSaveSnapshot(state: Running.RunningState[S]): Unit = { + protected def internalSaveSnapshot(state: Running.RunningState[S, C]): Unit = { setup.internalLogger.debug("Saving snapshot sequenceNr [{}]", state.seqNr) if (state.state == null) throw new IllegalStateException("A snapshot must not be a null state.") diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala index 4e3a3c3a50..8407bb4f5c 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala @@ -294,7 +294,7 @@ private[pekko] final class ReplayingEvents[C, E, S]( if 
(state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped else { - val runningState = Running.RunningState[S]( + val runningState = Running.RunningState[S, C]( seqNr = state.seqNr, state = state.state, receivedPoisonPill = state.receivedPoisonPill, diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala index f4bf501209..bae282c033 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala @@ -97,21 +97,35 @@ private[pekko] object Running { def currentSequenceNumber: Long } - final case class RunningState[State]( + // This is part of the fix for https://github.com/apache/pekko/issues/1327 and it's necessary to break + // recursion between the `onCommand` and `onMessage` functions in Running[C, E, S]#HandlingCommands. We + // set the flag `inOnCommandCall` to true in `onCommand` before calling functions that will in turn call + // `onMessage`. In `onMessage`, if the flag is set and we would recursively call `onCommand`, we instead + // save the parameters we would use to call `onCommand` and return. When back in `onCommand`, we check if + // `recOnCommandParams` is not empty and, if not, act as if `onMessage` called us directly. 
In function + // `onCommand` we have a tail-recursive loop replacing recursive calls so that we don't use all the stack space in + // case a lot of read-only messages are stashed + final class UnstashRecurrenceState[State, Command] { + var inOnCommandCall: Boolean = false + var recOnCommandParams: Option[(RunningState[State, Command], Command)] = None + } + + final case class RunningState[State, Command]( seqNr: Long, state: State, receivedPoisonPill: Boolean, version: VersionVector, seenPerReplica: Map[ReplicaId, Long], - replicationControl: Map[ReplicaId, ReplicationStreamControl]) { + replicationControl: Map[ReplicaId, ReplicationStreamControl], + unstashRecurrenceState: UnstashRecurrenceState[State, Command] = new UnstashRecurrenceState[State, Command]) { - def nextSequenceNr(): RunningState[State] = + def nextSequenceNr(): RunningState[State, Command] = copy(seqNr = seqNr + 1) - def updateLastSequenceNr(persistent: PersistentRepr): RunningState[State] = + def updateLastSequenceNr(persistent: PersistentRepr): RunningState[State, Command] = if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) else this - def applyEvent[C, E](setup: BehaviorSetup[C, E, State], event: E): RunningState[State] = { + def applyEvent[E](setup: BehaviorSetup[Command, E, State], event: E): RunningState[State, Command] = { val updated = setup.eventHandler(state, event) copy(state = updated) } @@ -119,8 +133,8 @@ private[pekko] object Running { def startReplicationStream[C, E, S]( setup: BehaviorSetup[C, E, S], - state: RunningState[S], - replicationSetup: ReplicationSetup): RunningState[S] = { + state: RunningState[S, C], + replicationSetup: ReplicationSetup): RunningState[S, C] = { import scala.concurrent.duration._ val system = setup.context.system val ref = setup.context.self @@ -238,7 +252,7 @@ private[pekko] object Running { // Needed for WithSeqNrAccessible, when unstashing private var _currentSequenceNumber = 0L - final class HandlingCommands(state: RunningState[S]) + final
class HandlingCommands(state: RunningState[S, C]) extends AbstractBehavior[InternalProtocol](setup.context) with WithSeqNrAccessible { @@ -249,7 +263,15 @@ private[pekko] object Running { } def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { - case IncomingCommand(c: C @unchecked) => onCommand(state, c) + case IncomingCommand(c: C @unchecked) if setup.settings.recurseWhenUnstashingReadOnlyCommands => + onCommand(state, c) + case IncomingCommand(c: C @unchecked) => + if (state.unstashRecurrenceState.inOnCommandCall) { + state.unstashRecurrenceState.recOnCommandParams = Some((state, c)) + this // This will be ignored in onCommand + } else { + onCommand(state, c) + } case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re, setup.replication.get) case pe: PublishedEventImpl => onPublishedEvent(state, pe) case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state) @@ -268,15 +290,60 @@ private[pekko] object Running { else Behaviors.unhandled } - def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = { - val effect = setup.commandHandler(state.state, cmd) - val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? - if (doUnstash) tryUnstashOne(next) - else next + def onCommand(state: RunningState[S, C], cmd: C): Behavior[InternalProtocol] = { + def callApplyEffects(rs: RunningState[S, C], c: C): (Behavior[InternalProtocol], Boolean) = { + val effect = setup.commandHandler(rs.state, c) + + applyEffects(c, rs, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? 
+ } + + def recursiveUnstashImpl( + applyEffectsRetval: (Behavior[InternalProtocol], Boolean) + ): Behavior[InternalProtocol] = { + val (next, doUnstash) = applyEffectsRetval + + if (doUnstash) tryUnstashOne(next) + else next + } + + def nonRecursiveUnstashImpl( + applyEffectsRetval: (Behavior[InternalProtocol], Boolean) + ): Behavior[InternalProtocol] = { + @tailrec + def loop(applyEffectsRetval: (Behavior[InternalProtocol], Boolean)): Behavior[InternalProtocol] = { + val (next, doUnstash) = applyEffectsRetval + + if (doUnstash) { + state.unstashRecurrenceState.inOnCommandCall = true + val r = tryUnstashOne(next) + state.unstashRecurrenceState.inOnCommandCall = false + + val recOnCommandParams = state.unstashRecurrenceState.recOnCommandParams + state.unstashRecurrenceState.recOnCommandParams = None + + recOnCommandParams match { + case None => r + case Some((rs, c)) => loop(callApplyEffects(rs, c)) + } + } else { + next + } + } + + loop(applyEffectsRetval) + } + + val r = callApplyEffects(state, cmd) + + if (setup.settings.recurseWhenUnstashingReadOnlyCommands) { + recursiveUnstashImpl(r) + } else { + nonRecursiveUnstashImpl(r) + } } def onReplicatedEvent( - state: Running.RunningState[S], + state: Running.RunningState[S, C], envelope: ReplicatedEventEnvelope[E], replication: ReplicationSetup): Behavior[InternalProtocol] = { setup.internalLogger.debugN( @@ -300,7 +367,7 @@ private[pekko] object Running { } } - def onPublishedEvent(state: Running.RunningState[S], event: PublishedEventImpl): Behavior[InternalProtocol] = { + def onPublishedEvent(state: Running.RunningState[S, C], event: PublishedEventImpl): Behavior[InternalProtocol] = { val newBehavior: Behavior[InternalProtocol] = setup.replication match { case None => setup.internalLogger.warn( @@ -321,7 +388,7 @@ private[pekko] object Running { } private def onPublishedEvent( - state: Running.RunningState[S], + state: Running.RunningState[S, C], replication: ReplicationSetup, replicatedMetadata: 
ReplicatedPublishedEventMetaData, event: PublishedEventImpl): Behavior[InternalProtocol] = { @@ -428,7 +495,7 @@ private[pekko] object Running { replication.clearContext() - val newState2: RunningState[S] = internalPersist( + val newState2: RunningState[S, C] = internalPersist( setup.context, null, stateAfterApply, @@ -464,10 +531,10 @@ private[pekko] object Running { val eventToPersist = adaptEvent(event) val eventAdapterManifest = setup.eventAdapter.manifest(event) - val newState2 = setup.replication match { + val newState2: RunningState[S, C] = setup.replication match { case Some(replication) => val updatedVersion = stateAfterApply.version.updated(replication.replicaId.id, _currentSequenceNumber) - val r = internalPersist( + val r: RunningState[S, C] = internalPersist( setup.context, cmd, stateAfterApply, @@ -572,9 +639,10 @@ private[pekko] object Running { (applySideEffects(sideEffects, state), true) } } + @tailrec def applyEffects( msg: Any, - state: RunningState[S], + state: RunningState[S, C], effect: Effect[E, S], sideEffects: immutable.Seq[SideEffect[S]] = Nil): (Behavior[InternalProtocol], Boolean) = { if (setup.internalLogger.isDebugEnabled && !effect.isInstanceOf[CompositeEffect[_, _]]) @@ -630,8 +698,8 @@ private[pekko] object Running { // =============================================== def persistingEvents( - state: RunningState[S], - visibleState: RunningState[S], // previous state until write success + state: RunningState[S, C], + visibleState: RunningState[S, C], // previous state until write success numberOfEvents: Int, shouldSnapshotAfterPersist: SnapshotAfterPersist, shouldPublish: Boolean, @@ -642,8 +710,8 @@ private[pekko] object Running { /** INTERNAL API */ @InternalApi private[pekko] class PersistingEvents( - var state: RunningState[S], - var visibleState: RunningState[S], // previous state until write success + var state: RunningState[S, C], + var visibleState: RunningState[S, C], // previous state until write success numberOfEvents: Int, 
shouldSnapshotAfterPersist: SnapshotAfterPersist, shouldPublish: Boolean, @@ -789,7 +857,7 @@ private[pekko] object Running { /** INTERNAL API */ @InternalApi private[pekko] class StoringSnapshot( - state: RunningState[S], + state: RunningState[S, C], sideEffects: immutable.Seq[SideEffect[S]], snapshotReason: SnapshotAfterPersist) extends AbstractBehavior[InternalProtocol](setup.context) @@ -886,7 +954,7 @@ private[pekko] object Running { // -------------------------- - def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S]): Behavior[InternalProtocol] = { + def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S, C]): Behavior[InternalProtocol] = { var behavior: Behavior[InternalProtocol] = new HandlingCommands(state) val it = effects.iterator @@ -905,7 +973,7 @@ private[pekko] object Running { def applySideEffect( effect: SideEffect[S], - state: RunningState[S], + state: RunningState[S, C], behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { effect match { case _: Stop.type @unchecked => diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala index adaf59e0a6..4d1d472794 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala @@ -54,13 +54,17 @@ import pekko.persistence.Persistence val useContextLoggerForInternalLogging = typedConfig.getBoolean("use-context-logger-for-internal-logging") + val recurseWhenUnstashingReadOnlyCommands = + typedConfig.getBoolean("recurse-when-unstashing-read-only-commands") + DurableStateSettings( stashCapacity = stashCapacity, stashOverflowStrategy, logOnStashing = logOnStashing, recoveryTimeout, 
durableStateStorePluginId, - useContextLoggerForInternalLogging) + useContextLoggerForInternalLogging, + recurseWhenUnstashingReadOnlyCommands) } private def durableStateStoreConfigFor(config: Config, pluginId: String): Config = { @@ -87,7 +91,8 @@ private[pekko] final case class DurableStateSettings( logOnStashing: Boolean, recoveryTimeout: FiniteDuration, durableStateStorePluginId: String, - useContextLoggerForInternalLogging: Boolean) { + useContextLoggerForInternalLogging: Boolean, + recurseWhenUnstashingReadOnlyCommands: Boolean) { require( durableStateStorePluginId != null, diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala index a2505e5fae..0063e88fc2 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala @@ -49,8 +49,8 @@ private[pekko] trait DurableStateStoreInteractions[C, S] { protected def internalUpsert( ctx: ActorContext[InternalProtocol], cmd: Any, - state: Running.RunningState[S], - value: Any): Running.RunningState[S] = { + state: Running.RunningState[S, C], + value: Any): Running.RunningState[S, C] = { val newRunningState = state.nextRevision() val persistenceId = setup.persistenceId.id @@ -69,9 +69,9 @@ private[pekko] trait DurableStateStoreInteractions[C, S] { protected def internalDelete( ctx: ActorContext[InternalProtocol], cmd: Any, - state: Running.RunningState[S]): Running.RunningState[S] = { + state: Running.RunningState[S, C]): Running.RunningState[S, C] = { - val newRunningState = state.nextRevision().copy(state = setup.emptyState) + val newRunningState: Running.RunningState[S, C] = state.nextRevision().copy(state = setup.emptyState) val 
persistenceId = setup.persistenceId.id onDeleteInitiated(ctx, cmd) diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala index 88926675cb..1431a0c1da 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Recovering.scala @@ -183,7 +183,7 @@ private[pekko] class Recovering[C, S]( if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped else { - val runningState = Running.RunningState[S]( + val runningState = Running.RunningState[S, C]( revision = state.revision, state = state.state, receivedPoisonPill = state.receivedPoisonPill) diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala index d4a0a3fcba..549946e851 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala @@ -55,12 +55,25 @@ private[pekko] object Running { def currentRevision: Long } - final case class RunningState[State](revision: Long, state: State, receivedPoisonPill: Boolean) { + // This is part of the fix for https://github.com/apache/pekko/issues/1327 and it's necessary to break + // recursion between the `onCommand` and `onMessage` functions in Running[C, E, S]#HandlingCommands. 
+ // See comment in org.apache.pekko.persistence.typed.internal.Running#UnstashRecurrenceState for more + // information + final class UnstashRecurrenceState[State, Command] { + var inOnCommandCall: Boolean = false + var recOnCommandParams: Option[(RunningState[State, Command], Command)] = None + } + + final case class RunningState[State, Command]( + revision: Long, + state: State, + receivedPoisonPill: Boolean, + unstashRecurrenceState: UnstashRecurrenceState[State, Command] = new UnstashRecurrenceState[State, Command]) { - def nextRevision(): RunningState[State] = + def nextRevision(): RunningState[State, Command] = copy(revision = revision + 1) - def applyState[C, E](@unused setup: BehaviorSetup[C, State], updated: State): RunningState[State] = { + def applyState(@unused setup: BehaviorSetup[Command, State], updated: State): RunningState[State, Command] = { copy(state = updated) } } @@ -79,16 +92,24 @@ private[pekko] object Running { // Needed for WithSeqNrAccessible, when unstashing private var _currentRevision = 0L - final class HandlingCommands(state: RunningState[S]) + final class HandlingCommands(state: RunningState[S, C]) extends AbstractBehavior[InternalProtocol](setup.context) with WithRevisionAccessible { _currentRevision = state.revision def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { - case IncomingCommand(c: C @unchecked) => onCommand(state, c) - case get: GetState[S @unchecked] => onGetState(get) - case _ => Behaviors.unhandled + case IncomingCommand(c: C @unchecked) if setup.settings.recurseWhenUnstashingReadOnlyCommands => + onCommand(state, c) + case IncomingCommand(c: C @unchecked) => + if (state.unstashRecurrenceState.inOnCommandCall) { + state.unstashRecurrenceState.recOnCommandParams = Some((state, c)) + this // This will be ignored in onCommand + } else { + onCommand(state, c) + } + case get: GetState[S @unchecked] => onGetState(get) + case _ => Behaviors.unhandled } override def onSignal: 
PartialFunction[Signal, Behavior[InternalProtocol]] = { @@ -100,11 +121,56 @@ private[pekko] object Running { else Behaviors.unhandled } - def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = { - val effect = setup.commandHandler(state.state, cmd) - val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[S]]) // TODO can we avoid the cast? - if (doUnstash) tryUnstashOne(next) - else next + def onCommand(state: RunningState[S, C], cmd: C): Behavior[InternalProtocol] = { + def callApplyEffects(rs: RunningState[S, C], c: C): (Behavior[InternalProtocol], Boolean) = { + val effect = setup.commandHandler(rs.state, c) + + applyEffects(c, rs, effect.asInstanceOf[EffectImpl[S]]) // TODO can we avoid the cast? + } + + def recursiveUnstashImpl( + applyEffectsRetval: (Behavior[InternalProtocol], Boolean) + ): Behavior[InternalProtocol] = { + val (next, doUnstash) = applyEffectsRetval + + if (doUnstash) tryUnstashOne(next) + else next + } + + def nonRecursiveUnstashImpl( + applyEffectsRetval: (Behavior[InternalProtocol], Boolean) + ): Behavior[InternalProtocol] = { + @tailrec + def loop(applyEffectsRetval: (Behavior[InternalProtocol], Boolean)): Behavior[InternalProtocol] = { + val (next, doUnstash) = applyEffectsRetval + + if (doUnstash) { + state.unstashRecurrenceState.inOnCommandCall = true + val r = tryUnstashOne(next) + state.unstashRecurrenceState.inOnCommandCall = false + + val recOnCommandParams = state.unstashRecurrenceState.recOnCommandParams + state.unstashRecurrenceState.recOnCommandParams = None + + recOnCommandParams match { + case None => r + case Some((rs, c)) => loop(callApplyEffects(rs, c)) + } + } else { + next + } + } + + loop(applyEffectsRetval) + } + + val r = callApplyEffects(state, cmd) + + if (setup.settings.recurseWhenUnstashingReadOnlyCommands) { + recursiveUnstashImpl(r) + } else { + nonRecursiveUnstashImpl(r) + } } // Used by DurableStateBehaviorTestKit to retrieve the state. 
@@ -130,7 +196,7 @@ private[pekko] object Running { @tailrec def applyEffects( msg: Any, - state: RunningState[S], + state: RunningState[S, C], effect: Effect[S], sideEffects: immutable.Seq[SideEffect[S]] = Nil): (Behavior[InternalProtocol], Boolean) = { if (setup.internalLogger.isDebugEnabled && !effect.isInstanceOf[CompositeEffect[_]]) @@ -182,8 +248,8 @@ private[pekko] object Running { // =============================================== def persistingState( - state: RunningState[S], - visibleState: RunningState[S], // previous state until write success + state: RunningState[S, C], + visibleState: RunningState[S, C], // previous state until write success sideEffects: immutable.Seq[SideEffect[S]]): Behavior[InternalProtocol] = { setup.setMdcPhase(PersistenceMdc.PersistingState) new PersistingState(state, visibleState, sideEffects) @@ -191,8 +257,8 @@ private[pekko] object Running { /** INTERNAL API */ @InternalApi private[pekko] class PersistingState( - var state: RunningState[S], - var visibleState: RunningState[S], // previous state until write success + var state: RunningState[S, C], + var visibleState: RunningState[S, C], // previous state until write success var sideEffects: immutable.Seq[SideEffect[S]], persistStartTime: Long = System.nanoTime()) extends AbstractBehavior[InternalProtocol](setup.context) @@ -258,7 +324,7 @@ private[pekko] object Running { // =============================================== - def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S]): Behavior[InternalProtocol] = { + def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S, C]): Behavior[InternalProtocol] = { var behavior: Behavior[InternalProtocol] = new HandlingCommands(state) val it = effects.iterator @@ -277,7 +343,7 @@ private[pekko] object Running { def applySideEffect( effect: SideEffect[S], - state: RunningState[S], + state: RunningState[S, C], behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { effect 
match { case _: Stop.type @unchecked => diff --git a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala index b7180fb78d..80b710a8dd 100644 --- a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala +++ b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala @@ -78,6 +78,7 @@ class StashStateSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with snapshotPluginId = "", journalPluginConfig = None, snapshotPluginConfig = None, - useContextLoggerForInternalLogging = false) + useContextLoggerForInternalLogging = false, + recurseWhenUnstashingReadOnlyCommands = false) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org For additional commands, e-mail: commits-help@pekko.apache.org