This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 6768ffe737 ShardedDaemonProcess: throttle keep-alive messages and
limit to subset of nodes (#2734)
6768ffe737 is described below
commit 6768ffe737e4955c39ec6ebae994d72cefd0db9b
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Mar 18 10:15:18 2026 +0100
ShardedDaemonProcess: throttle keep-alive messages and limit to subset of
nodes (#2734)
* Initial plan
* Copy changes from akka/akka-core#31837: throttle ShardedDaemonProcess
keep-alive and limit pingers
Co-authored-by: pjfanning <[email protected]>
* Update ShardedDaemonProcessSpec.scala
* Update ShardedDaemonProcessSpec.scala
* Create throttle-keep-alive-messages.excludes
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../throttle-keep-alive-messages.excludes | 19 ++++++
.../src/main/resources/reference.conf | 9 ++-
.../typed/ShardedDaemonProcessSettings.scala | 34 ++++++++--
.../typed/internal/ShardedDaemonProcessImpl.scala | 74 ++++++++++++++++------
.../typed/scaladsl/ShardedDaemonProcessSpec.scala | 34 +++++++++-
5 files changed, 143 insertions(+), 27 deletions(-)
diff --git
a/cluster-sharding-typed/src/main/mima-filters/2.0.x.backwards.excludes/throttle-keep-alive-messages.excludes
b/cluster-sharding-typed/src/main/mima-filters/2.0.x.backwards.excludes/throttle-keep-alive-messages.excludes
new file mode 100644
index 0000000000..b404944ef9
--- /dev/null
+++
b/cluster-sharding-typed/src/main/mima-filters/2.0.x.backwards.excludes/throttle-keep-alive-messages.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Throttle keep-alive messages
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.sharding.typed.internal.ShardedDaemonProcessImpl$KeepAlivePinger$StartTick$")
diff --git a/cluster-sharding-typed/src/main/resources/reference.conf
b/cluster-sharding-typed/src/main/resources/reference.conf
index 738dddf489..b7c0a578fa 100644
--- a/cluster-sharding-typed/src/main/resources/reference.conf
+++ b/cluster-sharding-typed/src/main/resources/reference.conf
@@ -22,11 +22,18 @@ pekko.cluster.sharded-daemon-process {
# overriding those settings will be ignored.
sharding = ${pekko.cluster.sharding}
- # Each entity is pinged at this interval from each node in the
+ # Each entity is pinged at this interval from a few nodes in the
# cluster to trigger a start if it has stopped, for example during
# rebalancing.
+ # See also keep-alive-from-number-of-nodes and keep-alive-throttle-interval
# Note: How the set of actors is kept alive may change in the future meaning
this setting may go away.
keep-alive-interval = 10s
+
+ # Keep alive messages are only sent from this number of nodes (the first ones in the sorted cluster membership).
+ keep-alive-from-number-of-nodes = 3
+
+ # Keep alive messages are sent one by one with this delay between each message. A zero duration sends them all at once.
+ keep-alive-throttle-interval = 100 ms
}
pekko.cluster.configuration-compatibility-check.checkers {
diff --git
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
index 39820a5f5e..fd940fbaab 100644
---
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
+++
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
@@ -40,8 +40,11 @@ object ShardedDaemonProcessSettings {
*/
def fromConfig(config: Config): ShardedDaemonProcessSettings = {
val keepAliveInterval = config.getDuration("keep-alive-interval").toScala
+ val keepAliveFromNumberOfNodes =
config.getInt("keep-alive-from-number-of-nodes")
+ val keepAliveThrottleInterval =
config.getDuration("keep-alive-throttle-interval").toScala
- new ShardedDaemonProcessSettings(keepAliveInterval, None, None)
+ new ShardedDaemonProcessSettings(keepAliveInterval, None, None,
keepAliveFromNumberOfNodes,
+ keepAliveThrottleInterval)
}
}
@@ -52,7 +55,9 @@ object ShardedDaemonProcessSettings {
final class ShardedDaemonProcessSettings @InternalApi private[pekko] (
val keepAliveInterval: FiniteDuration,
val shardingSettings: Option[ClusterShardingSettings],
- val role: Option[String]) {
+ val role: Option[String],
+ val keepAliveFromNumberOfNodes: Int,
+ val keepAliveThrottleInterval: FiniteDuration) {
/**
* Scala API: The interval each parent of the sharded set is pinged from
each node in the cluster.
@@ -86,10 +91,31 @@ final class ShardedDaemonProcessSettings @InternalApi
private[pekko] (
def withRole(role: String): ShardedDaemonProcessSettings =
copy(role = Option(role))
+ /**
+ * Keep alive messages are only sent from this number of nodes (the first ones in the sorted cluster membership).
+ */
+ def withKeepAliveFromNumberOfNodes(keepAliveFromNumberOfNodes: Int):
ShardedDaemonProcessSettings =
+ copy(keepAliveFromNumberOfNodes = keepAliveFromNumberOfNodes)
+
+ /**
+ * Scala API: Keep alive messages are sent with this delay between each
message.
+ */
+ def withKeepAliveThrottleInterval(keepAliveThrottleInterval:
FiniteDuration): ShardedDaemonProcessSettings =
+ copy(keepAliveThrottleInterval = keepAliveThrottleInterval)
+
+ /**
+ * Java API: Keep alive messages are sent with this delay between each
message.
+ */
+ def withKeepAliveThrottleInterval(keepAliveThrottleInterval: Duration):
ShardedDaemonProcessSettings =
+ copy(keepAliveThrottleInterval = keepAliveThrottleInterval.toScala)
+
private def copy(
keepAliveInterval: FiniteDuration = keepAliveInterval,
shardingSettings: Option[ClusterShardingSettings] = shardingSettings,
- role: Option[String] = role): ShardedDaemonProcessSettings =
- new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings, role)
+ role: Option[String] = role,
+ keepAliveFromNumberOfNodes: Int = keepAliveFromNumberOfNodes,
+ keepAliveThrottleInterval: FiniteDuration = keepAliveThrottleInterval):
ShardedDaemonProcessSettings =
+ new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings,
role, keepAliveFromNumberOfNodes,
+ keepAliveThrottleInterval)
}
diff --git
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala
index e0ac66530e..f8b17f56f2 100644
---
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala
+++
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala
@@ -17,15 +17,19 @@ import java.util.Optional
import java.util.function.IntFunction
import scala.jdk.OptionConverters._
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import org.apache.pekko
+import pekko.Done
import pekko.actor.typed.ActorRef
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.actor.typed.scaladsl.LoggerOps
import pekko.annotation.InternalApi
+import pekko.cluster.MemberStatus
import pekko.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import pekko.cluster.sharding.ShardRegion.EntityId
import pekko.cluster.sharding.typed.ClusterShardingSettings
@@ -42,7 +46,7 @@ import pekko.cluster.sharding.typed.scaladsl.StartEntity
import pekko.cluster.typed.Cluster
import pekko.cluster.typed.SelfUp
import pekko.cluster.typed.Subscribe
-import pekko.util.PrettyDuration
+import pekko.stream.scaladsl.Source
/**
* INTERNAL API
@@ -53,37 +57,67 @@ private[pekko] object ShardedDaemonProcessImpl {
object KeepAlivePinger {
sealed trait Event
private case object Tick extends Event
- private case object StartTick extends Event
+ private case object SendKeepAliveDone extends Event
def apply[T](
settings: ShardedDaemonProcessSettings,
name: String,
identities: Set[EntityId],
- shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[Event] =
- Behaviors.setup { context =>
- Cluster(context.system).subscriptions ! Subscribe(
- context.messageAdapter[SelfUp](_ => StartTick),
- classOf[SelfUp])
- Behaviors.withTimers { timers =>
- def triggerStartAll(): Unit = {
- identities.foreach(id => shardingRef ! StartEntity(id))
+ shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[Event] = {
+ val sortedIdentities = identities.toVector.sorted
+
+ def sendKeepAliveMessages()(implicit sys: ActorSystem[_]): Future[Done]
= {
+ if (settings.keepAliveThrottleInterval == Duration.Zero) {
+ sortedIdentities.foreach(id => shardingRef ! StartEntity(id))
+ Future.successful(Done)
+ } else {
+ Source(sortedIdentities).throttle(1,
settings.keepAliveThrottleInterval).runForeach { id =>
+ shardingRef ! StartEntity(id)
+ }
+ }
+ }
+
+ Behaviors.setup[Event] { context =>
+ implicit val system: ActorSystem[_] = context.system
+ val cluster = Cluster(system)
+
+ if (cluster.selfMember.status == MemberStatus.Up)
+ context.self ! Tick
+ else
+ cluster.subscriptions ! Subscribe(context.messageAdapter[SelfUp](_
=> Tick), classOf[SelfUp])
+
+ def isActive(): Boolean = {
+ val members = settings.role match {
+ case None => cluster.state.members
+ case Some(role) =>
cluster.state.members.filter(_.roles.contains(role))
}
+ // members are sorted so this is deterministic (the same) on all
nodes
+
members.take(settings.keepAliveFromNumberOfNodes).contains(cluster.selfMember)
+ }
+
+ Behaviors.withTimers { timers =>
Behaviors.receiveMessage {
- case StartTick =>
- triggerStartAll()
- context.log.debug2(
- s"Starting Sharded Daemon Process KeepAlivePinger for [{}],
with ping interval [{}]",
- name,
- PrettyDuration.format(settings.keepAliveInterval))
- timers.startTimerWithFixedDelay(Tick, settings.keepAliveInterval)
- Behaviors.same
case Tick =>
- triggerStartAll()
- context.log.debug("Periodic ping sent to [{}] processes",
identities.size)
+ if (isActive()) {
+ context.log.debug2(
+ s"Sending periodic keep alive for Sharded Daemon Process
[{}] to [{}] processes.",
+ name,
+ sortedIdentities.size)
+ context.pipeToSelf(sendKeepAliveMessages()) { _ =>
+ SendKeepAliveDone
+ }
+ } else {
+ timers.startSingleTimer(Tick, settings.keepAliveInterval)
+ }
+ Behaviors.same
+ case SendKeepAliveDone =>
+ timers.startSingleTimer(Tick, settings.keepAliveInterval)
Behaviors.same
}
}
}
+ }
+
}
final class MessageExtractor[T] extends
ShardingMessageExtractor[ShardingEnvelope[T], T] {
diff --git
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
index cf58630c75..8251755462 100644
---
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
+++
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
@@ -23,6 +23,7 @@ import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.cluster.MemberStatus
import pekko.cluster.sharding.typed.ShardedDaemonProcessSettings
+import
pekko.cluster.sharding.typed.internal.ShardedDaemonProcessImpl.KeepAlivePinger
import pekko.cluster.typed.Cluster
import pekko.cluster.typed.Join
@@ -43,10 +44,12 @@ object ShardedDaemonProcessSpec {
# ping often/start fast for test
pekko.cluster.sharded-daemon-process.keep-alive-interval = 1s
+ pekko.cluster.sharded-daemon-process.keep-alive-throttle-interval = 20ms
pekko.coordinated-shutdown.terminate-actor-system = off
pekko.coordinated-shutdown.run-by-actor-system-terminate = off
- """)
+ """
+ )
object MyActor {
sealed trait Command
@@ -74,7 +77,7 @@ class ShardedDaemonProcessSpec
import ShardedDaemonProcessSpec._
- "The ShardedDaemonSet" must {
+ "The ShardedDaemonProcess" must {
"have a single node cluster running first" in {
val probe = createTestProbe()
@@ -114,6 +117,33 @@ class ShardedDaemonProcessSpec
}
+ "KeepAlivePinger" must {
+    "have a single node cluster running first" in {
+      val probe = createTestProbe()
+      Cluster(system).manager ! Join(Cluster(system).selfMember.address)
+      // awaitAssert retries only while the block throws; a bare `==` yields a discarded Boolean
+      // and passes immediately, so assert with a matcher to really wait for the member to be Up.
+      probe.awaitAssert(Cluster(system).selfMember.status should ===(MemberStatus.Up), 3.seconds)
+    }
+
+ "throttle keep alive messages" in {
+ val shardingProbe = createTestProbe[Any]()
+ val settings =
ShardedDaemonProcessSettings(system).withKeepAliveThrottleInterval(1.second)
+ val pinger = spawn(KeepAlivePinger(settings, "throttle", Set("1", "2",
"3"), shardingProbe.ref))
+ // note that StartEntity.apply is actually a ShardingEnvelope wrapping
the StartEntity message
+ shardingProbe.expectMessage(StartEntity("1"))
+ shardingProbe.expectNoMessage(100.millis)
+ shardingProbe.expectMessage(StartEntity("2"))
+ shardingProbe.expectNoMessage(100.millis)
+ shardingProbe.expectMessage(StartEntity("3"))
+ shardingProbe.expectNoMessage(100.millis)
+ shardingProbe.expectMessage(StartEntity("1"))
+
+ testKit.stop(pinger)
+ }
+
+ }
+
object TagProcessor {
sealed trait Command
def apply(tag: String): Behavior[Command] = Behaviors.setup { ctx =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]