This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 5321efedb7 persistence-typed: custom stash support (#2433)
5321efedb7 is described below
commit 5321efedb7d7bd4d8621e02a4a88c65dd0f51b12
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Nov 7 09:26:57 2025 +0100
persistence-typed: custom stash support (#2433)
* persistence-typed: custom stash
* build issue
* format
* Update EventSourcedBehaviorWatchSpec.scala
* tests
* Update EventSourcedBehaviorStashSpec.scala
* Update DurableStateRevisionSpec.scala
* Update DurableStateRevisionSpec.scala
---
docs/src/main/paradox/typed/persistence.md | 8 ++
.../scaladsl/EventSourcedBehaviorStashSpec.scala | 44 ++++++++-
.../scaladsl/EventSourcedBehaviorWatchSpec.scala | 2 +-
.../state/scaladsl/DurableStateRevisionSpec.scala | 107 +++++++++++++++------
.../1.3.x.backwards.excludes/custom-stash.excludes | 20 ++++
.../typed/internal/EventSourcedBehaviorImpl.scala | 8 +-
.../typed/internal/EventSourcedSettings.scala | 16 +--
.../typed/javadsl/EventSourcedBehavior.scala | 22 ++++-
.../typed/scaladsl/EventSourcedBehavior.scala | 7 ++
.../state/internal/DurableStateBehaviorImpl.scala | 7 +-
.../state/internal/DurableStateSettings.scala | 16 ++-
.../state/scaladsl/DurableStateBehavior.scala | 6 ++
.../typed/BasicPersistentBehaviorTest.java | 7 ++
.../typed/BasicPersistentBehaviorCompileOnly.scala | 5 +-
14 files changed, 220 insertions(+), 55 deletions(-)
diff --git a/docs/src/main/paradox/typed/persistence.md
b/docs/src/main/paradox/typed/persistence.md
index c4b482e379..a58d350a93 100644
--- a/docs/src/main/paradox/typed/persistence.md
+++ b/docs/src/main/paradox/typed/persistence.md
@@ -632,6 +632,14 @@ buffer will fill up and when reaching its maximum capacity
the commands will be
pekko.persistence.typed.stash-capacity = 10000
```
+To override the global configuration shown above, use the following API to define a
custom stash buffer capacity per entity:
+
+Scala
+: @@snip
[BasicPersistentBehaviorCompileOnly.scala](/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorCompileOnly.scala)
{ #custom-stash-buffer }
+
+Java
+: @@snip
[BasicPersistentBehaviorTest.java](/persistence-typed/src/test/java/jdocs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorTest.java)
{ #custom-stash-buffer }
+
Note that the stashed commands are kept in an in-memory buffer, so in case of
a crash they will not be
processed.
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala
index 0f05cdf016..e9b5d2215b 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala
@@ -540,11 +540,12 @@ class EventSourcedBehaviorStashSpec
stateProbe.expectMessage(State(5, active = true))
}
- "discard when stash has reached limit with default dropped setting" in {
+ trait StashLimit {
+ val customStashLimit: Option[Int] = None
val probe = TestProbe[AnyRef]()
system.toClassic.eventStream.subscribe(probe.ref.toClassic,
classOf[Dropped])
val behavior = Behaviors.setup[String] { context =>
- EventSourcedBehavior[String, String, Boolean](
+ val esb = EventSourcedBehavior[String, String, Boolean](
persistenceId = PersistenceId.ofUniqueId("stash-is-full-drop"),
emptyState = false,
commandHandler = { (state, command) =>
@@ -575,7 +576,15 @@ class EventSourcedBehaviorStashSpec
case (_, "unstash") => false
case (_, _) => throw new IllegalArgumentException()
})
+ customStashLimit match {
+ case Some(value) => esb.withStashCapacity(value)
+ case None => esb
+ }
+
}
+ }
+
+ "discard when stash has reached limit with default dropped setting" in new
StashLimit {
val c = spawn(behavior)
@@ -604,6 +613,37 @@ class EventSourcedBehaviorStashSpec
probe.expectMessage("pong")
}
+ "discard when custom stash buffer has reached limit with default dropped
setting" in new StashLimit {
+ val customLimit = 100
+ override val customStashLimit: Option[Int] = Some(customLimit)
+
+ val c = spawn(behavior)
+
+ // make sure it completed recovery, before we try to overfill the stash
+ c ! "ping"
+ probe.expectMessage("pong")
+
+ c ! "start-stashing"
+
+ LoggingTestKit.warn("Stash buffer is full, dropping message").expect {
+ (0 to customLimit).foreach { n =>
+ c ! s"cmd-$n" // limit triggers overflow
+ }
+ probe.expectMessageType[Dropped]
+ }
+
+ // we can still unstash and continue interacting
+ c ! "unstash"
+ (0 until customLimit).foreach { n =>
+ probe.expectMessage(s"cmd-$n")
+ }
+
+ probe.expectMessage("done-unstashing") // before actually unstashing,
see above
+
+ c ! "ping"
+ probe.expectMessage("pong")
+ }
+
"fail when stash has reached limit if configured to fail" in {
// persistence settings is system wide, so we need to have a custom
testkit/actorsystem here
val failStashTestKit = ActorTestKit(
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
index 9b8418f559..6124e83264 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
@@ -124,7 +124,7 @@ class EventSourcedBehaviorWatchSpec
}
Behaviors.setup[Command] { context =>
- val settings = EventSourcedSettings(context.system, "", "")
+ val settings = EventSourcedSettings(context.system, "", "", None)
setup(signalHandler, settings, context).onSignal("",
RecoveryCompleted, false) shouldEqual true
setup(PartialFunction.empty, settings, context).onSignal("",
RecoveryCompleted, false) shouldEqual false
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala
index 28f0c5b755..f02d907114 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala
@@ -14,12 +14,15 @@
package org.apache.pekko.persistence.typed.state.scaladsl
import org.apache.pekko
+import pekko.actor.Dropped
import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.testkit.typed.scaladsl.TestProbe
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
-import pekko.actor.typed.scaladsl.Behaviors
+import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import pekko.actor.typed.scaladsl.adapter._
import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.state.RecoveryCompleted
@@ -42,38 +45,45 @@ class DurableStateRevisionSpec
with AnyWordSpecLike
with LogCapturing {
+ private def durableState(ctx: ActorContext[String], pid: PersistenceId,
probe: ActorRef[String]) = {
+ DurableStateBehavior[String, String](
+ pid,
+ "",
+ (state, command) =>
+ state match {
+ case "stashing" =>
+ command match {
+ case "unstash" =>
+ probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)}
unstash"
+ Effect.persist("normal").thenUnstashAll()
+ case _ =>
+ Effect.stash()
+ }
+ case _ =>
+ command match {
+ case "cmd" =>
+ probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)}
onCommand"
+ Effect.persist("state").thenRun(_ => probe !
s"${DurableStateBehavior.lastSequenceNumber(ctx)} thenRun")
+ case "stash" =>
+ probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)}
stash"
+ Effect.persist("stashing")
+ case "snapshot" =>
+ Effect.persist("snapshot")
+ }
+ }).receiveSignal {
+ case (_, RecoveryCompleted) =>
+ probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)}
onRecoveryComplete"
+ }
+ }
+
private def behavior(pid: PersistenceId, probe: ActorRef[String]):
Behavior[String] =
- Behaviors.setup(ctx =>
- DurableStateBehavior[String, String](
- pid,
- "",
- (state, command) =>
- state match {
- case "stashing" =>
- command match {
- case "unstash" =>
- probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)}
unstash"
- Effect.persist("normal").thenUnstashAll()
- case _ =>
- Effect.stash()
- }
- case _ =>
- command match {
- case "cmd" =>
- probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)}
onCommand"
- Effect
- .persist("state")
- .thenRun(_ => probe !
s"${DurableStateBehavior.lastSequenceNumber(ctx)} thenRun")
- case "stash" =>
- probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)}
stash"
- Effect.persist("stashing")
- case "snapshot" =>
- Effect.persist("snapshot")
- }
- }).receiveSignal {
- case (_, RecoveryCompleted) =>
- probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)}
onRecoveryComplete"
- })
+ Behaviors.setup(ctx => durableState(ctx, pid, probe))
+
+ private def behaviorWithCustomStashSize(
+ pid: PersistenceId,
+ probe: ActorRef[String],
+ customStashSize: Int): Behavior[String] =
+ Behaviors.setup(ctx => durableState(ctx, pid,
probe).withStashCapacity(customStashSize))
"The revision number" must {
@@ -136,5 +146,38 @@ class DurableStateRevisionSpec
probe.expectMessage("2 onCommand") // second command
probe.expectMessage("3 thenRun")
}
+
+ "discard when custom stash has reached limit with default dropped setting"
in {
+ val customLimit = 100
+ val probe = TestProbe[AnyRef]()
+ system.toClassic.eventStream.subscribe(probe.ref.toClassic,
classOf[Dropped])
+ val durableState =
spawn(behaviorWithCustomStashSize(PersistenceId.ofUniqueId("pid-4"), probe.ref,
customLimit))
+ probe.expectMessage("0 onRecoveryComplete")
+
+ durableState ! "stash"
+
+ probe.expectMessage("0 stash")
+
+ LoggingTestKit.warn("Stash buffer is full, dropping message").expect {
+ (0 to customLimit).foreach { _ =>
+ durableState ! s"cmd"
+ }
+ }
+ probe.expectMessageType[Dropped]
+
+ durableState ! "unstash"
+ probe.expectMessage("1 unstash")
+
+ val lastSequenceId = 2
+ (lastSequenceId until customLimit + lastSequenceId).foreach { n =>
+ probe.expectMessage(s"$n onCommand")
+ probe.expectMessage(s"${n + 1} thenRun") // after persisting
+ }
+
+ durableState ! s"cmd"
+ probe.expectMessage(s"${102} onCommand")
+ probe.expectMessage(s"${103} thenRun")
+
+ }
}
}
diff --git
a/persistence-typed/src/main/mima-filters/1.3.x.backwards.excludes/custom-stash.excludes
b/persistence-typed/src/main/mima-filters/1.3.x.backwards.excludes/custom-stash.excludes
new file mode 100644
index 0000000000..52208f6eea
--- /dev/null
+++
b/persistence-typed/src/main/mima-filters/1.3.x.backwards.excludes/custom-stash.excludes
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Support custom stash
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.scaladsl.EventSourcedBehavior.withStashCapacity")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.state.scaladsl.DurableStateBehavior.withStashCapacity")
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
index b9f406b5e4..ae6bb6f221 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
@@ -119,7 +119,8 @@ private[pekko] final case class
EventSourcedBehaviorImpl[Command, Event, State](
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
override val signalHandler: PartialFunction[(State, Signal), Unit] =
PartialFunction.empty,
replication: Option[ReplicationSetup] = None,
- publishEvents: Boolean = true)
+ publishEvents: Boolean = true,
+ customStashCapacity: Option[Int] = None)
extends EventSourcedBehavior[Command, Event, State] {
import EventSourcedBehaviorImpl.WriterIdentity
@@ -138,7 +139,7 @@ private[pekko] final case class
EventSourcedBehaviorImpl[Command, Event, State](
}
if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass)
val settings = EventSourcedSettings(ctx.system,
journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""),
- journalPluginConfig, snapshotPluginConfig)
+ journalPluginConfig, snapshotPluginConfig, customStashCapacity)
// stashState outside supervise because StashState should survive restarts
due to persist failures
val stashState = new
StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
@@ -302,6 +303,9 @@ private[pekko] final case class
EventSourcedBehaviorImpl[Command, Event, State](
copy(publishEvents = enabled)
}
+ override def withStashCapacity(size: Int): EventSourcedBehavior[Command,
Event, State] =
+ copy(customStashCapacity = Some(size))
+
override private[pekko] def withReplication(
context: ReplicationContextImpl): EventSourcedBehavior[Command, Event,
State] = {
copy(
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
index cdc1e94bfe..ec35f4d57d 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
@@ -27,25 +27,29 @@ import com.typesafe.config.Config
def apply(
system: ActorSystem[_],
journalPluginId: String,
- snapshotPluginId: String
+ snapshotPluginId: String,
+ customStashCapacity: Option[Int]
): EventSourcedSettings =
- apply(system.settings.config, journalPluginId, snapshotPluginId, None,
None)
+ apply(system.settings.config, journalPluginId, snapshotPluginId, None,
None, customStashCapacity)
def apply(
system: ActorSystem[_],
journalPluginId: String,
snapshotPluginId: String,
journalPluginConfig: Option[Config],
- snapshotPluginConfig: Option[Config]
+ snapshotPluginConfig: Option[Config],
+ customStashCapacity: Option[Int]
): EventSourcedSettings =
- apply(system.settings.config, journalPluginId, snapshotPluginId,
journalPluginConfig, snapshotPluginConfig)
+ apply(system.settings.config, journalPluginId, snapshotPluginId,
journalPluginConfig, snapshotPluginConfig,
+ customStashCapacity)
def apply(
config: Config,
journalPluginId: String,
snapshotPluginId: String,
journalPluginConfig: Option[Config],
- snapshotPluginConfig: Option[Config]
+ snapshotPluginConfig: Option[Config],
+ customStashCapacity: Option[Int]
): EventSourcedSettings = {
val typedConfig = config.getConfig("pekko.persistence.typed")
@@ -56,7 +60,7 @@ import com.typesafe.config.Config
throw new IllegalArgumentException(s"Unknown value for
stash-overflow-strategy: [$unknown]")
}
- val stashCapacity = typedConfig.getInt("stash-capacity")
+ val stashCapacity =
customStashCapacity.getOrElse(typedConfig.getInt("stash-capacity"))
require(stashCapacity > 0, "stash-capacity MUST be > 0, unbounded
buffering is not supported.")
val logOnStashing = typedConfig.getBoolean("log-stashing")
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
index a003ae5596..415c9a2f74 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
@@ -237,10 +237,17 @@ abstract class EventSourcedBehavior[Command, Event,
State] private[pekko] (
if (handler.isEmpty) behavior
else behavior.receiveSignal(handler.handler)
- if (onPersistFailure.isPresent)
- behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get)
- else
- behaviorWithSignalHandler
+ val withSignalHandler =
+ if (onPersistFailure.isPresent)
+ behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get)
+ else
+ behaviorWithSignalHandler
+
+ if (stashCapacity.isPresent) {
+ withSignalHandler.withStashCapacity(stashCapacity.get)
+ } else {
+ withSignalHandler
+ }
}
/**
@@ -250,6 +257,13 @@ abstract class EventSourcedBehavior[Command, Event, State]
private[pekko] (
scaladsl.EventSourcedBehavior.lastSequenceNumber(ctx.asScala)
}
+ /**
+ * Override to define a custom stash capacity per entity.
+ * If not defined, the default `pekko.persistence.typed.stash-capacity` will
be used.
+ * @since 1.3.0
+ */
+ def stashCapacity: Optional[java.lang.Integer] = Optional.empty()
+
}
/**
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
index 0b4ba309de..692072a12d 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
@@ -241,6 +241,13 @@ object EventSourcedBehavior {
@ApiMayChange
def withEventPublishing(enabled: Boolean): EventSourcedBehavior[Command,
Event, State]
+ /**
+ * Define a custom stash capacity per entity.
+ * If not defined, the default `pekko.persistence.typed.stash-capacity` will
be used.
+ * @since 1.3.0
+ */
+ def withStashCapacity(size: Int): EventSourcedBehavior[Command, Event, State]
+
/**
* INTERNAL API
*/
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
index 5e7a95db5c..cbdd670194 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
@@ -63,7 +63,8 @@ private[pekko] final case class
DurableStateBehaviorImpl[Command, State](
tag: String = "",
snapshotAdapter: SnapshotAdapter[State] =
NoOpSnapshotAdapter.instance[State],
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
- override val signalHandler: PartialFunction[(State, Signal), Unit] =
PartialFunction.empty)
+ override val signalHandler: PartialFunction[(State, Signal), Unit] =
PartialFunction.empty,
+ customStashCapacity: Option[Int] = None)
extends DurableStateBehavior[Command, State] {
if (persistenceId eq null)
@@ -79,7 +80,7 @@ private[pekko] final case class
DurableStateBehaviorImpl[Command, State](
case _ => false
}
if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass)
- val settings = DurableStateSettings(ctx.system,
durableStateStorePluginId.getOrElse(""))
+ val settings = DurableStateSettings(ctx.system,
durableStateStorePluginId.getOrElse(""), customStashCapacity)
// stashState outside supervise because StashState should survive restarts
due to persist failures
val stashState = new
StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
@@ -178,6 +179,8 @@ private[pekko] final case class
DurableStateBehaviorImpl[Command, State](
override def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy):
DurableStateBehavior[Command, State] =
copy(supervisionStrategy = backoffStrategy)
+ override def withStashCapacity(size: Int): DurableStateBehavior[Command,
State] =
+ copy(customStashCapacity = Some(size))
}
/** Protocol used internally by the DurableStateBehavior. */
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
index df66eaeb39..442e1e0903 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
@@ -30,10 +30,16 @@ import com.typesafe.config.Config
*/
@InternalApi private[pekko] object DurableStateSettings {
- def apply(system: ActorSystem[_], durableStateStorePluginId: String):
DurableStateSettings =
- apply(system.settings.config, durableStateStorePluginId)
-
- def apply(config: Config, durableStateStorePluginId: String):
DurableStateSettings = {
+ def apply(
+ system: ActorSystem[_],
+ durableStateStorePluginId: String,
+ customStashCapacity: Option[Int]): DurableStateSettings =
+ apply(system.settings.config, durableStateStorePluginId,
customStashCapacity)
+
+ def apply(
+ config: Config,
+ durableStateStorePluginId: String,
+ customStashCapacity: Option[Int]): DurableStateSettings = {
val typedConfig = config.getConfig("pekko.persistence.typed")
val stashOverflowStrategy =
typedConfig.getString("stash-overflow-strategy").toLowerCase match {
@@ -43,7 +49,7 @@ import com.typesafe.config.Config
throw new IllegalArgumentException(s"Unknown value for
stash-overflow-strategy: [$unknown]")
}
- val stashCapacity = typedConfig.getInt("stash-capacity")
+ val stashCapacity =
customStashCapacity.getOrElse(typedConfig.getInt("stash-capacity"))
require(stashCapacity > 0, "stash-capacity MUST be > 0, unbounded
buffering is not supported.")
val logOnStashing = typedConfig.getBoolean("log-stashing")
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
index 86af20cf35..2de01499d7 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
@@ -170,4 +170,10 @@ object DurableStateBehavior {
*/
def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy):
DurableStateBehavior[Command, State]
+ /**
+ * Define a custom stash capacity per entity.
+ * If not defined, the default `pekko.persistence.typed.stash-capacity` will
be used.
+ * @since 1.3.0
+ */
+ def withStashCapacity(size: Int): DurableStateBehavior[Command, State]
}
diff --git
a/persistence-typed/src/test/java/jdocs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorTest.java
b/persistence-typed/src/test/java/jdocs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorTest.java
index c158af0de1..5e5f77d15a 100644
---
a/persistence-typed/src/test/java/jdocs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorTest.java
+++
b/persistence-typed/src/test/java/jdocs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorTest.java
@@ -39,6 +39,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
public class BasicPersistentBehaviorTest {
@@ -425,6 +426,12 @@ public class BasicPersistentBehaviorTest {
};
}
+ // #custom-stash-buffer
+ public Optional<Integer> stashCapacity() {
+ return Optional.of(100);
+ }
+ // #custom-stash-buffer
+
// #wrapPersistentBehavior
@Override
public boolean shouldSnapshot(State state, Event event, long sequenceNr)
{
diff --git
a/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
b/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
index 841cc1e6e3..a24ce49f74 100644
---
a/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
+++
b/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
@@ -332,6 +332,9 @@ object BasicPersistentBehaviorCompileOnly {
commandHandler = (state, cmd) => throw new NotImplementedError("TODO:
process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO:
process the event return the next state"))
.eventAdapter(new WrapperEventAdapter[Event])
- // #install-event-adapter
+ // #install-event-adapter
+ // #custom-stash-buffer
+ .withStashCapacity(100)
+ // #custom-stash-buffer
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]