This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 5321efedb7 persistence-typed: custom stash support (#2433)
5321efedb7 is described below

commit 5321efedb7d7bd4d8621e02a4a88c65dd0f51b12
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Nov 7 09:26:57 2025 +0100

    persistence-typed: custom stash support (#2433)
    
    * persitence-typed: custom stash
    
    * build issue
    
    * format
    
    * Update EventSourcedBehaviorWatchSpec.scala
    
    * tests
    
    * Update EventSourcedBehaviorStashSpec.scala
    
    * Update DurableStateRevisionSpec.scala
    
    * Update DurableStateRevisionSpec.scala
---
 docs/src/main/paradox/typed/persistence.md         |   8 ++
 .../scaladsl/EventSourcedBehaviorStashSpec.scala   |  44 ++++++++-
 .../scaladsl/EventSourcedBehaviorWatchSpec.scala   |   2 +-
 .../state/scaladsl/DurableStateRevisionSpec.scala  | 107 +++++++++++++++------
 .../1.3.x.backwards.excludes/custom-stash.excludes |  20 ++++
 .../typed/internal/EventSourcedBehaviorImpl.scala  |   8 +-
 .../typed/internal/EventSourcedSettings.scala      |  16 +--
 .../typed/javadsl/EventSourcedBehavior.scala       |  22 ++++-
 .../typed/scaladsl/EventSourcedBehavior.scala      |   7 ++
 .../state/internal/DurableStateBehaviorImpl.scala  |   7 +-
 .../state/internal/DurableStateSettings.scala      |  16 ++-
 .../state/scaladsl/DurableStateBehavior.scala      |   6 ++
 .../typed/BasicPersistentBehaviorTest.java         |   7 ++
 .../typed/BasicPersistentBehaviorCompileOnly.scala |   5 +-
 14 files changed, 220 insertions(+), 55 deletions(-)

diff --git a/docs/src/main/paradox/typed/persistence.md 
b/docs/src/main/paradox/typed/persistence.md
index c4b482e379..a58d350a93 100644
--- a/docs/src/main/paradox/typed/persistence.md
+++ b/docs/src/main/paradox/typed/persistence.md
@@ -632,6 +632,14 @@ buffer will fill up and when reaching its maximum capacity 
the commands will be
 pekko.persistence.typed.stash-capacity = 10000
 ```
 
+To override the global config from above, use the following api to define a 
custom stash buffer capacity per entity:
+
+Scala
+:  @@snip 
[BasicPersistentBehaviorCompileOnly.scala](/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorCompileOnly.scala)
 { #custom-stash-buffer }
+
+Java
+:  @@snip 
[BasicPersistentBehaviorTest.java](/persistence-typed/src/test/java/jdocs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorTest.java)
 { #custom-stash-buffer }
+
 Note that the stashed commands are kept in an in-memory buffer, so in case of 
a crash they will not be
 processed.
 
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala
index 0f05cdf016..e9b5d2215b 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala
@@ -540,11 +540,12 @@ class EventSourcedBehaviorStashSpec
       stateProbe.expectMessage(State(5, active = true))
     }
 
-    "discard when stash has reached limit with default dropped setting" in {
+    trait StashLimit {
+      val customStashLimit: Option[Int] = None
       val probe = TestProbe[AnyRef]()
       system.toClassic.eventStream.subscribe(probe.ref.toClassic, 
classOf[Dropped])
       val behavior = Behaviors.setup[String] { context =>
-        EventSourcedBehavior[String, String, Boolean](
+        val esb = EventSourcedBehavior[String, String, Boolean](
           persistenceId = PersistenceId.ofUniqueId("stash-is-full-drop"),
           emptyState = false,
           commandHandler = { (state, command) =>
@@ -575,7 +576,15 @@ class EventSourcedBehaviorStashSpec
             case (_, "unstash")        => false
             case (_, _)                => throw new IllegalArgumentException()
           })
+        customStashLimit match {
+          case Some(value) => esb.withStashCapacity(value)
+          case None        => esb
+        }
+
       }
+    }
+
+    "discard when stash has reached limit with default dropped setting" in new 
StashLimit {
 
       val c = spawn(behavior)
 
@@ -604,6 +613,37 @@ class EventSourcedBehaviorStashSpec
       probe.expectMessage("pong")
     }
 
+    "discard when custom stash buffer has reached limit with default dropped 
setting" in new StashLimit {
+      val customLimit = 100
+      override val customStashLimit: Option[Int] = Some(customLimit)
+
+      val c = spawn(behavior)
+
+      // make sure it completed recovery, before we try to overfill the stash
+      c ! "ping"
+      probe.expectMessage("pong")
+
+      c ! "start-stashing"
+
+      LoggingTestKit.warn("Stash buffer is full, dropping message").expect {
+        (0 to customLimit).foreach { n =>
+          c ! s"cmd-$n" // limit triggers overflow
+        }
+        probe.expectMessageType[Dropped]
+      }
+
+      // we can still unstash and continue interacting
+      c ! "unstash"
+      (0 until customLimit).foreach { n =>
+        probe.expectMessage(s"cmd-$n")
+      }
+
+      probe.expectMessage("done-unstashing") // before actually unstashing, 
see above
+
+      c ! "ping"
+      probe.expectMessage("pong")
+    }
+
     "fail when stash has reached limit if configured to fail" in {
       // persistence settings is system wide, so we need to have a custom 
testkit/actorsystem here
       val failStashTestKit = ActorTestKit(
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
index 9b8418f559..6124e83264 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
@@ -124,7 +124,7 @@ class EventSourcedBehaviorWatchSpec
       }
 
       Behaviors.setup[Command] { context =>
-        val settings = EventSourcedSettings(context.system, "", "")
+        val settings = EventSourcedSettings(context.system, "", "", None)
 
         setup(signalHandler, settings, context).onSignal("", 
RecoveryCompleted, false) shouldEqual true
         setup(PartialFunction.empty, settings, context).onSignal("", 
RecoveryCompleted, false) shouldEqual false
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala
index 28f0c5b755..f02d907114 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala
@@ -14,12 +14,15 @@
 package org.apache.pekko.persistence.typed.state.scaladsl
 
 import org.apache.pekko
+import pekko.actor.Dropped
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.testkit.typed.scaladsl.TestProbe
 import pekko.actor.typed.ActorRef
 import pekko.actor.typed.Behavior
-import pekko.actor.typed.scaladsl.Behaviors
+import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import pekko.actor.typed.scaladsl.adapter._
 import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
 import pekko.persistence.typed.PersistenceId
 import pekko.persistence.typed.state.RecoveryCompleted
@@ -42,38 +45,45 @@ class DurableStateRevisionSpec
     with AnyWordSpecLike
     with LogCapturing {
 
+  private def durableState(ctx: ActorContext[String], pid: PersistenceId, 
probe: ActorRef[String]) = {
+    DurableStateBehavior[String, String](
+      pid,
+      "",
+      (state, command) =>
+        state match {
+          case "stashing" =>
+            command match {
+              case "unstash" =>
+                probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} 
unstash"
+                Effect.persist("normal").thenUnstashAll()
+              case _ =>
+                Effect.stash()
+            }
+          case _ =>
+            command match {
+              case "cmd" =>
+                probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} 
onCommand"
+                Effect.persist("state").thenRun(_ => probe ! 
s"${DurableStateBehavior.lastSequenceNumber(ctx)} thenRun")
+              case "stash" =>
+                probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} 
stash"
+                Effect.persist("stashing")
+              case "snapshot" =>
+                Effect.persist("snapshot")
+            }
+        }).receiveSignal {
+      case (_, RecoveryCompleted) =>
+        probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} 
onRecoveryComplete"
+    }
+  }
+
   private def behavior(pid: PersistenceId, probe: ActorRef[String]): 
Behavior[String] =
-    Behaviors.setup(ctx =>
-      DurableStateBehavior[String, String](
-        pid,
-        "",
-        (state, command) =>
-          state match {
-            case "stashing" =>
-              command match {
-                case "unstash" =>
-                  probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} 
unstash"
-                  Effect.persist("normal").thenUnstashAll()
-                case _ =>
-                  Effect.stash()
-              }
-            case _ =>
-              command match {
-                case "cmd" =>
-                  probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} 
onCommand"
-                  Effect
-                    .persist("state")
-                    .thenRun(_ => probe ! 
s"${DurableStateBehavior.lastSequenceNumber(ctx)} thenRun")
-                case "stash" =>
-                  probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} 
stash"
-                  Effect.persist("stashing")
-                case "snapshot" =>
-                  Effect.persist("snapshot")
-              }
-          }).receiveSignal {
-        case (_, RecoveryCompleted) =>
-          probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} 
onRecoveryComplete"
-      })
+    Behaviors.setup(ctx => durableState(ctx, pid, probe))
+
+  private def behaviorWithCustomStashSize(
+      pid: PersistenceId,
+      probe: ActorRef[String],
+      customStashSize: Int): Behavior[String] =
+    Behaviors.setup(ctx => durableState(ctx, pid, 
probe).withStashCapacity(customStashSize))
 
   "The revision number" must {
 
@@ -136,5 +146,38 @@ class DurableStateRevisionSpec
       probe.expectMessage("2 onCommand") // second command
       probe.expectMessage("3 thenRun")
     }
+
+    "discard when custom stash has reached limit with default dropped setting" 
in {
+      val customLimit = 100
+      val probe = TestProbe[AnyRef]()
+      system.toClassic.eventStream.subscribe(probe.ref.toClassic, 
classOf[Dropped])
+      val durableState = 
spawn(behaviorWithCustomStashSize(PersistenceId.ofUniqueId("pid-4"), probe.ref, 
customLimit))
+      probe.expectMessage("0 onRecoveryComplete")
+
+      durableState ! "stash"
+
+      probe.expectMessage("0 stash")
+
+      LoggingTestKit.warn("Stash buffer is full, dropping message").expect {
+        (0 to customLimit).foreach { _ =>
+          durableState ! s"cmd"
+        }
+      }
+      probe.expectMessageType[Dropped]
+
+      durableState ! "unstash"
+      probe.expectMessage("1 unstash")
+
+      val lastSequenceId = 2
+      (lastSequenceId until customLimit + lastSequenceId).foreach { n =>
+        probe.expectMessage(s"$n onCommand")
+        probe.expectMessage(s"${n + 1} thenRun") // after persisting
+      }
+
+      durableState ! s"cmd"
+      probe.expectMessage(s"${102} onCommand")
+      probe.expectMessage(s"${103} thenRun")
+
+    }
   }
 }
diff --git 
a/persistence-typed/src/main/mima-filters/1.3.x.backwards.excludes/custom-stash.excludes
 
b/persistence-typed/src/main/mima-filters/1.3.x.backwards.excludes/custom-stash.excludes
new file mode 100644
index 0000000000..52208f6eea
--- /dev/null
+++ 
b/persistence-typed/src/main/mima-filters/1.3.x.backwards.excludes/custom-stash.excludes
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Support custom stash
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.scaladsl.EventSourcedBehavior.withStashCapacity")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.state.scaladsl.DurableStateBehavior.withStashCapacity")
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
index b9f406b5e4..ae6bb6f221 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
@@ -119,7 +119,8 @@ private[pekko] final case class 
EventSourcedBehaviorImpl[Command, Event, State](
     supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
     override val signalHandler: PartialFunction[(State, Signal), Unit] = 
PartialFunction.empty,
     replication: Option[ReplicationSetup] = None,
-    publishEvents: Boolean = true)
+    publishEvents: Boolean = true,
+    customStashCapacity: Option[Int] = None)
     extends EventSourcedBehavior[Command, Event, State] {
 
   import EventSourcedBehaviorImpl.WriterIdentity
@@ -138,7 +139,7 @@ private[pekko] final case class 
EventSourcedBehaviorImpl[Command, Event, State](
     }
     if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass)
     val settings = EventSourcedSettings(ctx.system, 
journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""),
-      journalPluginConfig, snapshotPluginConfig)
+      journalPluginConfig, snapshotPluginConfig, customStashCapacity)
 
     // stashState outside supervise because StashState should survive restarts 
due to persist failures
     val stashState = new 
StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
@@ -302,6 +303,9 @@ private[pekko] final case class 
EventSourcedBehaviorImpl[Command, Event, State](
     copy(publishEvents = enabled)
   }
 
+  override def withStashCapacity(size: Int): EventSourcedBehavior[Command, 
Event, State] =
+    copy(customStashCapacity = Some(size))
+
   override private[pekko] def withReplication(
       context: ReplicationContextImpl): EventSourcedBehavior[Command, Event, 
State] = {
     copy(
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
index cdc1e94bfe..ec35f4d57d 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala
@@ -27,25 +27,29 @@ import com.typesafe.config.Config
   def apply(
       system: ActorSystem[_],
       journalPluginId: String,
-      snapshotPluginId: String
+      snapshotPluginId: String,
+      customStashCapacity: Option[Int]
   ): EventSourcedSettings =
-    apply(system.settings.config, journalPluginId, snapshotPluginId, None, 
None)
+    apply(system.settings.config, journalPluginId, snapshotPluginId, None, 
None, customStashCapacity)
 
   def apply(
       system: ActorSystem[_],
       journalPluginId: String,
       snapshotPluginId: String,
       journalPluginConfig: Option[Config],
-      snapshotPluginConfig: Option[Config]
+      snapshotPluginConfig: Option[Config],
+      customStashCapacity: Option[Int]
   ): EventSourcedSettings =
-    apply(system.settings.config, journalPluginId, snapshotPluginId, 
journalPluginConfig, snapshotPluginConfig)
+    apply(system.settings.config, journalPluginId, snapshotPluginId, 
journalPluginConfig, snapshotPluginConfig,
+      customStashCapacity)
 
   def apply(
       config: Config,
       journalPluginId: String,
       snapshotPluginId: String,
       journalPluginConfig: Option[Config],
-      snapshotPluginConfig: Option[Config]
+      snapshotPluginConfig: Option[Config],
+      customStashCapacity: Option[Int]
   ): EventSourcedSettings = {
     val typedConfig = config.getConfig("pekko.persistence.typed")
 
@@ -56,7 +60,7 @@ import com.typesafe.config.Config
         throw new IllegalArgumentException(s"Unknown value for 
stash-overflow-strategy: [$unknown]")
     }
 
-    val stashCapacity = typedConfig.getInt("stash-capacity")
+    val stashCapacity = 
customStashCapacity.getOrElse(typedConfig.getInt("stash-capacity"))
     require(stashCapacity > 0, "stash-capacity MUST be > 0, unbounded 
buffering is not supported.")
 
     val logOnStashing = typedConfig.getBoolean("log-stashing")
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
index a003ae5596..415c9a2f74 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
@@ -237,10 +237,17 @@ abstract class EventSourcedBehavior[Command, Event, 
State] private[pekko] (
       if (handler.isEmpty) behavior
       else behavior.receiveSignal(handler.handler)
 
-    if (onPersistFailure.isPresent)
-      behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get)
-    else
-      behaviorWithSignalHandler
+    val withSignalHandler =
+      if (onPersistFailure.isPresent)
+        behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get)
+      else
+        behaviorWithSignalHandler
+
+    if (stashCapacity.isPresent) {
+      withSignalHandler.withStashCapacity(stashCapacity.get)
+    } else {
+      withSignalHandler
+    }
   }
 
   /**
@@ -250,6 +257,13 @@ abstract class EventSourcedBehavior[Command, Event, State] 
private[pekko] (
     scaladsl.EventSourcedBehavior.lastSequenceNumber(ctx.asScala)
   }
 
+  /**
+   * Override to define a custom stash capacity per entity.
+   * If not defined, the default `pekko.persistence.typed.stash-capacity` will 
be used.
+   * @since 1.3.0
+   */
+  def stashCapacity: Optional[java.lang.Integer] = Optional.empty()
+
 }
 
 /**
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
index 0b4ba309de..692072a12d 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
@@ -241,6 +241,13 @@ object EventSourcedBehavior {
   @ApiMayChange
   def withEventPublishing(enabled: Boolean): EventSourcedBehavior[Command, 
Event, State]
 
+  /**
+   * Define a custom stash capacity per entity.
+   * If not defined, the default `pekko.persistence.typed.stash-capacity` will 
be used.
+   * @since 1.3.0
+   */
+  def withStashCapacity(size: Int): EventSourcedBehavior[Command, Event, State]
+
   /**
    * INTERNAL API
    */
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
index 5e7a95db5c..cbdd670194 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
@@ -63,7 +63,8 @@ private[pekko] final case class 
DurableStateBehaviorImpl[Command, State](
     tag: String = "",
     snapshotAdapter: SnapshotAdapter[State] = 
NoOpSnapshotAdapter.instance[State],
     supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
-    override val signalHandler: PartialFunction[(State, Signal), Unit] = 
PartialFunction.empty)
+    override val signalHandler: PartialFunction[(State, Signal), Unit] = 
PartialFunction.empty,
+    customStashCapacity: Option[Int] = None)
     extends DurableStateBehavior[Command, State] {
 
   if (persistenceId eq null)
@@ -79,7 +80,7 @@ private[pekko] final case class 
DurableStateBehaviorImpl[Command, State](
       case _                                => false
     }
     if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass)
-    val settings = DurableStateSettings(ctx.system, 
durableStateStorePluginId.getOrElse(""))
+    val settings = DurableStateSettings(ctx.system, 
durableStateStorePluginId.getOrElse(""), customStashCapacity)
 
     // stashState outside supervise because StashState should survive restarts 
due to persist failures
     val stashState = new 
StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
@@ -178,6 +179,8 @@ private[pekko] final case class 
DurableStateBehaviorImpl[Command, State](
   override def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): 
DurableStateBehavior[Command, State] =
     copy(supervisionStrategy = backoffStrategy)
 
+  override def withStashCapacity(size: Int): DurableStateBehavior[Command, 
State] =
+    copy(customStashCapacity = Some(size))
 }
 
 /** Protocol used internally by the DurableStateBehavior. */
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
index df66eaeb39..442e1e0903 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
@@ -30,10 +30,16 @@ import com.typesafe.config.Config
  */
 @InternalApi private[pekko] object DurableStateSettings {
 
-  def apply(system: ActorSystem[_], durableStateStorePluginId: String): 
DurableStateSettings =
-    apply(system.settings.config, durableStateStorePluginId)
-
-  def apply(config: Config, durableStateStorePluginId: String): 
DurableStateSettings = {
+  def apply(
+      system: ActorSystem[_],
+      durableStateStorePluginId: String,
+      customStashCapacity: Option[Int]): DurableStateSettings =
+    apply(system.settings.config, durableStateStorePluginId, 
customStashCapacity)
+
+  def apply(
+      config: Config,
+      durableStateStorePluginId: String,
+      customStashCapacity: Option[Int]): DurableStateSettings = {
     val typedConfig = config.getConfig("pekko.persistence.typed")
 
     val stashOverflowStrategy = 
typedConfig.getString("stash-overflow-strategy").toLowerCase match {
@@ -43,7 +49,7 @@ import com.typesafe.config.Config
         throw new IllegalArgumentException(s"Unknown value for 
stash-overflow-strategy: [$unknown]")
     }
 
-    val stashCapacity = typedConfig.getInt("stash-capacity")
+    val stashCapacity = 
customStashCapacity.getOrElse(typedConfig.getInt("stash-capacity"))
     require(stashCapacity > 0, "stash-capacity MUST be > 0, unbounded 
buffering is not supported.")
 
     val logOnStashing = typedConfig.getBoolean("log-stashing")
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
index 86af20cf35..2de01499d7 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
@@ -170,4 +170,10 @@ object DurableStateBehavior {
    */
   def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): 
DurableStateBehavior[Command, State]
 
+  /**
+   * Define a custom stash capacity per entity.
+   * If not defined, the default `pekko.persistence.typed.stash-capacity` will 
be used.
+   * @since 1.3.0
+   */
+  def withStashCapacity(size: Int): DurableStateBehavior[Command, State]
 }
diff --git 
a/persistence-typed/src/test/java/jdocs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorTest.java
 
b/persistence-typed/src/test/java/jdocs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorTest.java
index c158af0de1..5e5f77d15a 100644
--- 
a/persistence-typed/src/test/java/jdocs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorTest.java
+++ 
b/persistence-typed/src/test/java/jdocs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorTest.java
@@ -39,6 +39,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 public class BasicPersistentBehaviorTest {
@@ -425,6 +426,12 @@ public class BasicPersistentBehaviorTest {
         };
       }
 
+      // #custom-stash-buffer
+      public Optional<Integer> stashCapacity() {
+        return Optional.of(100);
+      }
+      // #custom-stash-buffer
+
       // #wrapPersistentBehavior
       @Override
       public boolean shouldSnapshot(State state, Event event, long sequenceNr) 
{
diff --git 
a/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
 
b/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
index 841cc1e6e3..a24ce49f74 100644
--- 
a/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
+++ 
b/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
@@ -332,6 +332,9 @@ object BasicPersistentBehaviorCompileOnly {
     commandHandler = (state, cmd) => throw new NotImplementedError("TODO: 
process the command & return an Effect"),
     eventHandler = (state, evt) => throw new NotImplementedError("TODO: 
process the event return the next state"))
     .eventAdapter(new WrapperEventAdapter[Event])
-  // #install-event-adapter
+    // #install-event-adapter
+    // #custom-stash-buffer
+    .withStashCapacity(100)
+  // #custom-stash-buffer
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to