This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new dacfd3f4a1 fix: keep stashBuffer when exception throw again #669 (#670)
dacfd3f4a1 is described below

commit dacfd3f4a168ed983fe5e76fd2fb428ac22d000e
Author: AndyChen <[email protected]>
AuthorDate: Wed Oct 11 15:57:38 2023 +0800

    fix: keep stashBuffer when exception throw again #669 (#670)
---
 .../apache/pekko/actor/typed/SupervisionSpec.scala | 99 +++++++++++++++++++++-
 .../pekko/actor/typed/internal/Supervision.scala   | 16 +++-
 2 files changed, 109 insertions(+), 6 deletions(-)

diff --git 
a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala
 
b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala
index c5e8d5b8f1..f6288791f7 100644
--- 
a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala
+++ 
b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala
@@ -66,14 +66,15 @@ object SupervisionSpec {
   def targetBehavior(
       monitor: ActorRef[Event],
       state: State = State(0, Map.empty),
-      slowStop: Option[CountDownLatch] = None): Behavior[Command] =
+      slowStop: Option[CountDownLatch] = None,
+      slowRestart: Option[CountDownLatch] = None): Behavior[Command] =
     receive[Command] { (context, cmd) =>
       cmd match {
         case Ping(n) =>
           monitor ! Pong(n)
           Behaviors.same
         case IncrementState =>
-          targetBehavior(monitor, state.copy(n = state.n + 1), slowStop)
+          targetBehavior(monitor, state.copy(n = state.n + 1), slowStop, 
slowRestart)
         case GetState =>
           val reply = state.copy(children = context.children.map(c => 
c.path.name -> c.unsafeUpcast[Command]).toMap)
           monitor ! reply
@@ -94,6 +95,8 @@ object SupervisionSpec {
       case (_, sig) =>
         if (sig == PostStop)
           slowStop.foreach(latch => latch.await(10, TimeUnit.SECONDS))
+        else if (sig == PreRestart)
+          slowRestart.foreach(latch => latch.await(10, TimeUnit.SECONDS))
         monitor ! ReceivedSignal(sig)
         Behaviors.same
     }
@@ -294,6 +297,49 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
     }
   }
 
+  class FailingConstructorTestSetupWithChild(failCount: Int, slowStopCount: 
Int) {
+    val failCounter = new AtomicInteger(0)
+    val probe = TestProbe[Event]("evt")
+    val slowStop = new CountDownLatch(slowStopCount)
+
+    class FailingConstructor(context: ActorContext[Command], monitor: 
ActorRef[Event])
+        extends AbstractBehavior[Command](context) {
+      monitor ! Started
+      context.spawn(targetBehavior(probe.ref, slowStop = Some(slowStop)), 
nextName())
+      if (failCounter.getAndIncrement() < failCount) {
+        throw TestException("simulated exc from constructor")
+      }
+
+      override def onMessage(message: Command): Behavior[Command] = {
+        message match {
+          case Ping(i) =>
+            monitor ! Pong(i)
+            Behaviors.same
+          // ignore others.
+          case _ => Behaviors.same
+        }
+      }
+    }
+
+    def testMessageRetentionWhenStartException(strategy: SupervisorStrategy): 
Unit = {
+      val behv = supervise(setup[Command](ctx => new FailingConstructor(ctx, 
probe.ref)))
+        .onFailure[Exception](strategy)
+      val ref = spawn(behv)
+      probe.expectMessage(Started)
+      ref ! Ping(1)
+      ref ! Ping(2)
+      // unlock restart
+      slowStop.countDown()
+      probe.expectMessage(ReceivedSignal(PostStop))
+      probe.expectMessage(Started)
+      probe.expectMessage(ReceivedSignal(PostStop))
+      probe.expectMessage(Started)
+      // expect no message lost
+      probe.expectMessage(Pong(1))
+      probe.expectMessage(Pong(2))
+    }
+  }
+
   class FailingDeferredTestSetup(failCount: Int, strategy: SupervisorStrategy) 
{
     val probe = TestProbe[AnyRef]("evt")
     val failCounter = new AtomicInteger(0)
@@ -1133,6 +1179,55 @@ class SupervisionSpec extends 
ScalaTestWithActorTestKit("""
       }
     }
 
+    "ensure unhandled message retention during unstash exception when restart" 
in {
+      
testMessageRetentionWhenMultipleException(SupervisorStrategy.restart.withStashCapacity(4))
+    }
+
+    "ensure unhandled message retention during unstash exception when backoff" 
in {
+      
testMessageRetentionWhenMultipleException(SupervisorStrategy.restartWithBackoff(10.millis,
 10.millis,
+        0).withStashCapacity(4))
+    }
+
+    def testMessageRetentionWhenMultipleException(strategy: 
SupervisorStrategy): Unit = {
+      val probe = TestProbe[Event]("evt")
+      val slowRestart = new CountDownLatch(1)
+      val behv =
+        Behaviors.supervise(targetBehavior(probe.ref, slowRestart = 
Some(slowRestart))).onFailure[Exc1](strategy)
+      val ref = spawn(behv)
+
+      // the restart strategy needs a latch so that there is an opportunity 
to stash messages
+      val childProbe = TestProbe[Event]("childEvt")
+      val childSlowStop = new CountDownLatch(1)
+      val childName = nextName()
+      ref ! CreateChild(targetBehavior(childProbe.ref, slowStop = 
Some(childSlowStop)), childName)
+      ref ! GetState
+      probe.expectMessageType[State].children.keySet should ===(Set(childName))
+
+      ref ! Throw(new Exc1)
+      ref ! Throw(new Exc1)
+      ref ! Ping(1)
+      ref ! Ping(2)
+      // while waiting for the actor to restart, the Pings will be stashed
+      probe.expectNoMessage()
+      slowRestart.countDown()
+      probe.expectMessage(ReceivedSignal(PreRestart))
+      childSlowStop.countDown()
+      probe.expectMessage(ReceivedSignal(PreRestart))
+      probe.expectMessage(Pong(1))
+      probe.expectMessage(Pong(2))
+    }
+
+    "ensure stash message retention on start exception when restart" in new 
FailingConstructorTestSetupWithChild(
+      failCount = 2, slowStopCount = 1) {
+      
testMessageRetentionWhenStartException(SupervisorStrategy.restart.withStashCapacity(4).withLimit(4,
 10.seconds))
+    }
+
+    "ensure stash message retention on start exception when backoff" in new 
FailingConstructorTestSetupWithChild(
+      failCount = 2, slowStopCount = 1) {
+      
testMessageRetentionWhenStartException(SupervisorStrategy.restartWithBackoff(10.millis,
 10.millis,
+        0).withStashCapacity(4))
+    }
+
     "work with nested supervisions and defers" in {
       val strategy = SupervisorStrategy.restart.withLimit(3, 1.second)
       val probe = TestProbe[AnyRef]("p")
diff --git 
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala
 
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala
index 44ea630956..bbc4366cb2 100644
--- 
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala
+++ 
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala
@@ -365,9 +365,15 @@ private class RestartSupervisor[T, Thr <: Throwable: 
ClassTag](initial: Behavior
     val stashCapacity =
       if (strategy.stashCapacity >= 0) strategy.stashCapacity
       else ctx.asScala.system.settings.RestartStashCapacity
-    restartingInProgress = OptionVal.Some(
-      (StashBuffer[Any](ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], 
stashCapacity), childrenToStop))
-
+    // create a new stash buffer only the first time, or after it has been reset to None
+    restartingInProgress = restartingInProgress match {
+      case OptionVal.Some((stashBuffer, _)) =>
+        // keep the existing stashBuffer when an exception is thrown again
+        OptionVal.Some((stashBuffer, childrenToStop))
+      case _ =>
+        OptionVal.Some(
+          
(StashBuffer[Any](ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], 
stashCapacity), childrenToStop))
+    }
     strategy match {
       case backoff: Backoff =>
         val restartDelay =
@@ -398,8 +404,10 @@ private class RestartSupervisor[T, Thr <: Throwable: 
ClassTag](initial: Behavior
       val newBehavior = Behavior.validateAsInitial(Behavior.start(initial, 
ctx.asInstanceOf[TypedActorContext[T]]))
       val nextBehavior = restartingInProgress match {
         case OptionVal.Some((stashBuffer, _)) =>
+          val behavior = stashBuffer.unstashAll(newBehavior.unsafeCast)
+          // reset the stash state only after unstashAll completed without throwing
           restartingInProgress = OptionVal.None
-          stashBuffer.unstashAll(newBehavior.unsafeCast)
+          behavior
         case _ => newBehavior
       }
       nextBehavior.narrow


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to