This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d35cccb11 Improve DData remember entities logging and consistency 
(#2799)
1d35cccb11 is described below

commit 1d35cccb118d6124b716106e4941193dc9b98e58
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Mar 28 17:00:36 2026 +0800

    Improve DData remember entities logging and consistency (#2799)
    
    Multiple improvements to DData remember entities stores:
    
    - DDataRememberEntitiesShardStore: Retry reads up to 15 times (across
      all 5 keys) before giving up, logging at warning level initially and
      escalating to error only on final failure. Previously it failed
      immediately on the first GetFailure.
    - DDataRememberEntitiesCoordinatorStore: Already retried indefinitely,
      but now logs at warning level and escalates to error level only after
      5 failures.
    - All log messages now clearly indicate whether they originate from the
      coordinator store or shard store (prefix: 'Remember entities
      coordinator/shard store').
    - Change consistency levels from ReadMajority/WriteMajority to
      ReadMajorityPlus/WriteMajorityPlus, using coordinatorStateReadMajorityPlus
      and coordinatorStateWriteMajorityPlus settings, with ReadAll/WriteAll
      fallback when configured as Int.MaxValue.
    
    Upstream: akka/akka-core@440fdfdcc2
    Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
    
    Co-authored-by: Copilot <[email protected]>
---
 .../DDataRememberEntitiesCoordinatorStore.scala    | 40 +++++++----
 .../internal/DDataRememberEntitiesShardStore.scala | 78 ++++++++++++++--------
 2 files changed, 79 insertions(+), 39 deletions(-)

diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesCoordinatorStore.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesCoordinatorStore.scala
index 5b23f90eba..ad47a66631 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesCoordinatorStore.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesCoordinatorStore.scala
@@ -22,8 +22,10 @@ import pekko.cluster.Cluster
 import pekko.cluster.ddata.GSet
 import pekko.cluster.ddata.GSetKey
 import pekko.cluster.ddata.Replicator
-import pekko.cluster.ddata.Replicator.ReadMajority
-import pekko.cluster.ddata.Replicator.WriteMajority
+import pekko.cluster.ddata.Replicator.ReadAll
+import pekko.cluster.ddata.Replicator.ReadMajorityPlus
+import pekko.cluster.ddata.Replicator.WriteAll
+import pekko.cluster.ddata.Replicator.WriteMajorityPlus
 import pekko.cluster.ddata.SelfUniqueAddress
 import pekko.cluster.sharding.ClusterShardingSettings
 import pekko.cluster.sharding.ShardRegion.ShardId
@@ -52,16 +54,23 @@ private[pekko] final class 
DDataRememberEntitiesCoordinatorStore(
   implicit val node: Cluster = Cluster(context.system)
   implicit val selfUniqueAddress: SelfUniqueAddress = 
SelfUniqueAddress(node.selfUniqueAddress)
 
-  private val readMajority = 
ReadMajority(settings.tuningParameters.waitingForStateTimeout, majorityMinCap)
-  private val writeMajority = 
WriteMajority(settings.tuningParameters.updatingStateTimeout, majorityMinCap)
+  private val readConsistency = 
settings.tuningParameters.coordinatorStateReadMajorityPlus match {
+    case Int.MaxValue => 
ReadAll(settings.tuningParameters.waitingForStateTimeout)
+    case additional   => 
ReadMajorityPlus(settings.tuningParameters.waitingForStateTimeout, additional, 
majorityMinCap)
+  }
+  private val writeConsistency = 
settings.tuningParameters.coordinatorStateWriteMajorityPlus match {
+    case Int.MaxValue => 
WriteAll(settings.tuningParameters.updatingStateTimeout)
+    case additional   => 
WriteMajorityPlus(settings.tuningParameters.updatingStateTimeout, additional, 
majorityMinCap)
+  }
 
   private val AllShardsKey = GSetKey[String](s"shard-$typeName-all")
+  private var retryGetCounter = 0
   private var allShards: Option[Set[ShardId]] = None
   private var coordinatorWaitingForShards: Option[ActorRef] = None
 
   // eager load of remembered shard ids
   def getAllShards(): Unit = {
-    replicator ! Replicator.Get(AllShardsKey, readMajority)
+    replicator ! Replicator.Get(AllShardsKey, readConsistency)
   }
   getAllShards()
 
@@ -84,31 +93,35 @@ private[pekko] final class 
DDataRememberEntitiesCoordinatorStore(
       onGotAllShards(Set.empty)
 
     case Replicator.GetFailure(AllShardsKey, _) =>
-      log.error(
-        "The ShardCoordinator was unable to get all shards state within 
'waiting-for-state-timeout': {} millis (retrying)",
-        readMajority.timeout.toMillis)
+      retryGetCounter += 1
+      val template =
+        "Remember entities coordinator store unable to get initial shards 
within 'waiting-for-state-timeout': {} millis (retrying)"
+      if (retryGetCounter < 5)
+        log.warning(template, readConsistency.timeout.toMillis)
+      else
+        log.error(template, readConsistency.timeout.toMillis)
       // repeat until GetSuccess
       getAllShards()
 
     case RememberEntitiesCoordinatorStore.AddShard(shardId) =>
-      replicator ! Replicator.Update(AllShardsKey, GSet.empty[String], 
writeMajority, Some((sender(), shardId)))(
+      replicator ! Replicator.Update(AllShardsKey, GSet.empty[String], 
writeConsistency, Some((sender(), shardId)))(
         _ + shardId)
 
     case Replicator.UpdateSuccess(AllShardsKey, Some((replyTo: ActorRef, 
shardId: ShardId))) =>
-      log.debug("The coordinator shards state was successfully updated with 
{}", shardId)
+      log.debug("Remember entities coordinator store shards successfully 
updated with {}", shardId)
       replyTo ! RememberEntitiesCoordinatorStore.UpdateDone(shardId)
 
     case Replicator.UpdateTimeout(AllShardsKey, Some((replyTo: ActorRef, 
shardId: ShardId))) =>
       log.error(
-        "The ShardCoordinator was unable to update shards distributed state 
within 'updating-state-timeout': {} millis (retrying), adding shard={}",
-        writeMajority.timeout.toMillis,
+        "Remember entities coordinator store unable to update shards state 
within 'updating-state-timeout': {} millis (retrying), adding shard={}",
+        writeConsistency.timeout.toMillis,
         shardId)
       replyTo ! RememberEntitiesCoordinatorStore.UpdateFailed(shardId)
 
     case Replicator.ModifyFailure(key, error, cause, Some((replyTo: ActorRef, 
shardId: ShardId))) =>
       log.error(
         cause,
-        "The remember entities store was unable to add shard [{}] (key [{}], 
failed with error: {})",
+        "Remember entities coordinator store was unable to add shard [{}] (key 
[{}], failed with error: {})",
         shardId,
         key,
         error)
@@ -116,6 +129,7 @@ private[pekko] final class 
DDataRememberEntitiesCoordinatorStore(
   }
 
   def onGotAllShards(shardIds: Set[ShardId]): Unit = {
+    retryGetCounter = 0
     coordinatorWaitingForShards match {
       case Some(coordinator) =>
         coordinator ! 
RememberEntitiesCoordinatorStore.RememberedShards(shardIds)
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
index afcdaf6c3c..61a488f255 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
@@ -31,13 +31,15 @@ import pekko.cluster.ddata.Replicator.GetFailure
 import pekko.cluster.ddata.Replicator.GetSuccess
 import pekko.cluster.ddata.Replicator.ModifyFailure
 import pekko.cluster.ddata.Replicator.NotFound
-import pekko.cluster.ddata.Replicator.ReadMajority
+import pekko.cluster.ddata.Replicator.ReadAll
+import pekko.cluster.ddata.Replicator.ReadMajorityPlus
 import pekko.cluster.ddata.Replicator.StoreFailure
 import pekko.cluster.ddata.Replicator.Update
 import pekko.cluster.ddata.Replicator.UpdateDataDeleted
 import pekko.cluster.ddata.Replicator.UpdateSuccess
 import pekko.cluster.ddata.Replicator.UpdateTimeout
-import pekko.cluster.ddata.Replicator.WriteMajority
+import pekko.cluster.ddata.Replicator.WriteAll
+import pekko.cluster.ddata.Replicator.WriteMajorityPlus
 import pekko.cluster.ddata.SelfUniqueAddress
 import pekko.cluster.sharding.ClusterShardingSettings
 import pekko.cluster.sharding.ShardRegion.EntityId
@@ -97,10 +99,18 @@ private[pekko] final class DDataRememberEntitiesShardStore(
   implicit val node: Cluster = Cluster(context.system)
   implicit val selfUniqueAddress: SelfUniqueAddress = 
SelfUniqueAddress(node.selfUniqueAddress)
 
-  private val readMajority = 
ReadMajority(settings.tuningParameters.waitingForStateTimeout, majorityMinCap)
+  private val readConsistency = 
settings.tuningParameters.coordinatorStateReadMajorityPlus match {
+    case Int.MaxValue => 
ReadAll(settings.tuningParameters.waitingForStateTimeout)
+    case additional   => 
ReadMajorityPlus(settings.tuningParameters.waitingForStateTimeout, additional, 
majorityMinCap)
+  }
   // Note that the timeout is actually updatingStateTimeout / 4 so that we fit 
3 retries and a response in the timeout before the shard sees it as a failure
-  private val writeMajority = 
WriteMajority(settings.tuningParameters.updatingStateTimeout / 4, 
majorityMinCap)
+  private val writeConsistency = 
settings.tuningParameters.coordinatorStateWriteMajorityPlus match {
+    case Int.MaxValue => 
WriteAll(settings.tuningParameters.updatingStateTimeout / 4)
+    case additional   => 
WriteMajorityPlus(settings.tuningParameters.updatingStateTimeout / 4, 
additional, majorityMinCap)
+  }
   private val maxUpdateAttempts = 3
+  // Note: total for all 5 keys
+  private var maxReadAttemptsLeft = 15
   private val keys = stateKeys(typeName, shardId)
 
   if (log.isDebugEnabled) {
@@ -124,7 +134,8 @@ private[pekko] final class DDataRememberEntitiesShardStore(
   def idle: Receive = {
     case RememberEntitiesShardStore.GetEntities =>
       // not supported, but we may get several if the shard timed out and 
retried
-      log.debug("Another get entities request after responding to one, not 
expected/supported, ignoring")
+      log.debug(
+        "Remember entities shard store got another get entities request after 
responding to one, not expected/supported, ignoring")
     case update: RememberEntitiesShardStore.Update => onUpdate(update)
   }
 
@@ -156,28 +167,39 @@ private[pekko] final class 
DDataRememberEntitiesShardStore(
         receiveOne(i, ids)
       case NotFound(_, Some(i: Int)) =>
         receiveOne(i, Set.empty)
-      case GetFailure(key, _) =>
-        log.error(
-          "Unable to get an initial state within 'waiting-for-state-timeout': 
[{}] using [{}] (key [{}])",
-          readMajority.timeout.pretty,
-          readMajority,
-          key)
-        context.stop(self)
+      case GetFailure(key, Some(i)) =>
+        maxReadAttemptsLeft -= 1
+        if (maxReadAttemptsLeft > 0) {
+          log.warning(
+            "Remember entities shard store unable to get an initial state 
within 'waiting-for-state-timeout' for key [{}], retrying",
+            key)
+          replicator ! Get(key, readConsistency, Some(i))
+        } else {
+          log.error(
+            "Remember entities shard store unable to get an initial state 
within 'waiting-for-state-timeout' giving up after retrying: [{}] using [{}] 
(key [{}])",
+            readConsistency.timeout.pretty,
+            readConsistency,
+            key)
+          context.stop(self)
+        }
       case GetDataDeleted(_, _) =>
-        log.error("Unable to get an initial state because it was deleted")
+        log.error("Remember entities shard store unable to get an initial 
state because it was deleted")
         context.stop(self)
       case update: RememberEntitiesShardStore.Update =>
-        log.warning("Got an update before load of initial entities completed, 
dropping update: [{}]", update)
+        log.warning(
+          "Remember entities shard store got an update before load of initial 
entities completed, dropping update: [{}]",
+          update)
       case RememberEntitiesShardStore.GetEntities =>
         if (gotKeys.size == numberOfKeys) {
           // we already got all and was waiting for a request
-          log.debug("Got request from shard, sending remembered entities")
+          log.debug("Remember entities shard store got request from shard, 
sending remembered entities")
           sender() ! RememberEntitiesShardStore.RememberedEntities(ids)
           context.become(idle)
           unstashAll()
         } else {
           // we haven't seen all ids yet
-          log.debug("Got request from shard, waiting for all remembered 
entities to arrive")
+          log.debug(
+            "Remember entities shard store got request from shard, waiting for 
all remembered entities to arrive")
           context.become(waitingForAllEntityIds(gotKeys, ids, Some(sender())))
         }
       case _ =>
@@ -194,7 +216,7 @@ private[pekko] final class DDataRememberEntitiesShardStore(
       allEvts.groupBy(evt => key(evt.id)).map {
         case (key, evts) =>
           (evts,
-            (Update(key, ORSet.empty[EntityId], writeMajority, Some(evts)) { 
existing =>
+            (Update(key, ORSet.empty[EntityId], writeConsistency, Some(evts)) 
{ existing =>
                 evts.foldLeft(existing) {
                   case (acc, Started(id)) => acc :+ id
                   case (acc, Stopped(id)) => acc.remove(id)
@@ -218,7 +240,7 @@ private[pekko] final class DDataRememberEntitiesShardStore(
     // updatesLeft used both to keep track of what work remains and for 
retrying on timeout up to a limit
     def next(updatesLeft: Map[Set[Evt], (Update[ORSet[EntityId]], Int)]): 
Receive = {
       case UpdateSuccess(_, Some(evts: Set[Evt] @unchecked)) =>
-        log.debug("The DDataShard state was successfully updated for [{}]", 
evts)
+        log.debug("Remember entities shard store state was successfully 
updated for [{}]", evts)
         val remainingAfterThis = updatesLeft - evts
         if (remainingAfterThis.isEmpty) {
           requestor ! RememberEntitiesShardStore.UpdateDone(update.started, 
update.stopped)
@@ -230,31 +252,35 @@ private[pekko] final class 
DDataRememberEntitiesShardStore(
       case UpdateTimeout(_, Some(evts: Set[Evt] @unchecked)) =>
         val (updateForEvts, retriesLeft) = updatesLeft(evts)
         if (retriesLeft > 0) {
-          log.debug("Retrying update because of write timeout, tries left 
[{}]", retriesLeft)
+          log.debug(
+            "Remember entities shard store retrying update because of write 
timeout, tries left [{}]",
+            retriesLeft)
           replicator ! updateForEvts
           context.become(next(updatesLeft.updated(evts, (updateForEvts, 
retriesLeft - 1))))
         } else {
           log.error(
-            "Unable to update state, within 'updating-state-timeout'= [{}], 
gave up after [{}] retries",
-            writeMajority.timeout.pretty,
+            "Remember entities shard store unable to update state, within 
'updating-state-timeout'= [{}], gave up after [{}] retries",
+            writeConsistency.timeout.pretty,
             maxUpdateAttempts)
           // will trigger shard restart
           context.stop(self)
         }
       case StoreFailure(_, _) =>
-        log.error("Unable to update state, due to store failure")
+        log.error("Remember entities shard store unable to update state, due 
to store failure")
         // will trigger shard restart
         context.stop(self)
       case ModifyFailure(_, error, cause, _) =>
-        log.error(cause, "Unable to update state, due to modify failure: {}", 
error)
+        log.error(cause, "Remember entities shard store unable to update 
state, due to modify failure: {}", error)
         // will trigger shard restart
         context.stop(self)
       case UpdateDataDeleted(_, _) =>
-        log.error("Unable to update state, due to delete")
+        log.error("Remember entities shard store unable to update state, due 
to delete")
         // will trigger shard restart
         context.stop(self)
       case update: RememberEntitiesShardStore.Update =>
-        log.warning("Got a new update before write of previous completed, 
dropping update: [{}]", update)
+        log.warning(
+          "Remember entities shard store got a new update before write of 
previous completed, dropping update: [{}]",
+          update)
     }
 
     next(allUpdates)
@@ -263,7 +289,7 @@ private[pekko] final class DDataRememberEntitiesShardStore(
   private def loadAllEntities(): Unit = {
     (0 until numberOfKeys).toSet[Int].foreach { i =>
       val key = keys(i)
-      replicator ! Get(key, readMajority, Some(i))
+      replicator ! Get(key, readConsistency, Some(i))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to