This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new dacfd3f4a1 fix: keep stashBuffer when exception throw again #669 (#670)
dacfd3f4a1 is described below
commit dacfd3f4a168ed983fe5e76fd2fb428ac22d000e
Author: AndyChen <[email protected]>
AuthorDate: Wed Oct 11 15:57:38 2023 +0800
fix: keep stashBuffer when exception throw again #669 (#670)
---
.../apache/pekko/actor/typed/SupervisionSpec.scala | 99 +++++++++++++++++++++-
.../pekko/actor/typed/internal/Supervision.scala | 16 +++-
2 files changed, 109 insertions(+), 6 deletions(-)
diff --git
a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala
b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala
index c5e8d5b8f1..f6288791f7 100644
---
a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala
+++
b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala
@@ -66,14 +66,15 @@ object SupervisionSpec {
def targetBehavior(
monitor: ActorRef[Event],
state: State = State(0, Map.empty),
- slowStop: Option[CountDownLatch] = None): Behavior[Command] =
+ slowStop: Option[CountDownLatch] = None,
+ slowRestart: Option[CountDownLatch] = None): Behavior[Command] =
receive[Command] { (context, cmd) =>
cmd match {
case Ping(n) =>
monitor ! Pong(n)
Behaviors.same
case IncrementState =>
- targetBehavior(monitor, state.copy(n = state.n + 1), slowStop)
+ targetBehavior(monitor, state.copy(n = state.n + 1), slowStop,
slowRestart)
case GetState =>
val reply = state.copy(children = context.children.map(c =>
c.path.name -> c.unsafeUpcast[Command]).toMap)
monitor ! reply
@@ -94,6 +95,8 @@ object SupervisionSpec {
case (_, sig) =>
if (sig == PostStop)
slowStop.foreach(latch => latch.await(10, TimeUnit.SECONDS))
+ else if (sig == PreRestart)
+ slowRestart.foreach(latch => latch.await(10, TimeUnit.SECONDS))
monitor ! ReceivedSignal(sig)
Behaviors.same
}
@@ -294,6 +297,49 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
}
}
+ class FailingConstructorTestSetupWithChild(failCount: Int, slowStopCount:
Int) {
+ val failCounter = new AtomicInteger(0)
+ val probe = TestProbe[Event]("evt")
+ val slowStop = new CountDownLatch(slowStopCount)
+
+ class FailingConstructor(context: ActorContext[Command], monitor:
ActorRef[Event])
+ extends AbstractBehavior[Command](context) {
+ monitor ! Started
+ context.spawn(targetBehavior(probe.ref, slowStop = Some(slowStop)),
nextName())
+ if (failCounter.getAndIncrement() < failCount) {
+ throw TestException("simulated exc from constructor")
+ }
+
+ override def onMessage(message: Command): Behavior[Command] = {
+ message match {
+ case Ping(i) =>
+ monitor ! Pong(i)
+ Behaviors.same
+ // ignore others.
+ case _ => Behaviors.same
+ }
+ }
+ }
+
+ def testMessageRetentionWhenStartException(strategy: SupervisorStrategy):
Unit = {
+ val behv = supervise(setup[Command](ctx => new FailingConstructor(ctx,
probe.ref)))
+ .onFailure[Exception](strategy)
+ val ref = spawn(behv)
+ probe.expectMessage(Started)
+ ref ! Ping(1)
+ ref ! Ping(2)
+ // unlock restart
+ slowStop.countDown()
+ probe.expectMessage(ReceivedSignal(PostStop))
+ probe.expectMessage(Started)
+ probe.expectMessage(ReceivedSignal(PostStop))
+ probe.expectMessage(Started)
+ // expect that no message was lost
+ probe.expectMessage(Pong(1))
+ probe.expectMessage(Pong(2))
+ }
+ }
+
class FailingDeferredTestSetup(failCount: Int, strategy: SupervisorStrategy)
{
val probe = TestProbe[AnyRef]("evt")
val failCounter = new AtomicInteger(0)
@@ -1133,6 +1179,55 @@ class SupervisionSpec extends
ScalaTestWithActorTestKit("""
}
}
+ "ensure unhandled message retention during unstash exception when restart"
in {
+
testMessageRetentionWhenMultipleException(SupervisorStrategy.restart.withStashCapacity(4))
+ }
+
+ "ensure unhandled message retention during unstash exception when backoff"
in {
+
testMessageRetentionWhenMultipleException(SupervisorStrategy.restartWithBackoff(10.millis,
10.millis,
+ 0).withStashCapacity(4))
+ }
+
+ def testMessageRetentionWhenMultipleException(strategy:
SupervisorStrategy): Unit = {
+ val probe = TestProbe[Event]("evt")
+ val slowRestart = new CountDownLatch(1)
+ val behv =
+ Behaviors.supervise(targetBehavior(probe.ref, slowRestart =
Some(slowRestart))).onFailure[Exc1](strategy)
+ val ref = spawn(behv)
+
+ // the restart strategy requires a latch in order to provide the opportunity
to stash messages
+ val childProbe = TestProbe[Event]("childEvt")
+ val childSlowStop = new CountDownLatch(1)
+ val childName = nextName()
+ ref ! CreateChild(targetBehavior(childProbe.ref, slowStop =
Some(childSlowStop)), childName)
+ ref ! GetState
+ probe.expectMessageType[State].children.keySet should ===(Set(childName))
+
+ ref ! Throw(new Exc1)
+ ref ! Throw(new Exc1)
+ ref ! Ping(1)
+ ref ! Ping(2)
+ // while waiting for the actor to restart, the Pings will be stashed
+ probe.expectNoMessage()
+ slowRestart.countDown()
+ probe.expectMessage(ReceivedSignal(PreRestart))
+ childSlowStop.countDown()
+ probe.expectMessage(ReceivedSignal(PreRestart))
+ probe.expectMessage(Pong(1))
+ probe.expectMessage(Pong(2))
+ }
+
+ "ensure stash message retention on start exception when restart" in new
FailingConstructorTestSetupWithChild(
+ failCount = 2, slowStopCount = 1) {
+
testMessageRetentionWhenStartException(SupervisorStrategy.restart.withStashCapacity(4).withLimit(4,
10.seconds))
+ }
+
+ "ensure stash message retention on start exception when backoff" in new
FailingConstructorTestSetupWithChild(
+ failCount = 2, slowStopCount = 1) {
+
testMessageRetentionWhenStartException(SupervisorStrategy.restartWithBackoff(10.millis,
10.millis,
+ 0).withStashCapacity(4))
+ }
+
"work with nested supervisions and defers" in {
val strategy = SupervisorStrategy.restart.withLimit(3, 1.second)
val probe = TestProbe[AnyRef]("p")
diff --git
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala
index 44ea630956..bbc4366cb2 100644
---
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala
+++
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala
@@ -365,9 +365,15 @@ private class RestartSupervisor[T, Thr <: Throwable:
ClassTag](initial: Behavior
val stashCapacity =
if (strategy.stashCapacity >= 0) strategy.stashCapacity
else ctx.asScala.system.settings.RestartStashCapacity
- restartingInProgress = OptionVal.Some(
- (StashBuffer[Any](ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]],
stashCapacity), childrenToStop))
-
+ // create a new stash buffer only the first time, or after it has been reset to None
+ restartingInProgress = restartingInProgress match {
+ case OptionVal.Some((stashBuffer, _)) =>
+ // keep the existing stashBuffer when an exception is thrown again
+ OptionVal.Some((stashBuffer, childrenToStop))
+ case _ =>
+ OptionVal.Some(
+
(StashBuffer[Any](ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]],
stashCapacity), childrenToStop))
+ }
strategy match {
case backoff: Backoff =>
val restartDelay =
@@ -398,8 +404,10 @@ private class RestartSupervisor[T, Thr <: Throwable:
ClassTag](initial: Behavior
val newBehavior = Behavior.validateAsInitial(Behavior.start(initial,
ctx.asInstanceOf[TypedActorContext[T]]))
val nextBehavior = restartingInProgress match {
case OptionVal.Some((stashBuffer, _)) =>
+ val behavior = stashBuffer.unstashAll(newBehavior.unsafeCast)
+ // reset the stash state only if no further exception was thrown while unstashing
restartingInProgress = OptionVal.None
- stashBuffer.unstashAll(newBehavior.unsafeCast)
+ behavior
case _ => newBehavior
}
nextBehavior.narrow
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]