This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 195cba7187 RememberEntity throttling per region instead of per shard
(#2786)
195cba7187 is described below
commit 195cba71876eca63f93b042ccb5cb01a37ca5c28
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Mar 27 19:30:36 2026 +0100
RememberEntity throttling per region instead of per shard (#2786)
* Initial plan
* Port akka-core PR #31836: RememberEntityStarter throttling per region
instead of per shard
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/ef8aaa16-c225-425c-b177-45f57c22b143
* scalafmt
* scalafmt
* Update reference.conf
* Port akka-core#31836: fix three structural gaps in RememberEntity
per-region throttling (#27)
* Initial plan
* Fix three gaps vs akka-core v2.8.0: Shard constructor, ShardRegion
manager, and test improvement
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/47b8e4cd-f03b-409d-9a54-d0e49bed96ae
* Update ShardRegion.scala
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
Co-authored-by: PJ Fanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
cluster-sharding/src/main/resources/reference.conf | 3 +-
.../org/apache/pekko/cluster/sharding/Shard.scala | 16 +--
.../pekko/cluster/sharding/ShardRegion.scala | 12 +-
.../sharding/internal/RememberEntityStarter.scala | 95 +++++++++++++---
.../internal/RememberEntitiesStarterSpec.scala | 124 ++++++++++++++++++++-
5 files changed, 221 insertions(+), 29 deletions(-)
diff --git a/cluster-sharding/src/main/resources/reference.conf
b/cluster-sharding/src/main/resources/reference.conf
index b21f7f2fe6..d079825d10 100644
--- a/cluster-sharding/src/main/resources/reference.conf
+++ b/cluster-sharding/src/main/resources/reference.conf
@@ -343,9 +343,10 @@ pekko.cluster.sharding {
# entity actors at a fixed rate. The default strategy is "all".
entity-recovery-strategy = "all"
- # Default settings for the constant rate entity recovery strategy
+ # Default settings for the constant rate entity recovery strategy.
entity-recovery-constant-rate-strategy {
# Sets the frequency at which a batch of entity actors is started.
+ # The frequency is per sharding region (entity type).
frequency = 100 ms
# Sets the number of entity actors to be restarted at a particular interval
number-of-entities = 5
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
index 7c018326b9..2d4b1165db 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
@@ -17,7 +17,6 @@ import java.net.URLEncoder
import java.util
import scala.annotation.nowarn
-import scala.collection.immutable.Set
import scala.concurrent.duration._
import org.apache.pekko
@@ -43,7 +42,7 @@ import
pekko.cluster.sharding.internal.EntityPassivationStrategy
import pekko.cluster.sharding.internal.RememberEntitiesProvider
import pekko.cluster.sharding.internal.RememberEntitiesShardStore
import pekko.cluster.sharding.internal.RememberEntitiesShardStore.GetEntities
-import pekko.cluster.sharding.internal.RememberEntityStarter
+import pekko.cluster.sharding.internal.RememberEntityStarterManager
import pekko.coordination.lease.scaladsl.Lease
import pekko.coordination.lease.scaladsl.LeaseProvider
import pekko.event.LoggingAdapter
@@ -106,7 +105,8 @@ private[pekko] object Shard {
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,
handOffStopMessage: Any,
- rememberEntitiesProvider: Option[RememberEntitiesProvider]): Props =
+ rememberEntitiesProvider: Option[RememberEntitiesProvider],
+ rememberEntityStarterManager: ActorRef): Props =
Props(
new Shard(
typeName,
@@ -116,7 +116,8 @@ private[pekko] object Shard {
extractEntityId,
extractShardId,
handOffStopMessage,
- rememberEntitiesProvider)).withDeploy(Deploy.local)
+ rememberEntitiesProvider,
+ rememberEntityStarterManager)).withDeploy(Deploy.local)
case object PassivateIntervalTick extends NoSerializationVerificationNeeded
@@ -428,7 +429,8 @@ private[pekko] class Shard(
extractEntityId: ShardRegion.ExtractEntityId,
@nowarn("msg=never used") extractShardId: ShardRegion.ExtractShardId,
handOffStopMessage: Any,
- rememberEntitiesProvider: Option[RememberEntitiesProvider])
+ rememberEntitiesProvider: Option[RememberEntitiesProvider],
+ rememberEntityStarterManager: ActorRef)
extends Actor
with ActorLogging
with Stash
@@ -604,9 +606,7 @@ private[pekko] class Shard(
if (ids.nonEmpty) {
entities.alreadyRemembered(ids)
log.debug("{}: Restarting set of [{}] entities", typeName, ids.size)
- context.actorOf(
- RememberEntityStarter.props(context.parent, self, shardId, ids,
settings),
- "RememberEntitiesStarter")
+ rememberEntityStarterManager !
RememberEntityStarterManager.StartEntities(self, shardId, ids)
}
shardInitialized()
}
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
index b3a5cce4de..a6aca98d88 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
@@ -37,6 +37,7 @@ import pekko.cluster.MemberStatus
import pekko.cluster.sharding.ClusterShardingSettings.PassivationStrategy
import pekko.cluster.sharding.Shard.ShardStats
import pekko.cluster.sharding.internal.RememberEntitiesProvider
+import pekko.cluster.sharding.internal.RememberEntityStarterManager
import pekko.event.Logging
import pekko.pattern.ask
import pekko.pattern.pipe
@@ -673,6 +674,14 @@ private[pekko] class ShardRegion(
}
}
+ // When rememberEntities is enabled, create a manager to throttle entity
starting across
+ // all shards in this region (per entity type) rather than per shard
+ private val rememberEntityStarterManager: ActorRef =
+ if (rememberEntitiesProvider.isDefined)
+ context.actorOf(RememberEntityStarterManager.props(context.self,
settings), "RememberEntityStarter")
+ else
+ context.system.deadLetters
+
// subscribe to MemberEvent, re-subscribe when restart
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
@@ -1349,7 +1358,8 @@ private[pekko] class ShardRegion(
extractEntityId,
extractShardId,
handOffStopMessage,
- rememberEntitiesProvider)
+ rememberEntitiesProvider,
+ rememberEntityStarterManager)
.withDispatcher(context.props.dispatcher),
name))
shardsByRef = shardsByRef.updated(shard, id)
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/RememberEntityStarter.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/RememberEntityStarter.scala
index 984b82f970..88d4a79729 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/RememberEntityStarter.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/RememberEntityStarter.scala
@@ -22,6 +22,7 @@ import pekko.actor.ActorLogging
import pekko.actor.ActorRef
import pekko.actor.NoSerializationVerificationNeeded
import pekko.actor.Props
+import pekko.actor.Terminated
import pekko.actor.Timers
import pekko.annotation.InternalApi
import pekko.cluster.sharding.ClusterShardingSettings
@@ -30,6 +31,70 @@ import pekko.cluster.sharding.ShardRegion
import pekko.cluster.sharding.ShardRegion.EntityId
import pekko.cluster.sharding.ShardRegion.ShardId
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] object RememberEntityStarterManager {
+ def props(region: ActorRef, settings: ClusterShardingSettings) =
+ Props(new RememberEntityStarterManager(region, settings))
+
+ final case class StartEntities(shard: ActorRef, shardId:
ShardRegion.ShardId, ids: Set[ShardRegion.EntityId])
+ extends NoSerializationVerificationNeeded
+
+ private case object ContinueAfterDelay extends
NoSerializationVerificationNeeded
+}
+
+/**
+ * INTERNAL API: Per-region throttler for starting remembered entities,
ensuring the
+ * constant-rate strategy throttles across all shards in a region rather than
per shard.
+ */
+@InternalApi
+private[pekko] final class RememberEntityStarterManager(region: ActorRef,
settings: ClusterShardingSettings)
+ extends Actor
+ with ActorLogging
+ with Timers {
+ import RememberEntityStarterManager._
+
+ private val delay =
settings.tuningParameters.entityRecoveryConstantRateStrategyFrequency
+
+ override def receive: Receive =
settings.tuningParameters.entityRecoveryStrategy match {
+ case "all" => allStrategy
+ case "constant" => constantStrategyIdle
+ case other => throw new IllegalArgumentException(s"Unknown
entityRecoveryStrategy [$other]")
+ }
+
+ private val allStrategy: Receive = {
+ case s: StartEntities => start(s, isConstantStrategy = false)
+ case _: Terminated => // RememberEntityStarter was done
+ }
+
+ private val constantStrategyIdle: Receive = {
+ case s: StartEntities =>
+ start(s, isConstantStrategy = true)
+ context.become(constantStrategyWaiting(Vector.empty))
+ }
+
+ private def constantStrategyWaiting(workQueue: Vector[StartEntities]):
Receive = {
+ case s: StartEntities => context.become(constantStrategyWaiting(workQueue
:+ s))
+
+ case _: Terminated => // RememberEntityStarter was done
+ timers.startSingleTimer(ContinueAfterDelay, ContinueAfterDelay, delay)
+
+ case ContinueAfterDelay =>
+ if (workQueue.isEmpty) context.become(constantStrategyIdle)
+ else {
+ start(workQueue.head, isConstantStrategy = true)
+ context.become(constantStrategyWaiting(workQueue.tail))
+ }
+ }
+
+ private def start(s: StartEntities, isConstantStrategy: Boolean): Unit = {
+ context.watch(
+ context.actorOf(RememberEntityStarter.props(region, s.shard, s.shardId,
s.ids, isConstantStrategy, settings)))
+ }
+}
+
/**
* INTERNAL API
*/
@@ -40,8 +105,9 @@ private[pekko] object RememberEntityStarter {
shard: ActorRef,
shardId: ShardRegion.ShardId,
ids: Set[ShardRegion.EntityId],
+ isConstantStrategy: Boolean,
settings: ClusterShardingSettings) =
- Props(new RememberEntityStarter(region, shard, shardId, ids, settings))
+ Props(new RememberEntityStarter(region, shard, shardId, ids,
isConstantStrategy, settings))
private final case class StartBatch(batchSize: Int) extends
NoSerializationVerificationNeeded
private case object ResendUnAcked extends NoSerializationVerificationNeeded
@@ -56,6 +122,7 @@ private[pekko] final class RememberEntityStarter(
shard: ActorRef,
shardId: ShardRegion.ShardId,
ids: Set[ShardRegion.EntityId],
+ constantStrategy: Boolean,
settings: ClusterShardingSettings)
extends Actor
with ActorLogging
@@ -71,22 +138,22 @@ private[pekko] final class RememberEntityStarter(
private var entitiesMoved = Set.empty[EntityId]
log.debug(
- "Shard starting [{}] remembered entities using strategy [{}]",
+ "Shard [{}] starting [{}] remembered entities using strategy [{}]",
+ shardId,
ids.size,
settings.tuningParameters.entityRecoveryStrategy)
- settings.tuningParameters.entityRecoveryStrategy match {
- case "all" =>
- idsLeftToStart = Set.empty
- startBatch(ids)
- case "constant" =>
- import settings.tuningParameters
- idsLeftToStart = ids
- timers.startTimerWithFixedDelay(
- "constant",
-
StartBatch(tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities),
- tuningParameters.entityRecoveryConstantRateStrategyFrequency)
-
startBatch(tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities)
+ if (constantStrategy) {
+ import settings.tuningParameters
+ idsLeftToStart = ids
+ timers.startTimerWithFixedDelay(
+ "constant",
+
StartBatch(tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities),
+ tuningParameters.entityRecoveryConstantRateStrategyFrequency)
+
startBatch(tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities)
+ } else {
+ idsLeftToStart = Set.empty
+ startBatch(ids)
}
timers.startTimerWithFixedDelay("retry", ResendUnAcked,
settings.tuningParameters.retryInterval)
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/internal/RememberEntitiesStarterSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/internal/RememberEntitiesStarterSpec.scala
index 18921dcfb4..99b9ab40d6 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/internal/RememberEntitiesStarterSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/internal/RememberEntitiesStarterSpec.scala
@@ -43,7 +43,13 @@ class RememberEntitiesStarterSpec extends PekkoSpec {
val defaultSettings = ClusterShardingSettings(system)
val rememberEntityStarter = system.actorOf(
- RememberEntityStarter.props(regionProbe.ref, shardProbe.ref, shardId,
Set("1", "2", "3"), defaultSettings))
+ RememberEntityStarter.props(
+ regionProbe.ref,
+ shardProbe.ref,
+ shardId,
+ Set("1", "2", "3"),
+ isConstantStrategy = false,
+ defaultSettings))
watch(rememberEntityStarter)
val startedEntityIds = (1 to 3).map { _ =>
@@ -74,7 +80,13 @@ class RememberEntitiesStarterSpec extends PekkoSpec {
.withFallback(system.settings.config.getConfig("pekko.cluster.sharding")))
val rememberEntityStarter = system.actorOf(
- RememberEntityStarter.props(regionProbe.ref, shardProbe.ref, shardId,
Set("1", "2", "3"), customSettings))
+ RememberEntityStarter.props(
+ regionProbe.ref,
+ shardProbe.ref,
+ shardId,
+ Set("1", "2", "3"),
+ isConstantStrategy = false,
+ customSettings))
watch(rememberEntityStarter)
(1 to 3).foreach { _ =>
@@ -107,7 +119,13 @@ class RememberEntitiesStarterSpec extends PekkoSpec {
.withFallback(system.settings.config.getConfig("pekko.cluster.sharding")))
val rememberEntityStarter = system.actorOf(
- RememberEntityStarter.props(regionProbe.ref, shardProbe.ref, shardId,
Set("1", "2", "3"), customSettings))
+ RememberEntityStarter.props(
+ regionProbe.ref,
+ shardProbe.ref,
+ shardId,
+ Set("1", "2", "3"),
+ isConstantStrategy = false,
+ customSettings))
watch(rememberEntityStarter)
val start1 = regionProbe.expectMsgType[ShardRegion.StartEntity]
@@ -143,8 +161,13 @@ class RememberEntitiesStarterSpec extends PekkoSpec {
.withFallback(system.settings.config.getConfig("pekko.cluster.sharding")))
val rememberEntityStarter = system.actorOf(
- RememberEntityStarter
- .props(regionProbe.ref, shardProbe.ref, shardId, Set("1", "2", "3",
"4", "5"), customSettings))
+ RememberEntityStarter.props(
+ regionProbe.ref,
+ shardProbe.ref,
+ shardId,
+ Set("1", "2", "3", "4", "5"),
+ isConstantStrategy = true,
+ customSettings))
def recieveStartAndAck() = {
val start = regionProbe.expectMsgType[ShardRegion.StartEntity]
@@ -173,4 +196,95 @@ class RememberEntitiesStarterSpec extends PekkoSpec {
}
}
+
+ "The RememberEntityStarterManager" must {
+ "start entities for all shards immediately with entity-recovery-strategy =
all (default)" in {
+ val regionProbe = TestProbe()
+ val shardProbe1 = TestProbe()
+ val shardProbe2 = TestProbe()
+ val shardId1 = nextShardId()
+ val shardId2 = nextShardId()
+
+ val defaultSettings = ClusterShardingSettings(system)
+
+ val manager =
system.actorOf(RememberEntityStarterManager.props(regionProbe.ref,
defaultSettings))
+
+ manager ! RememberEntityStarterManager.StartEntities(shardProbe1.ref,
shardId1, Set("1", "2"))
+ manager ! RememberEntityStarterManager.StartEntities(shardProbe2.ref,
shardId2, Set("3", "4"))
+
+ // both shards should be started immediately (all strategy, no queuing)
+ val startedEntityIds = (1 to 4).map { _ =>
+ val start = regionProbe.expectMsgType[ShardRegion.StartEntity]
+ regionProbe.lastSender ! ShardRegion.StartEntityAck(start.entityId,
+ start.entityId match {
+ case "1" | "2" => shardId1
+ case _ => shardId2
+ })
+ start.entityId
+ }.toSet
+ startedEntityIds should ===(Set("1", "2", "3", "4"))
+ }
+
+ "throttle entity starting across shards with entity-recovery-strategy =
constant" in {
+ val regionProbe = TestProbe()
+ val shard1Probe = TestProbe()
+ val shard2Probe = TestProbe()
+ val shardId1 = nextShardId()
+ val shardId2 = nextShardId()
+
+ val customSettings = ClusterShardingSettings(
+ ConfigFactory
+ .parseString(
+ """
+ entity-recovery-strategy = constant
+ entity-recovery-constant-rate-strategy {
+ frequency = 2 s
+ number-of-entities = 2
+ }
+ retry-interval = 1 second
+ """)
+
.withFallback(system.settings.config.getConfig("pekko.cluster.sharding")))
+
+ val manager =
system.actorOf(RememberEntityStarterManager.props(regionProbe.ref,
customSettings))
+
+ manager ! RememberEntityStarterManager.StartEntities(shard1Probe.ref,
shardId1, Set("1", "2", "3", "4", "5"))
+ manager ! RememberEntityStarterManager.StartEntities(shard2Probe.ref,
shardId2, Set("6", "7", "8"))
+
+ import pekko.cluster.sharding.ShardRegion.EntityId
+
+ def receiveStartAndAck(): EntityId = {
+ val start = regionProbe.expectMsgType[ShardRegion.StartEntity]
+ val shardId = if (start.entityId.toInt <= 5) shardId1 else shardId2
+ regionProbe.lastSender ! ShardRegion.StartEntityAck(start.entityId,
shardId)
+ start.entityId
+ }
+
+ var startedEntityIds = Set.empty[EntityId]
+
+ // first batch for shard1 should be immediate
+ startedEntityIds += receiveStartAndAck()
+ startedEntityIds += receiveStartAndAck()
+
+ // second batch holding off
+ regionProbe.expectNoMessage(600.millis)
+ startedEntityIds += receiveStartAndAck()
+ startedEntityIds += receiveStartAndAck()
+
+ // third batch holding off
+ regionProbe.expectNoMessage(600.millis)
+ startedEntityIds += receiveStartAndAck()
+
+ startedEntityIds should ===(Set("1", "2", "3", "4", "5"))
+
+ // now the second StartEntities for shard2 — still throttled after delay
+ regionProbe.expectNoMessage(600.millis)
+ startedEntityIds += receiveStartAndAck()
+ startedEntityIds += receiveStartAndAck()
+
+ regionProbe.expectNoMessage(600.millis)
+ startedEntityIds += receiveStartAndAck()
+
+ startedEntityIds should ===(Set("1", "2", "3", "4", "5", "6", "7", "8"))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]