Hi,

In other words:

def receive: Receive = {
  case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>

    // Can another Request message arrive before this partial function
    // returns (while this one is still being processed)?

}


I have an asynchronous ActorPublisher that scans an Elasticsearch index, 
but I call "await" at the end, so it is effectively blocking:

private var lastScrollId: String = _


def receive: Receive = {
  case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
    def pushRecursively(n: Long, scrollId: String): Future[Option[String]] = {
      require(scrollId != null && scrollId.nonEmpty, "Scroll id must be present!")
      scroll(scrollId) flatMap {
        case (sid, recs) if recs.isEmpty => // empty hits means end of scanning/scrolling
          Future.successful(Option.empty)
        case (sid, recs) =>
          onNext(recs)
          if (n > 1)
            pushRecursively(n-1, sid)
          else
            Future.successful(Option(sid))
      }
    }

    val f = pushRecursively(Math.min(demand, totalDemand), lastScrollId)
    f onComplete {
      case Failure(ex) =>
        log.error(ex, "Unexpected ScanSource error")
        onError(ex)
        context.stop(self)
      case Success(sidOpt) => sidOpt match {
        case None =>
          log.info("ScanSource just completed...")
          if (isCompleted)
            log.warning("ScanSource already completed, I cannot figure out why this occurs!")
          else {
            onComplete()
            context.stop(self)
          }
        case Some(sid) =>
          lastScrollId = sid
      }
    }
    f.await(600.seconds)

  case Cancel =>
    context.stop(self)
}


As you can see, there is a "log.warning" saying that onComplete() was 
already called. As far as I can tell, that can happen only if the 
ActorPublisher did not handle its Request messages sequentially.

I think this implementation is correct and valid, even though it blocks 
the actor's dispatcher thread. But I really cannot figure out how it can 
be "completed" twice...
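
One thing that might be worth ruling out (an assumption, not a confirmed diagnosis): a callback registered with onComplete runs on the ExecutionContext, not on the actor's thread, so a blocking await returning does not by itself guarantee that the callback has already run and updated lastScrollId. The sketch below uses only the Scala standard library and shows the alternative of handling the result on the awaiting thread. BlockingScan, push and handleRequest are hypothetical names; push merely stands in for pushRecursively.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical stand-in for the publisher's state; not the Akka API.
class BlockingScan {
  private var lastScrollId: String = "scroll-0"

  // Hypothetical stand-in for pushRecursively: yields the next scroll id,
  // or None once the (fake) scan reaches "scroll-2".
  private def push(scrollId: String): Future[Option[String]] =
    Future {
      if (scrollId == "scroll-2") None
      else Some("scroll-" + (scrollId.last.asDigit + 1))
    }

  // Handles one request; returns false once scanning has completed.
  def handleRequest(): Boolean = {
    val f = push(lastScrollId)
    // Await.ready blocks the calling thread until f is completed ...
    Await.ready(f, 600.seconds)
    // ... so f.value is defined here and is handled on this same thread,
    // instead of in an onComplete callback scheduled on the ExecutionContext.
    f.value.get match {
      case Success(Some(sid)) => lastScrollId = sid; true
      case Success(None)      => false
      case Failure(ex)        => throw ex
    }
  }
}

object BlockingScanDemo {
  def main(args: Array[String]): Unit = {
    val scan = new BlockingScan
    // Keep requesting until the scan reports completion.
    while (scan.handleRequest()) println("pushed one batch")
    println("scan completed")
  }
}
```

With this shape, the state update and the completion handling both finish before handleRequest returns, so the next request always sees the latest scroll id.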
