Hi,

I stumbled upon behaviour of AtLeastOnceDelivery which I do not understand. 
When my actor is started and I call "deliver" in "receiveRecover" to make 
sure that my message will eventually be delivered at least once. Because of 
an error the actor throws an exception on a next event. This leads to a few 
restarts and then my actor stops. I expected that the AtLeastOnceDelivery 
postpones sending messages until the actor is completely started. However, 
on every restart the actor sends messages to the destination. After looking 
through the persistence code I noticed that Recovery catches the exception 
and keeps receiving all stored events (but not processing it in the actor). 
When all stored events are received the actor sends the exception to itself 
to throw it. But the AtLeastOnceDelivery has the following code:
 
 override protected[akka] def aroundReceive(receive: Receive, message: Any): 
Unit =
    message match {
      case ReplayMessagesSuccess ⇒
        redeliverOverdue()
        super.aroundReceive(receive, message)
      //....
    }

ReplayMessagesSuccess is received before the exception and leads to sending 
all pending deliveries. On the next message the actor is restarted and 
above process plays again.

Why does AtLeastOnceDelivery always send the pending deliveries when the 
journal is Replayed successfully, even when replaying the actual message 
lead to an exception leading to a restart or stop of the actor? Wouldn't it 
make more sense to send a "RedeliveryTick" so self to start the redelivery? 
This makes sure that delivery will only start when the actor is 
successfully started. 

I attached a test sample below to show the exact problem I'm facing.

Cheers,
Jeroen


import akka.actor._
import akka.actor.SupervisorStrategy.{ Stop, Escalate }
import akka.event.LoggingReceive
import akka.persistence.{ AtLeastOnceDelivery, PersistentActor }
import akka.testkit.{ TestKit, TestProbe, ImplicitSender }
import org.scalatest.WordSpecLike

class SuperVisor(testProbe: ActorRef) extends Actor {
  import scala.concurrent.duration._
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, 
withinTimeRange = 10 seconds) {
    case _: IllegalStateException ⇒ Stop
    case t                        ⇒ super.supervisorStrategy.decider.
applyOrElse(t, (_: Any) ⇒ Escalate)
  }

  val crashingActor = context.actorOf(Props(new CrashingActor(testProbe)), 
"CrashingActor")

  def receive: Receive = LoggingReceive {
    case msg ⇒ crashingActor forward msg
  }
}

class WrappingActor(testProbe: ActorRef) extends Actor with ActorLogging {
  def receive = LoggingReceive {
    case msg ⇒
      log.debug("RECEIVED --> " + msg)
      testProbe forward msg
  }
}

object CrashingActor {
  case object Message
  case object CrashMessage
  case class SendingMessage(deliveryId: Long, recovering: Boolean)
}

class CrashingActor(testProbe: ActorRef) extends PersistentActor
    with AtLeastOnceDelivery with ActorLogging {
  import CrashingActor._

  override def receive = LoggingReceive {
    case x ⇒
      log.warning("RECEIVED: " + x)
      super.receive(x)
  }

  override def receiveRecover: Receive = LoggingReceive {
    case Message ⇒ send()
    case CrashMessage ⇒
      log.error("Crash it")
      throw new IllegalStateException("Intentionally crashed")
    case x ⇒ log.warning("RECOVER MESSAGE: " + x)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(reason, "RESTARTING " + message)
  }

  override def receiveCommand: Receive = LoggingReceive {
    case Message      ⇒ persist(Message)(_ ⇒ send())
    case CrashMessage ⇒ persist(CrashMessage) { evt ⇒ }
  }

  def send() = {
    log.debug("SENDING " + recoveryRunning)
    deliver(testProbe.path, { id ⇒ SendingMessage(id, false) })
  }
}

class CrashIssueSpec extends TestKit(ActorSystem("tst")) with WordSpecLike 
with ImplicitSender {
  "At least once delivery" should {
    "not send on reboot" in {
      val testProbe = TestProbe()
      val testProbeWrapper = system.actorOf(Props(new WrappingActor(
testProbe.ref)), "testProbe")
      val superVisor = system.actorOf(Props(new SuperVisor(testProbeWrapper
)), "supervisor")
      superVisor ! CrashingActor.Message
      testProbe.expectMsgType[CrashingActor.SendingMessage]
      superVisor ! CrashingActor.CrashMessage
      val deathProbe = TestProbe()
      deathProbe.watch(superVisor)
      superVisor ! PoisonPill
      deathProbe.expectTerminated(superVisor)
      testProbe.expectNoMsg()
      system.actorOf(Props(new SuperVisor(testProbeWrapper)), "supervisor")
      testProbe.expectNoMsg()
    }
  }
}





-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to