This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 6768ffe737 ShardedDaemonProcess: throttle keep-alive messages and limit to subset of nodes (#2734)
6768ffe737 is described below

commit 6768ffe737e4955c39ec6ebae994d72cefd0db9b
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Mar 18 10:15:18 2026 +0100

    ShardedDaemonProcess: throttle keep-alive messages and limit to subset of nodes (#2734)
    
    * Initial plan
    
    * Copy changes from akka/akka-core#31837: throttle ShardedDaemonProcess keep-alive and limit pingers
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update ShardedDaemonProcessSpec.scala
    
    * Update ShardedDaemonProcessSpec.scala
    
    * Create throttle-keep-alive-messages.excludes
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../throttle-keep-alive-messages.excludes          | 19 ++++++
 .../src/main/resources/reference.conf              |  9 ++-
 .../typed/ShardedDaemonProcessSettings.scala       | 34 ++++++++--
 .../typed/internal/ShardedDaemonProcessImpl.scala  | 74 ++++++++++++++++------
 .../typed/scaladsl/ShardedDaemonProcessSpec.scala  | 34 +++++++++-
 5 files changed, 143 insertions(+), 27 deletions(-)

diff --git 
a/cluster-sharding-typed/src/main/mima-filters/2.0.x.backwards.excludes/throttle-keep-alive-messages.excludes
 
b/cluster-sharding-typed/src/main/mima-filters/2.0.x.backwards.excludes/throttle-keep-alive-messages.excludes
new file mode 100644
index 0000000000..b404944ef9
--- /dev/null
+++ 
b/cluster-sharding-typed/src/main/mima-filters/2.0.x.backwards.excludes/throttle-keep-alive-messages.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Throttle keep-alive messages
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.sharding.typed.internal.ShardedDaemonProcessImpl$KeepAlivePinger$StartTick$")
diff --git a/cluster-sharding-typed/src/main/resources/reference.conf 
b/cluster-sharding-typed/src/main/resources/reference.conf
index 738dddf489..b7c0a578fa 100644
--- a/cluster-sharding-typed/src/main/resources/reference.conf
+++ b/cluster-sharding-typed/src/main/resources/reference.conf
@@ -22,11 +22,18 @@ pekko.cluster.sharded-daemon-process {
   # overriding those settings will be ignored.
   sharding = ${pekko.cluster.sharding}
 
-  # Each entity is pinged at this interval from each node in the
+  # Each entity is pinged at this interval from a few nodes in the
   # cluster to trigger a start if it has stopped, for example during
   # rebalancing.
+  # See also keep-alive-from-number-of-nodes and keep-alive-throttle-interval
   # Note: How the set of actors is kept alive may change in the future meaning 
this setting may go away.
   keep-alive-interval = 10s
+
+  # Keep alive messages from this number of nodes.
+  keep-alive-from-number-of-nodes = 3
+
+  # Keep alive messages are sent with this delay between each message.
+  keep-alive-throttle-interval = 100 ms
 }
 
 pekko.cluster.configuration-compatibility-check.checkers {
diff --git 
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
 
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
index 39820a5f5e..fd940fbaab 100644
--- 
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
+++ 
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
@@ -40,8 +40,11 @@ object ShardedDaemonProcessSettings {
    */
   def fromConfig(config: Config): ShardedDaemonProcessSettings = {
     val keepAliveInterval = config.getDuration("keep-alive-interval").toScala
+    val keepAliveFromNumberOfNodes = 
config.getInt("keep-alive-from-number-of-nodes")
+    val keepAliveThrottleInterval = 
config.getDuration("keep-alive-throttle-interval").toScala
 
-    new ShardedDaemonProcessSettings(keepAliveInterval, None, None)
+    new ShardedDaemonProcessSettings(keepAliveInterval, None, None, 
keepAliveFromNumberOfNodes,
+      keepAliveThrottleInterval)
   }
 
 }
@@ -52,7 +55,9 @@ object ShardedDaemonProcessSettings {
 final class ShardedDaemonProcessSettings @InternalApi private[pekko] (
     val keepAliveInterval: FiniteDuration,
     val shardingSettings: Option[ClusterShardingSettings],
-    val role: Option[String]) {
+    val role: Option[String],
+    val keepAliveFromNumberOfNodes: Int,
+    val keepAliveThrottleInterval: FiniteDuration) {
 
   /**
    * Scala API: The interval each parent of the sharded set is pinged from 
each node in the cluster.
@@ -86,10 +91,31 @@ final class ShardedDaemonProcessSettings @InternalApi 
private[pekko] (
   def withRole(role: String): ShardedDaemonProcessSettings =
     copy(role = Option(role))
 
+  /**
+   * Keep alive messages from this number of nodes.
+   */
+  def withKeepAliveFromNumberOfNodes(keepAliveFromNumberOfNodes: Int): 
ShardedDaemonProcessSettings =
+    copy(keepAliveFromNumberOfNodes = keepAliveFromNumberOfNodes)
+
+  /**
+   * Scala API: Keep alive messages are sent with this delay between each 
message.
+   */
+  def withKeepAliveThrottleInterval(keepAliveThrottleInterval: 
FiniteDuration): ShardedDaemonProcessSettings =
+    copy(keepAliveThrottleInterval = keepAliveThrottleInterval)
+
+  /**
+   * Java API: Keep alive messages are sent with this delay between each 
message.
+   */
+  def withKeepAliveThrottleInterval(keepAliveThrottleInterval: Duration): 
ShardedDaemonProcessSettings =
+    copy(keepAliveThrottleInterval = keepAliveThrottleInterval.toScala)
+
   private def copy(
       keepAliveInterval: FiniteDuration = keepAliveInterval,
       shardingSettings: Option[ClusterShardingSettings] = shardingSettings,
-      role: Option[String] = role): ShardedDaemonProcessSettings =
-    new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings, role)
+      role: Option[String] = role,
+      keepAliveFromNumberOfNodes: Int = keepAliveFromNumberOfNodes,
+      keepAliveThrottleInterval: FiniteDuration = keepAliveThrottleInterval): 
ShardedDaemonProcessSettings =
+    new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings, 
role, keepAliveFromNumberOfNodes,
+      keepAliveThrottleInterval)
 
 }
diff --git 
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala
 
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala
index e0ac66530e..f8b17f56f2 100644
--- 
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala
+++ 
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala
@@ -17,15 +17,19 @@ import java.util.Optional
 import java.util.function.IntFunction
 
 import scala.jdk.OptionConverters._
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
 import scala.reflect.ClassTag
 
 import org.apache.pekko
+import pekko.Done
 import pekko.actor.typed.ActorRef
 import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.Behavior
 import pekko.actor.typed.scaladsl.Behaviors
 import pekko.actor.typed.scaladsl.LoggerOps
 import pekko.annotation.InternalApi
+import pekko.cluster.MemberStatus
 import pekko.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
 import pekko.cluster.sharding.ShardRegion.EntityId
 import pekko.cluster.sharding.typed.ClusterShardingSettings
@@ -42,7 +46,7 @@ import pekko.cluster.sharding.typed.scaladsl.StartEntity
 import pekko.cluster.typed.Cluster
 import pekko.cluster.typed.SelfUp
 import pekko.cluster.typed.Subscribe
-import pekko.util.PrettyDuration
+import pekko.stream.scaladsl.Source
 
 /**
  * INTERNAL API
@@ -53,37 +57,67 @@ private[pekko] object ShardedDaemonProcessImpl {
   object KeepAlivePinger {
     sealed trait Event
     private case object Tick extends Event
-    private case object StartTick extends Event
+    private case object SendKeepAliveDone extends Event
 
     def apply[T](
         settings: ShardedDaemonProcessSettings,
         name: String,
         identities: Set[EntityId],
-        shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[Event] =
-      Behaviors.setup { context =>
-        Cluster(context.system).subscriptions ! Subscribe(
-          context.messageAdapter[SelfUp](_ => StartTick),
-          classOf[SelfUp])
-        Behaviors.withTimers { timers =>
-          def triggerStartAll(): Unit = {
-            identities.foreach(id => shardingRef ! StartEntity(id))
+        shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[Event] = {
+      val sortedIdentities = identities.toVector.sorted
+
+      def sendKeepAliveMessages()(implicit sys: ActorSystem[_]): Future[Done] 
= {
+        if (settings.keepAliveThrottleInterval == Duration.Zero) {
+          sortedIdentities.foreach(id => shardingRef ! StartEntity(id))
+          Future.successful(Done)
+        } else {
+          Source(sortedIdentities).throttle(1, 
settings.keepAliveThrottleInterval).runForeach { id =>
+            shardingRef ! StartEntity(id)
+          }
+        }
+      }
+
+      Behaviors.setup[Event] { context =>
+        implicit val system: ActorSystem[_] = context.system
+        val cluster = Cluster(system)
+
+        if (cluster.selfMember.status == MemberStatus.Up)
+          context.self ! Tick
+        else
+          cluster.subscriptions ! Subscribe(context.messageAdapter[SelfUp](_ 
=> Tick), classOf[SelfUp])
+
+        def isActive(): Boolean = {
+          val members = settings.role match {
+            case None       => cluster.state.members
+            case Some(role) => 
cluster.state.members.filter(_.roles.contains(role))
           }
+          // members are sorted so this is deterministic (the same) on all 
nodes
+          
members.take(settings.keepAliveFromNumberOfNodes).contains(cluster.selfMember)
+        }
+
+        Behaviors.withTimers { timers =>
           Behaviors.receiveMessage {
-            case StartTick =>
-              triggerStartAll()
-              context.log.debug2(
-                s"Starting Sharded Daemon Process KeepAlivePinger for [{}], 
with ping interval [{}]",
-                name,
-                PrettyDuration.format(settings.keepAliveInterval))
-              timers.startTimerWithFixedDelay(Tick, settings.keepAliveInterval)
-              Behaviors.same
             case Tick =>
-              triggerStartAll()
-              context.log.debug("Periodic ping sent to [{}] processes", 
identities.size)
+              if (isActive()) {
+                context.log.debug2(
+                  s"Sending periodic keep alive for Sharded Daemon Process 
[{}] to [{}] processes.",
+                  name,
+                  sortedIdentities.size)
+                context.pipeToSelf(sendKeepAliveMessages()) { _ =>
+                  SendKeepAliveDone
+                }
+              } else {
+                timers.startSingleTimer(Tick, settings.keepAliveInterval)
+              }
+              Behaviors.same
+            case SendKeepAliveDone =>
+              timers.startSingleTimer(Tick, settings.keepAliveInterval)
               Behaviors.same
           }
         }
       }
+    }
+
   }
 
   final class MessageExtractor[T] extends 
ShardingMessageExtractor[ShardingEnvelope[T], T] {
diff --git 
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
 
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
index cf58630c75..8251755462 100644
--- 
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
+++ 
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
@@ -23,6 +23,7 @@ import pekko.actor.typed.Behavior
 import pekko.actor.typed.scaladsl.Behaviors
 import pekko.cluster.MemberStatus
 import pekko.cluster.sharding.typed.ShardedDaemonProcessSettings
+import 
pekko.cluster.sharding.typed.internal.ShardedDaemonProcessImpl.KeepAlivePinger
 import pekko.cluster.typed.Cluster
 import pekko.cluster.typed.Join
 
@@ -43,10 +44,12 @@ object ShardedDaemonProcessSpec {
       
       # ping often/start fast for test
       pekko.cluster.sharded-daemon-process.keep-alive-interval = 1s
+      pekko.cluster.sharded-daemon-process.keep-alive-throttle-interval = 20ms
 
       pekko.coordinated-shutdown.terminate-actor-system = off
       pekko.coordinated-shutdown.run-by-actor-system-terminate = off
-      """)
+      """
+  )
 
   object MyActor {
     sealed trait Command
@@ -74,7 +77,7 @@ class ShardedDaemonProcessSpec
 
   import ShardedDaemonProcessSpec._
 
-  "The ShardedDaemonSet" must {
+  "The ShardedDaemonProcess" must {
 
     "have a single node cluster running first" in {
       val probe = createTestProbe()
@@ -114,6 +117,33 @@ class ShardedDaemonProcessSpec
 
   }
 
+  "KeepAlivePinger" must {
+    "have a single node cluster running first" in {
+      val probe = createTestProbe()
+      Cluster(system).manager ! Join(Cluster(system).selfMember.address)
+      probe.awaitAssert({
+          Cluster(system).selfMember.status == MemberStatus.Up
+        }, 3.seconds)
+    }
+
+    "throttle keep alive messages" in {
+      val shardingProbe = createTestProbe[Any]()
+      val settings = 
ShardedDaemonProcessSettings(system).withKeepAliveThrottleInterval(1.second)
+      val pinger = spawn(KeepAlivePinger(settings, "throttle", Set("1", "2", 
"3"), shardingProbe.ref))
+      // note that StartEntity.apply is actually a ShardingEnvelope wrapping 
the StartEntity message
+      shardingProbe.expectMessage(StartEntity("1"))
+      shardingProbe.expectNoMessage(100.millis)
+      shardingProbe.expectMessage(StartEntity("2"))
+      shardingProbe.expectNoMessage(100.millis)
+      shardingProbe.expectMessage(StartEntity("3"))
+      shardingProbe.expectNoMessage(100.millis)
+      shardingProbe.expectMessage(StartEntity("1"))
+
+      testKit.stop(pinger)
+    }
+
+  }
+
   object TagProcessor {
     sealed trait Command
     def apply(tag: String): Behavior[Command] = Behaviors.setup { ctx =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to