Is this a correct way to create snapshots and delete messages? I have
created an offset to retain 10 old entries (assuming that these jobs may
have been worked upon):
import akka.actor.{Props, ActorSystem, ActorLogging}
import akka.event.LoggingReceive
import akka.persistence._
// Message protocol of PersistentSnapshotActor.

/** Request to persist `data` as an [[Event]]. */
final case class Command(data: String)

/** Journal entry written for each accepted [[Command]]; carries the same payload. */
final case class Event(data: String)

/** Asks the actor to print its current state. */
case object InternalState

/** Asks the actor to save a snapshot of its current state. */
case object TakeSnapshot

/** Asks the actor to stop itself. */
case object ShutDown

/** Makes the actor throw an exception from its command handler. */
case object Fail
/**
 * Immutable actor state: the payloads of all applied events, stored newest-first.
 *
 * @param queue event payloads, most recently applied at the head
 */
case class State(queue: List[String] = Nil) {

  /** Returns a new state with the payload of `event` placed at the head of the queue. */
  def updated(event: Event): State = State(event.data +: queue)

  /** Number of payloads currently held. */
  def size: Int = queue.size

  /** Renders the payloads in the order they were applied (oldest first). */
  override def toString: String = queue.reverse.mkString("List(", ", ", ")")
}
/**
 * Persistent actor that persists an [[Event]] for every [[Command]] it receives and
 * folds the event payloads into a [[State]].
 *
 * On `TakeSnapshot` the current state is saved as a snapshot. Once the snapshot store
 * confirms the save, journal entries up to `eventDeleteOffset` sequence numbers before
 * the snapshot are deleted. Deleting journal entries does not modify the in-memory
 * `state`; it still contains every payload applied so far.
 *
 * Diagnostics are written with `println` rather than the mixed-in `log`.
 */
class PersistentSnapshotActor extends PersistentActor with ActorLogging {

  override def persistenceId: String = "snapshot-persistence-id"

  // Number of events immediately preceding a snapshot that are kept in the journal
  // when older entries are deleted (this is to make sure we have some past events).
  val eventDeleteOffset = 10

  var state = State()

  /** Replaces `state` with the result of applying `event` to it. */
  def updateState(event: Event): Unit = state = state.updated(event)

  /** Number of event payloads currently held in `state`. */
  def numberOfEvents: Int = state.size

  /** Rebuilds `state` from an offered snapshot and from replayed events. */
  def receiveRecover: Receive = LoggingReceive {
    case event: Event => updateState(event)
    case SnapshotOffer(_, snapshot: State) =>
      println(s"offered state: $snapshot")
      state = snapshot
  }

  /** Handles commands plus the snapshot/journal confirmations sent back to this actor. */
  def receiveCommand: Receive = LoggingReceive {
    case Command(data) => persist(Event(data))(updateState)
    case Fail          => throw new Exception("killing persistent actor.")
    case ShutDown      => context.stop(self)
    case InternalState => println(state)
    case TakeSnapshot  => saveSnapshot(state)

    case SaveSnapshotSuccess(metadata) =>
      println(s"snapshot saved. seqNum:${metadata.sequenceNr}, timeStamp:${metadata.timestamp}")
      // Only delete once the snapshot is confirmed saved. Skip the call when fewer than
      // `eventDeleteOffset` events precede the snapshot, so that a zero or negative
      // sequence number is never passed to the journal.
      val deleteUpTo = metadata.sequenceNr - eventDeleteOffset
      if (deleteUpTo > 0) deleteMessages(deleteUpTo)

    case SaveSnapshotFailure(_, reason) =>
      println(s"failed to save snapshot: $reason")

    case DeleteMessagesSuccess(toSequenceNr) =>
      println(s"message deleted till sequenceNumber: $toSequenceNr")

    case DeleteMessagesFailure(reason, toSequenceNr) =>
      println(s"Error in deleting message till sequenceNr $toSequenceNr: $reason")
  }
}
/**
 * Demo driver: sends two batches of commands to a [[PersistentSnapshotActor]],
 * requests a snapshot after each batch, triggers two failures, then shuts the
 * actor system down after a fixed one-second wait.
 */
object PersistentSnapshotActorApp extends App {
  val system = ActorSystem("snapshotSystem")
  val persistentActor =
    system.actorOf(Props[PersistentSnapshotActor], "persistentSnapshotActor")

  // First batch: 100 commands, print the state, then snapshot.
  (1 to 100).foreach(n => persistentActor ! Command(n.toString))
  persistentActor ! InternalState
  persistentActor ! TakeSnapshot

  // Second batch: 50 more commands followed by another snapshot.
  (101 to 150).foreach(n => persistentActor ! Command(n.toString))
  persistentActor ! TakeSnapshot

  // Make the actor throw, print its state afterwards, then make it throw again.
  persistentActor ! Fail
  persistentActor ! InternalState
  persistentActor ! Fail

  // Give the asynchronous messages time to be processed before terminating.
  Thread.sleep(1000)
  system.terminate()
}
when I run it, I get
List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58,
59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77,
78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96,
97, 98, 99, 100)
[ERROR] [11/27/2015 16:38:40.684]
[snapshotSystem-akka.actor.default-dispatcher-11]
[akka://snapshotSystem/user/persistentSnapshotActor] killing persistent
actor.
java.lang.Exception: killing persistent actor.
at
PersistentSnapshotActor$$anonfun$receiveCommand$1.applyOrElse(PersistentSnapshotActor.scala:44)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at
PersistentSnapshotActor.akka$persistence$Eventsourced$$super$aroundReceive(PersistentSnapshotActor.scala:25)
at
akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:599)
at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:158)
at PersistentSnapshotActor.aroundReceive(PersistentSnapshotActor.scala:25)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525)
at akka.actor.ActorCell.invoke(ActorCell.scala:494)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
offered state: List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54,
55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73,
74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92,
93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108,
109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123,
124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138,
139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150)
List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58,
59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77,
78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96,
97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111,
112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126,
127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141,
142, 143, 144, 145, 146, 147, 148, 149, 150)
[ERROR] [11/27/2015 16:38:40.698]
[snapshotSystem-akka.actor.default-dispatcher-8]
[akka://snapshotSystem/user/persistentSnapshotActor] killing persistent
actor.
java.lang.Exception: killing persistent actor.
at
PersistentSnapshotActor$$anonfun$receiveCommand$1.applyOrElse(PersistentSnapshotActor.scala:44)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at
PersistentSnapshotActor.akka$persistence$Eventsourced$$super$aroundReceive(PersistentSnapshotActor.scala:25)
at
akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:599)
at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:158)
at PersistentSnapshotActor.aroundReceive(PersistentSnapshotActor.scala:25)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525)
at akka.actor.ActorCell.invoke(ActorCell.scala:494)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
offered state: List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54,
55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73,
74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92,
93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108,
109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123,
124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138,
139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150)
snapshot saved. seqNum:100, timeStamp:1448671120594
snapshot saved. seqNum:150, timeStamp:1448671120681
message deleted till sequenceNumber: 90
message deleted till sequenceNumber: 140
The state still tells me that I have 150 events, since I am not updating
the state on every deleteMessages call.
But I wanted to check whether this is the recommended way to do it.
Thanks
+ Harit Himanshu
On Tuesday, February 18, 2014 at 12:39:28 PM UTC-8, David Pennell wrote:
>
> I would assume that a common scenario is to periodically snapshot state
> and keep 1 or more snapshots. In this case, I assume that as you delete
> older snapshots, you would also delete messages older than the snapshot
> from the journal in order to keep the journal from growing indefinitely.
>
> Is this a typical usage or am I missing something? I don't see anything
> in the code that automatically does this (and I didn't really expect to
> given the general flexibility theme of the framework). However, I don't
> see anything in the documentation that alludes to any other use of
> deleteMessage other than in dealing with exceptions.
>
> -david
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.