I have a persistent actor that receives one message at a time. After
persisting 100 messages, it should send those 100 messages to a destination
actor. I intend to delete those messages after saving a snapshot.

I am snapshotting after every 50 messages. I delete messages right after
receiving SaveSnapshotSuccess, but the deletion doesn't happen: I still see
those messages in the logs.
I am not receiving either the DeleteMessagesSuccess or the
DeleteMessagesFailure message. I am also not receiving
DeleteSnapshotsSuccess or DeleteSnapshotsFailure when deleting snapshots.


Following are the versions:
  "com.typesafe.akka" %% "akka-actor"       % "2.4.0",
  "com.typesafe.akka" %% "akka-cluster"     % "2.4.0",
  "com.typesafe.akka" %% "akka-contrib"     % "2.3.14",
  "com.typesafe.akka" %% "akka-slf4j"       % "2.4.0",
  "com.typesafe.akka" %% "akka-testkit"     % "2.4.0" % "test",
  "com.typesafe.akka" %% "akka-persistence" % "2.4.0"


Code:

class Consumer extends PersistentActor with ActorLogging{

  override def persistenceId = "bulkMessageConsumer-1"


  var state = BulkState()

  def updateState(event: Event): Unit = {
    log.info("updating:"+event)
    state = state.updated(event)
  }

  def numEvents =
    state.size

  val receiveRecover: Receive = {
    case evt: Event                                 => updateState(evt)
    case SnapshotOffer(_, snapshot: BulkState) => state = snapshot
  }

  val receiveCommand: Receive = {

    case Snapshoot ⇒
      saveSnapshot(state)

    case sss @ SaveSnapshotSuccess(metadata) ⇒
      deleteMessages(toSequenceNr = metadata.sequenceNr - 1)
      deleteSnapshots(SnapshotSelectionCriteria(
        maxSequenceNr = metadata.sequenceNr - 1, maxTimestamp = metadata.timestamp - 1))

    case SaveSnapshotFailure(metadata, reason) ⇒
      log.error(reason, "Unable to save snapshot [metadata: {}]", metadata)

    case c @ Command(data) =>
      persistAsync(Event(c.data)) { event =>
        basicChecks
        updateState(event)
      }

    case DeleteSnapshotsSuccess =>
       log.info("deleting snapshots")

    case DeleteSnapshotsFailure =>
      log.info("deleting snapshots failure")

    case dms @ DeleteMessagesSuccess =>
      log.info("messages deleted successfully")

    case DeleteMessagesFailure =>
      log.info("messages deletion failure")

    case Print =>
      println(state)
  }

  def send(): Unit ={
    val framedMessage = MyMessage
    DestinationActorRef ! framedMessage
    remove()
  }

  def remove(): Unit ={
    state = state.removeAll
  }

  def checkNumEventsReachBulkSize: Boolean ={
    numEvents==100
  }

  def checkSnapshotBatch :Boolean ={
    numEvents % 50 == 0 && numEvents >0
  }

  def basicChecks: Unit ={
    if(checkSnapshotBatch){
      self ! Snapshoot
    }

    if(checkNumEventsReachBulkSize){
      send()
    }
  }

}
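
One thing that may be worth checking, offered as a sketch rather than a
confirmed diagnosis: as far as I can tell, in Akka 2.4.0 the replies
DeleteMessagesSuccess, DeleteMessagesFailure, DeleteSnapshotsSuccess and
DeleteSnapshotsFailure are case classes that carry fields, so a pattern
written without parentheses compares against the companion object and
would not match an incoming instance. The snippet below uses a local
stand-in case class (an assumption, so that it runs without Akka on the
classpath) and a hypothetical PatternDemo object to show the difference
between the two pattern forms.

```scala
// Local stand-in with the same shape as Akka 2.4's
// akka.persistence.DeleteMessagesSuccess(toSequenceNr: Long).
// It is an assumption for illustration, not the Akka class itself.
final case class DeleteMessagesSuccess(toSequenceNr: Long)

// Hypothetical helper object, not part of the Consumer actor above.
object PatternDemo {
  // `case DeleteMessagesSuccess` is a stable-identifier pattern: it compares
  // the message with the companion object, not with an instance.
  def handledByObjectPattern(msg: Any): Boolean = msg match {
    case DeleteMessagesSuccess => true
    case _                     => false
  }

  // `case DeleteMessagesSuccess(_)` is an extractor pattern: it matches any
  // instance of the case class, whatever its toSequenceNr.
  def handledByExtractorPattern(msg: Any): Boolean = msg match {
    case DeleteMessagesSuccess(_) => true
    case _                        => false
  }
}
```

If this is what is happening here, patterns such as
`case DeleteMessagesSuccess(toSequenceNr) =>` and
`case DeleteSnapshotsSuccess(criteria) =>` in receiveCommand would be the
shape to try. The field layout is assumed from Akka 2.4.0 and should be
checked against the API docs for the version in use.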

