Is this a correct way to create snapshots and delete messages? I have 
added an offset so that the last 10 entries before each snapshot are 
retained (assuming that these jobs may have been worked upon)

import akka.actor.{Props, ActorSystem, ActorLogging}
import akka.event.LoggingReceive
import akka.persistence._

// Message protocol understood by PersistentSnapshotActor.

// Request to record `data`; the actor persists it as an Event carrying the same payload.
case class Command(data: String)

// Journaled fact; applied to the actor's State when persisted and again during recovery.
case class Event(data: String)

// Asks the actor to print its current in-memory state.
case object InternalState

// Asks the actor to save a snapshot of its current state.
case object TakeSnapshot

// Asks the actor to stop itself.
case object ShutDown

// Makes the actor throw an exception from its command handler.
case object Fail

/**
 * Immutable actor state: the payloads of all applied events.
 *
 * @param queue event payloads, newest first (new entries are prepended)
 */
case class State(queue: List[String] = Nil) {

  /** Returns a new state with the payload of `event` prepended to the queue. */
  def updated(event: Event): State = {
    val extended = event.data :: queue
    State(extended)
  }

  /** Number of payloads currently held. */
  def size: Int = queue.size

  /** Renders the payloads oldest-first, i.e. in the order they were applied. */
  override def toString: String = {
    val oldestFirst = queue.reverse
    oldestFirst.toString
  }
}

/**
 * Event-sourced actor that accumulates command payloads in [[State]].
 *
 * Every [[Command]] is persisted as an [[Event]] and then applied to `state`.
 * On [[TakeSnapshot]] the current state is saved as a snapshot; once the
 * snapshot has been stored, journal entries older than the snapshot (minus
 * `eventDeleteOffset`) are deleted so the journal does not grow without bound.
 *
 * Deleting journal entries does not touch the in-memory `state`: the handlers
 * for the delete replies below only print a message.
 */
class PersistentSnapshotActor extends PersistentActor with ActorLogging {
  override def persistenceId: String = "snapshot-persistence-id"

  // Number of events immediately preceding a snapshot that are kept in the
  // journal instead of being deleted, so that some past events remain available.
  val eventDeleteOffset = 10
  var state = State()

  /** Applies `event` to the current state, replacing `state` with the result. */
  def updateState(event: Event): Unit = state = state.updated(event)

  /** Number of event payloads currently held in `state`. */
  def numberOfEvents: Int = state.size

  /** Rebuilds `state` from an offered snapshot and/or replayed events. */
  def receiveRecover: Receive = LoggingReceive {
    case event: Event => updateState(event)
    case SnapshotOffer(_, snapshot: State) =>
      println(s"offered state: $snapshot")
      state = snapshot
  }

  /** Handles commands plus the replies to snapshot and delete requests. */
  def receiveCommand: Receive = LoggingReceive {
    case Command(data) => persist(Event(data))(updateState)
    case Fail => throw new Exception("killing persistent actor.")
    case ShutDown => context.stop(self)
    case InternalState => println(state)
    case TakeSnapshot => saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>
      println(s"snapshot saved. seqNum:${metadata.sequenceNr}, timeStamp:${metadata.timestamp}")
      // Only delete once the snapshot is confirmed stored. Skip the call when
      // fewer than `eventDeleteOffset` events precede the snapshot, otherwise
      // we would request deletion up to a zero or negative sequence number.
      val deleteUpTo = metadata.sequenceNr - eventDeleteOffset
      if (deleteUpTo > 0) deleteMessages(deleteUpTo)

    case SaveSnapshotFailure(_, reason) =>
      println(s"failed to save snapshot: $reason")
    case DeleteMessagesSuccess(toSequenceNr) =>
      println(s"message deleted till sequenceNumber: $toSequenceNr")
    case DeleteMessagesFailure(reason, toSequenceNr) =>
      println(s"Error in deleting message till sequenceNr $toSequenceNr: $reason")
  }
}

/**
 * Demo driver for [[PersistentSnapshotActor]].
 *
 * Sends 100 commands, prints the state and takes a snapshot, sends 50 more
 * commands and takes a second snapshot, then forces two failures (printing
 * the state in between) before shutting the actor system down.
 */
object PersistentSnapshotActorApp extends App {
  val system = ActorSystem("snapshotSystem")
  val persistentActor =
    system.actorOf(Props[PersistentSnapshotActor], "persistentSnapshotActor")

  /** Sends one Command per number in `ids`, using the number as payload. */
  private def sendCommands(ids: Range): Unit =
    ids.foreach(id => persistentActor ! Command(id.toString))

  // First batch, then inspect the state and snapshot it.
  sendCommands(1 to 100)
  persistentActor ! InternalState
  persistentActor ! TakeSnapshot

  // Second batch followed by a second snapshot.
  sendCommands(101 to 150)
  persistentActor ! TakeSnapshot

  // Make the actor fail twice, printing its state between the failures.
  persistentActor ! Fail
  persistentActor ! InternalState
  persistentActor ! Fail

  // Give the asynchronous snapshot/delete replies time to arrive before shutdown.
  Thread.sleep(1000)
  system.terminate()
}

When I run it, I get:

List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 
40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 
59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 
78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 
97, 98, 99, 100)
[ERROR] [11/27/2015 16:38:40.684] 
[snapshotSystem-akka.actor.default-dispatcher-11] 
[akka://snapshotSystem/user/persistentSnapshotActor] killing persistent 
actor.
java.lang.Exception: killing persistent actor.
at 
PersistentSnapshotActor$$anonfun$receiveCommand$1.applyOrElse(PersistentSnapshotActor.scala:44)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at 
PersistentSnapshotActor.akka$persistence$Eventsourced$$super$aroundReceive(PersistentSnapshotActor.scala:25)
at 
akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:599)
at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:158)
at PersistentSnapshotActor.aroundReceive(PersistentSnapshotActor.scala:25)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525)
at akka.actor.ActorCell.invoke(ActorCell.scala:494)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

offered state: List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 
36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 
55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 
74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 
93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 
109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 
124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 
139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150)
List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 
40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 
59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 
78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 
97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 
112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 
127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 
142, 143, 144, 145, 146, 147, 148, 149, 150)
[ERROR] [11/27/2015 16:38:40.698] 
[snapshotSystem-akka.actor.default-dispatcher-8] 
[akka://snapshotSystem/user/persistentSnapshotActor] killing persistent 
actor.
java.lang.Exception: killing persistent actor.
at 
PersistentSnapshotActor$$anonfun$receiveCommand$1.applyOrElse(PersistentSnapshotActor.scala:44)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at 
PersistentSnapshotActor.akka$persistence$Eventsourced$$super$aroundReceive(PersistentSnapshotActor.scala:25)
at 
akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:599)
at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:158)
at PersistentSnapshotActor.aroundReceive(PersistentSnapshotActor.scala:25)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525)
at akka.actor.ActorCell.invoke(ActorCell.scala:494)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

offered state: List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 
36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 
55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 
74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 
93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 
109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 
124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 
139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150)
snapshot saved. seqNum:100, timeStamp:1448671120594
snapshot saved. seqNum:150, timeStamp:1448671120681
message deleted till sequenceNumber: 90
message deleted till sequenceNumber: 140

The state still tells me that I have 150 events, because I am not updating 
the state when messages are deleted.

But I wanted to check whether this is the recommended way to do it.

Thanks
+ Harit Himanshu

On Tuesday, February 18, 2014 at 12:39:28 PM UTC-8, David Pennell wrote:
>
> I would assume that a common scenario is to periodically snapshot state 
> and keep 1 or more snapshots.  In this case, I assume that as you delete 
> older snapshots, you would also delete messages older than the snapshot 
> from the journal in order to keep the journal from growing indefinitely.
>
> Is this a typical usage or am I missing something?  I don't see anything 
> in the code that automatically does this (and I didn't really expect to 
> given the general flexibility theme of the framework).  However, I don't 
> see anything in the documentation that alludes to any other use of 
> deleteMessage other than in dealing with exceptions.
>
> -david
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to