Hi,
I stumbled upon behaviour of AtLeastOnceDelivery that I do not understand.
My actor calls "deliver" in "receiveRecover" to make sure that each message
will eventually be delivered at least once. Because of an error, the actor
throws an exception on a later event. This leads to a few restarts, after
which my actor stops. I expected AtLeastOnceDelivery to postpone sending
messages until the actor has completely started. However, on every restart
the actor sends messages to the destination. After looking through the
persistence code I noticed that Recovery catches the exception and keeps
receiving all stored events (without processing them in the actor).
When all stored events have been received, the actor sends the exception to
itself in order to throw it. But AtLeastOnceDelivery has the following code:
    override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
      message match {
        case ReplayMessagesSuccess ⇒
          redeliverOverdue()
          super.aroundReceive(receive, message)
        //....
      }
ReplayMessagesSuccess is received before the exception and triggers sending
all pending deliveries. On the next message the actor is restarted and the
above process repeats.
Why does AtLeastOnceDelivery always send the pending deliveries when the
journal has been replayed successfully, even when replaying an actual message
led to an exception that restarts or stops the actor? Wouldn't it
make more sense to send a "RedeliveryTick" to self to start the redelivery?
That would make sure that delivery only starts once the actor has
started successfully.
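To make the ordering argument concrete, here is a toy model in plain Scala. It is only a sketch and not Akka's code: the object, the message types and the mailbox queue are all made up for illustration, and it assumes that the stored exception reaches the actor before a tick that the actor sends to itself.

```scala
import scala.collection.mutable

// Toy model only: none of these types are Akka's, and the mailbox
// ordering is an assumption made for illustration.
object RedeliveryOrderingSketch {
  sealed trait Msg
  case object ReplaySuccess extends Msg                     // stands in for ReplayMessagesSuccess
  case object Tick extends Msg                              // stands in for a redelivery tick sent to self
  final case class StoredFailure(cause: String) extends Msg // the exception Recovery kept for later

  /** Simulates one start of the actor and returns how many pending
    * deliveries went out before it crashed (or finished starting).
    */
  def sentInOneStart(pending: Int, replayFailed: Boolean, deferToTick: Boolean): Int = {
    val mailbox = mutable.Queue[Msg](ReplaySuccess)
    var sent = 0
    var crashed = false
    while (mailbox.nonEmpty && !crashed) {
      mailbox.dequeue() match {
        case ReplaySuccess =>
          // Current behaviour: redeliver immediately, before the failure is thrown.
          if (!deferToTick) sent += pending
          // Recovery now sends the stored exception to the actor itself.
          if (replayFailed) mailbox.enqueue(StoredFailure("Intentionally crashed"))
          // Proposed behaviour: only schedule the redelivery, behind the failure.
          if (deferToTick) mailbox.enqueue(Tick)
        case StoredFailure(_) =>
          crashed = true // the actor restarts or stops; later messages are not handled
        case Tick =>
          sent += pending
      }
    }
    sent
  }
}
```

With one pending delivery and a failed replay, sentInOneStart(1, replayFailed = true, deferToTick = false) returns 1 (the message goes out on every restart), while deferToTick = true returns 0. When the replay does not fail, deferToTick = true still returns 1, so a healthy actor would redeliver as before.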
I attached a test sample below to show the exact problem I'm facing.
Cheers,
Jeroen
import akka.actor._
import akka.actor.SupervisorStrategy.{ Stop, Escalate }
import akka.event.LoggingReceive
import akka.persistence.{ AtLeastOnceDelivery, PersistentActor }
import akka.testkit.{ TestKit, TestProbe, ImplicitSender }
import org.scalatest.WordSpecLike
class SuperVisor(testProbe: ActorRef) extends Actor {
  import scala.concurrent.duration._

  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10,
    withinTimeRange = 10 seconds) {
    case _: IllegalStateException ⇒ Stop
    case t ⇒ super.supervisorStrategy.decider.
      applyOrElse(t, (_: Any) ⇒ Escalate)
  }

  val crashingActor = context.actorOf(Props(new CrashingActor(testProbe)),
    "CrashingActor")

  def receive: Receive = LoggingReceive {
    case msg ⇒ crashingActor forward msg
  }
}
class WrappingActor(testProbe: ActorRef) extends Actor with ActorLogging {
  def receive = LoggingReceive {
    case msg ⇒
      log.debug("RECEIVED --> " + msg)
      testProbe forward msg
  }
}
object CrashingActor {
  case object Message
  case object CrashMessage
  case class SendingMessage(deliveryId: Long, recovering: Boolean)
}
class CrashingActor(testProbe: ActorRef) extends PersistentActor
  with AtLeastOnceDelivery with ActorLogging {
  import CrashingActor._

  override def receive = LoggingReceive {
    case x ⇒
      log.warning("RECEIVED: " + x)
      super.receive(x)
  }

  override def receiveRecover: Receive = LoggingReceive {
    case Message ⇒ send()
    case CrashMessage ⇒
      log.error("Crash it")
      throw new IllegalStateException("Intentionally crashed")
    case x ⇒ log.warning("RECOVER MESSAGE: " + x)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(reason, "RESTARTING " + message)
  }

  override def receiveCommand: Receive = LoggingReceive {
    case Message ⇒ persist(Message)(_ ⇒ send())
    case CrashMessage ⇒ persist(CrashMessage) { evt ⇒ }
  }

  def send() = {
    log.debug("SENDING " + recoveryRunning)
    deliver(testProbe.path, { id ⇒ SendingMessage(id, false) })
  }
}
class CrashIssueSpec extends TestKit(ActorSystem("tst")) with WordSpecLike
  with ImplicitSender {

  "At least once delivery" should {
    "not send on reboot" in {
      val testProbe = TestProbe()
      val testProbeWrapper = system.actorOf(
        Props(new WrappingActor(testProbe.ref)), "testProbe")
      val superVisor = system.actorOf(
        Props(new SuperVisor(testProbeWrapper)), "supervisor")
      superVisor ! CrashingActor.Message
      testProbe.expectMsgType[CrashingActor.SendingMessage]
      superVisor ! CrashingActor.CrashMessage
      val deathProbe = TestProbe()
      deathProbe.watch(superVisor)
      superVisor ! PoisonPill
      deathProbe.expectTerminated(superVisor)
      testProbe.expectNoMsg()
      system.actorOf(Props(new SuperVisor(testProbeWrapper)), "supervisor")
      testProbe.expectNoMsg()
    }
  }
}