Persistent Actor receiving one message at a time. After persisting 100
messages, it should send those 100 messages to destination Actor. I
intend to delete those messages after saving a snapshot.
I am snapshotting after every 50 messages.I am deleting messages just after
receiving SavingSnapshotSuccess.But deletion doesn't happen. I still see
those messages in the logs.
Not receiving get either DeleteMessagesSuccess/Failure Message. Also not
receiving DeleteSnapshotsSuccess/Failure on deleting snapshots.
Following are the versions:
"com.typesafe.akka" %% "akka-actor" %
"2.4.0",
"com.typesafe.akka" %% "akka-cluster" %
"2.4.0",
"com.typesafe.akka" %% "akka-contrib" %
"2.3.14",
"com.typesafe.akka" %% "akka-slf4j" %
"2.4.0",
"com.typesafe.akka" %% "akka-testkit" %
"2.4.0" % "test",
"com.typesafe.akka" %% "akka-persistence" %
"2.4.0"
Code:
class Consumer extends PersistentActor with ActorLogging{
override def persistenceId = "bulkMessageConsumer-1"
var state = BulkState()
def updateState(event: Event): Unit = {
log.info("updating:"+event)
state = state.updated(event)
}
def numEvents =
state.size
val receiveRecover: Receive = {
case evt: Event => updateState(evt)
case SnapshotOffer(_, snapshot: BulkState) => state = snapshot
}
val receiveCommand: Receive = {
case Snapshoot ⇒
saveSnapshot(state)
case sss @ SaveSnapshotSuccess(metadata) ⇒
deleteMessages(toSequenceNr = metadata.sequenceNr - 1)
deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr =
metadata.sequenceNr - 1, maxTimestamp = metadata.timestamp - 1))
case SaveSnapshotFailure(metadata, reason) ⇒
log.error(reason, "Unable to save snapshot [metadata: {}]", metadata)
case c @ Command(data) =>
persistAsync(Event(c.data)) { event =>
basicChecks
updateState(event)
}
case DeleteSnapshotsSuccess =>
log.info("deleting snapshots")
case DeleteSnapshotsFailure =>
log.info("deleting snapshots failure")
case dms @ DeleteMessagesSuccess =>
log.info("messages deleted succesfully:")
case DeleteMessagesFailure =>
log.info("messages deletion failure")
case Print =>
println(state)
}
def send(): Unit ={
val framedMessage = MyMessage
DestinationActorRef ! framedMessage
remove()
}
def remove(): Unit ={
state = state.removeAll
}
def checkNumEventsReachBulkSize: Boolean ={
numEvents==100
}
def checkSnapshotBatch :Boolean ={
numEvents % 50 == 0 && numEvents >0
}
def basicChecks: Unit ={
if(checkSnapshotBatch){
self ! Snapshoot
}
if(checkNumEventsReachBulkSize){
send()
}
}
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.