This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 550ea4bde5 Disable ClusterShardingHealthCheck after configured 
duration post member-up (#2785)
550ea4bde5 is described below

commit 550ea4bde5d95d55489aab899feff92db6d199d7
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 24 08:59:40 2026 +0100

    Disable ClusterShardingHealthCheck after configured duration post member-up 
(#2785)
    
    * Initial plan
    
    * Copy akka/akka-core#31864: Disable ClusterShardingHealthCheck after 
duration when member is up
    
    Co-authored-by: pjfanning <[email protected]>
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko/sessions/bc72a2b0-c6d6-4bc2-983d-5d488ff934f4
    
    * Update ClusterShardingHealthCheck.scala
    
    * Update ClusterShardingHealthCheck.scala
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 cluster-sharding/src/main/resources/reference.conf | 11 ++++
 .../sharding/ClusterShardingHealthCheck.scala      | 33 +++++++++--
 .../sharding/ClusterShardingHealthCheckSpec.scala  | 64 +++++++++++++++++++---
 docs/src/main/paradox/typed/cluster-sharding.md    |  2 +
 4 files changed, 95 insertions(+), 15 deletions(-)

diff --git a/cluster-sharding/src/main/resources/reference.conf 
b/cluster-sharding/src/main/resources/reference.conf
index 5282a7122e..b21f7f2fe6 100644
--- a/cluster-sharding/src/main/resources/reference.conf
+++ b/cluster-sharding/src/main/resources/reference.conf
@@ -444,6 +444,17 @@ pekko.cluster.sharding {
     # Timeout for the local shard region to respond. This should be lower than 
your monitoring system's
     # timeout for health checks
     timeout = 5s
+
+    # The health check is only performed during this duration after
+    # the member is up. After that the sharding check will not be performed 
(always returns success).
+    # The purpose is to wait for Cluster Sharding registration to complete on 
initial startup.
+    # After that, in case of Sharding Coordinator movement or reachability problems 
we still want to be ready
+    # because requests can typically be served without involving the 
coordinator.
+    # Another reason is that when a new entity type is added in a rolling 
update we don't want to fail
+    # the ready check forever, which would stall the rolling update. The Sharding 
Coordinator is expected
+    # to run on the oldest member, but in this scenario the oldest member is in the 
old deployment and hasn't started the
+    # coordinator for that entity type.
+    disabled-after = 10s
   }
 }
 # //#sharding-ext-config
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheck.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheck.scala
index 7b444887ca..2f1d75ea74 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheck.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheck.scala
@@ -15,15 +15,16 @@ package org.apache.pekko.cluster.sharding
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.{ DurationInt, FiniteDuration }
 import scala.jdk.CollectionConverters._
 import scala.jdk.DurationConverters._
 
 import org.apache.pekko
 import pekko.actor.ActorRef
 import pekko.actor.ActorSystem
-import pekko.annotation.ApiMayChange
 import pekko.annotation.InternalApi
+import pekko.cluster.Cluster
+import pekko.cluster.MemberStatus
 import pekko.event.Logging
 import pekko.pattern.AskTimeoutException
 import pekko.pattern.ask
@@ -39,11 +40,19 @@ private[pekko] object ClusterShardingHealthCheckSettings {
   def apply(config: Config): ClusterShardingHealthCheckSettings =
     new ClusterShardingHealthCheckSettings(
       config.getStringList("names").asScala.toSet,
-      config.getDuration("timeout").toScala)
+      config.getDuration("timeout").toScala,
+      config.getDuration("disabled-after").toScala)
 }
 
-@ApiMayChange
-final class ClusterShardingHealthCheckSettings(val names: Set[String], val 
timeout: FiniteDuration)
+final class ClusterShardingHealthCheckSettings(
+    val names: Set[String],
+    val timeout: FiniteDuration,
+    val disableAfter: FiniteDuration) {
+
+  // for binary backwards compatibility
+  @deprecated("Use full constructor", "2.0.0")
+  def this(names: Set[String], timeout: FiniteDuration) = this(names, timeout, 
10.seconds)
+}
 
 private object ClusterShardingHealthCheck {
   val Success = Future.successful(true)
@@ -52,7 +61,6 @@ private object ClusterShardingHealthCheck {
 /**
  * INTERNAL API (ctr)
  */
-@ApiMayChange
 final class ClusterShardingHealthCheck private[pekko] (
     system: ActorSystem,
     settings: ClusterShardingHealthCheckSettings,
@@ -72,11 +80,24 @@ final class ClusterShardingHealthCheck private[pekko] (
 
   // Once the check has passed it always does
   @volatile private var registered = false
+  @volatile private var startedTimestamp = 0L
+
+  private def isMemberUp(): Boolean = {
+    val memberStatus = Cluster(system).selfMember.status
+    memberStatus != MemberStatus.Joining && memberStatus != 
MemberStatus.Removed
+  }
 
   override def apply(): Future[Boolean] = {
     if (settings.names.isEmpty || registered) {
       ClusterShardingHealthCheck.Success
+    } else if (startedTimestamp != 0L &&
+      System
+        .currentTimeMillis() > startedTimestamp + 
settings.disableAfter.toMillis) {
+      ClusterShardingHealthCheck.Success
     } else {
+      if (startedTimestamp == 0 && isMemberUp())
+        startedTimestamp = System.currentTimeMillis()
+
       Future
         .traverse(settings.names) { name =>
           shardRegion(name) // this can throw if shard region not registered 
and it'll fail the check
diff --git 
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheckSpec.scala
 
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheckSpec.scala
index 56cf7aca0e..99132427da 100644
--- 
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheckSpec.scala
+++ 
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheckSpec.scala
@@ -16,6 +16,8 @@ package org.apache.pekko.cluster.sharding
 import scala.concurrent.duration._
 
 import org.apache.pekko
+import pekko.cluster.Cluster
+import pekko.cluster.MemberStatus
 import pekko.testkit.PekkoSpec
 import pekko.testkit.TestProbe
 import pekko.testkit.WithLogCapturing
@@ -26,9 +28,11 @@ import com.typesafe.config.ConfigFactory
 
 object ClusterShardingHealthCheckSpec {
   val config = ConfigFactory.parseString("""
-          pekko.loglevel = DEBUG
-          pekko.loggers = 
["org.apache.pekko.testkit.SilenceAllTestEventListener"]
-            """.stripMargin)
+    pekko.loglevel = DEBUG
+    pekko.loggers = ["org.apache.pekko.testkit.SilenceAllTestEventListener"]
+    pekko.actor.provider = cluster
+    pekko.remote.artery.canonical.port = 0
+    """)
 }
 
 class ClusterShardingHealthCheckSpec
@@ -41,7 +45,7 @@ class ClusterShardingHealthCheckSpec
       val shardRegionProbe = TestProbe()
       val check = new ClusterShardingHealthCheck(
         system,
-        new ClusterShardingHealthCheckSettings(Set.empty, 1.second),
+        new ClusterShardingHealthCheckSettings(Set.empty, 1.second, 
10.seconds),
         _ => shardRegionProbe.ref)
       check().futureValue shouldEqual true
     }
@@ -49,7 +53,7 @@ class ClusterShardingHealthCheckSpec
       val shardRegionProbe = TestProbe()
       val check = new ClusterShardingHealthCheck(
         system,
-        new ClusterShardingHealthCheckSettings(Set("cat"), 1.second),
+        new ClusterShardingHealthCheckSettings(Set("cat"), 1.second, 
10.seconds),
         _ => shardRegionProbe.ref)
       val response = check()
       shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
@@ -60,7 +64,7 @@ class ClusterShardingHealthCheckSpec
       val shardRegionProbe = TestProbe()
       val check = new ClusterShardingHealthCheck(
         system,
-        new ClusterShardingHealthCheckSettings(Set("cat"), 1.second),
+        new ClusterShardingHealthCheckSettings(Set("cat"), 1.second, 
10.seconds),
         _ => shardRegionProbe.ref)
       val response = check()
       shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
@@ -71,7 +75,7 @@ class ClusterShardingHealthCheckSpec
       val shardRegionProbe = TestProbe()
       val check = new ClusterShardingHealthCheck(
         system,
-        new ClusterShardingHealthCheckSettings(Set("cat", "dog"), 1.second),
+        new ClusterShardingHealthCheckSettings(Set("cat", "dog"), 1.second, 
10.seconds),
         _ => shardRegionProbe.ref)
       val response = check()
       shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
@@ -84,7 +88,7 @@ class ClusterShardingHealthCheckSpec
       val shardRegionProbe = TestProbe()
       val check = new ClusterShardingHealthCheck(
         system,
-        new ClusterShardingHealthCheckSettings(Set("cat"), 100.millis),
+        new ClusterShardingHealthCheckSettings(Set("cat"), 100.millis, 
10.seconds),
         _ => shardRegionProbe.ref)
       val response = check()
       shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
@@ -95,7 +99,7 @@ class ClusterShardingHealthCheckSpec
       val shardRegionProbe = TestProbe()
       val check = new ClusterShardingHealthCheck(
         system,
-        new ClusterShardingHealthCheckSettings(Set("cat"), 1.second),
+        new ClusterShardingHealthCheckSettings(Set("cat"), 1.second, 
10.seconds),
         _ => shardRegionProbe.ref)
       val response = check()
       shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
@@ -106,6 +110,48 @@ class ClusterShardingHealthCheckSpec
       shardRegionProbe.expectNoMessage()
       secondResponse.futureValue shouldEqual true
     }
+
+    "always pass after disabled-after" in {
+      val shardRegionProbe = TestProbe()
+      val disabledAfter = 100.millis
+      val check = new ClusterShardingHealthCheck(
+        system,
+        new ClusterShardingHealthCheckSettings(Set("cat"), 1.second, 
disabledAfter),
+        _ => shardRegionProbe.ref)
+      // first check will always be performed
+      val response1 = check()
+      shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
+      shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("cat", false))
+      response1.futureValue shouldEqual false
+
+      Thread.sleep(disabledAfter.toMillis + 100)
+
+      // and it will not start the clock until member up
+      val response2 = check()
+      shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
+      shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("cat", false))
+      response2.futureValue shouldEqual false
+
+      Thread.sleep(disabledAfter.toMillis + 100)
+
+      Cluster(system).join(Cluster(system).selfAddress)
+      awaitAssert {
+        Cluster(system).selfMember.status shouldEqual MemberStatus.Up
+      }
+
+      // first check after member up will trigger start of clock
+      val response3 = check()
+      shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
+      shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("cat", false))
+      response3.futureValue shouldEqual false
+
+      Thread.sleep(disabledAfter.toMillis + 100)
+
+      // and now it has exceeded the disabled-after duration
+      val response4 = check()
+      shardRegionProbe.expectNoMessage()
+      response4.futureValue shouldEqual true
+    }
   }
 
 }
diff --git a/docs/src/main/paradox/typed/cluster-sharding.md 
b/docs/src/main/paradox/typed/cluster-sharding.md
index 09a36001e5..7a442e1cef 100644
--- a/docs/src/main/paradox/typed/cluster-sharding.md
+++ b/docs/src/main/paradox/typed/cluster-sharding.md
@@ -728,6 +728,8 @@ Monitoring of each shard region is off by default. Add them 
by defining the enti
 pekko.cluster.sharding.healthcheck.names = ["counter-1", "HelloWorld"]
 ```
 
+The health check is disabled (always returns success) once the configured 
`disabled-after` duration has elapsed since the Cluster member became up, even if 
checks were failing. Otherwise, a persistently failing check would stall a 
Kubernetes rolling update when adding a new entity type in the new version.
+
 See also additional information about how to make @ref:[smooth rolling 
updates](../additional/rolling-updates.md#cluster-sharding).
 
 ## Inspecting cluster sharding state


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to