I am working on a PersistentFSM (currently on Akka 2.4.0-RC2) that keeps
track of when it was running. To do so, it records the time at which it was
initialized and the time at which the last message was received.
The idea is that, after a restart, the actor has some information about
its uptime before the restart.

The problem is that the actor cannot modify its state in the
onRecoveryCompleted function. See the code below for a simple example:

import akka.actor.Props
import akka.persistence.fsm.PersistentFSM
import akka.persistence.fsm.PersistentFSM.FSMState
import scala.reflect.{ClassTag, classTag}

object MyPersistentFSM {
  def props(): Props = Props(new MyPersistentFSM)

  // Commands
  case object SomeCommand
  case object CompleteInitialization

  // FSM states
  sealed trait MyState extends FSMState
  case object Initializing extends MyState { val identifier = "Initializing" }
  case object Initialized extends MyState { val identifier = "Initialized" }

  // Persisted domain events
  sealed trait MyEvent
  case class ReceivedMsg(time: Long) extends MyEvent
  case class InitializationCompleted(time: Long) extends MyEvent

  // State data
  case class UpTime(startTime: Long, endTime: Long)
  case class MyData(previousUpTimes: Seq[UpTime],
                    timeStarted: Option[Long],
                    timeOfLastMsg: Option[Long])
}

import MyPersistentFSM._

class MyPersistentFSM extends PersistentFSM[MyState, MyData, MyEvent] {
  override def domainEventClassTag: ClassTag[MyEvent] = classTag[MyEvent]
  override def persistenceId: String = "myPersistenceId"

  when(Initializing) {
    case Event(CompleteInitialization, _) =>
      val time = System.currentTimeMillis()
      goto(Initialized) applying InitializationCompleted(time)
  }

  when(Initialized) {
    case Event(SomeCommand, _) =>
      stay() applying ReceivedMsg(System.currentTimeMillis())
  }

  startWith(Initializing, MyData(Seq.empty, None, None))
  initialize()

  override def onRecoveryCompleted(): Unit = {
    // We were down; we should persist this as an event, to be able to
    // record the previous uptime
  }

  // Folds a persisted event into the state data
  override def applyEvent(event: MyEvent, data: MyData): MyData = event match {
    case ReceivedMsg(time) => data.copy(timeOfLastMsg = Some(time))
    case InitializationCompleted(time) => data.copy(timeStarted = Some(time))
  }
}

Some solutions I thought of:

1. Send an ActorRestarted message to self, and apply/persist this event
   when handling this message.
2. Change the state in onRecoveryCompleted by using startWith(...)
   and initialize(), and create a snapshot to make sure it is eventually
   persisted.
3. Call persist from within onRecoveryCompleted (and change the state
   using startWith(...) and initialize()).

The first could work. However, if the actor fails with a large number of
messages in its mailbox, the ActorRestarted message will not be the first
message processed after the restart, which could cause problems.
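
To make the first option concrete, here is a minimal sketch of the data update I have in mind, written as plain Scala without Akka so it can be run on its own. The ActorRestarted event and the UptimeSketch object are hypothetical names that are not part of the code above, and closing the previous run at timeOfLastMsg (falling back to timeStarted) is an assumption about how the uptime should be measured.

```scala
object UptimeSketch {
  sealed trait MyEvent
  case class ReceivedMsg(time: Long) extends MyEvent
  case class InitializationCompleted(time: Long) extends MyEvent
  // Hypothetical event: would be persisted when the restart is noticed
  case class ActorRestarted(time: Long) extends MyEvent

  case class UpTime(startTime: Long, endTime: Long)
  case class MyData(previousUpTimes: Seq[UpTime],
                    timeStarted: Option[Long],
                    timeOfLastMsg: Option[Long])

  def applyEvent(event: MyEvent, data: MyData): MyData = event match {
    case ReceivedMsg(time) => data.copy(timeOfLastMsg = Some(time))
    case InitializationCompleted(time) => data.copy(timeStarted = Some(time))
    case ActorRestarted(time) =>
      // Close the previous run, if there was one. Assumption: the run
      // ended at the last received message, or at its start if none arrived.
      val finished = data.timeStarted.map { start =>
        UpTime(start, data.timeOfLastMsg.getOrElse(start))
      }
      MyData(data.previousUpTimes ++ finished.toSeq, Some(time), None)
  }

  def main(args: Array[String]): Unit = {
    // Replay a small event history the way recovery would
    val events = Seq(
      InitializationCompleted(1000L),
      ReceivedMsg(1500L),
      ActorRestarted(5000L))
    val result = events.foldLeft(MyData(Seq.empty, None, None)) {
      (data, event) => applyEvent(event, data)
    }
    println(result)
  }
}
```

This only covers the applyEvent side. It does not address the ordering concern above, since the event is still only persisted once the ActorRestarted message is actually handled.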

The second possible solution has the problem that no event is persisted.
The snapshot attempts to make up for this, but snapshots are created
asynchronously, so it is not a clean solution.

The third solution feels wrong to me: I do not think persist is meant to be
called directly within a PersistentFSM. Also, we cannot change the state/data
in the callback passed to persist, since goto() and stay() simply return a
state, which is only applied when returned from a StateFunction.

It would be nice if it were possible to persist an event in the
onRecoveryCompleted function, since it seems to me that there is no other
way to handle this in a PersistentFSM.