Hi,
in other words :
def receive: Receive = {
case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
// can it happen that another Request message comes before this partial
function returns (while this one is being processed) ?
}
I have an asynchronous ActorProvider that is scanning ElasticSearch index,
but I'm calling "await" at the end, so it is basically blocking :
private var lastScrollId: String = _
def receive: Receive = {
case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
def pushRecursively(n: Long, scrollId: String): Future[Option[String]] = {
require(scrollId != null && scrollId.nonEmpty, "Scroll id must be
present!")
scroll(scrollId) flatMap {
case (sid, recs) if recs.isEmpty => // empty hits means end of
scanning/scrolling
Future.successful(Option.empty)
case (sid, recs) =>
onNext(recs)
if (n > 1)
pushRecursively(n-1, sid)
else
Future.successful(Option(sid))
}
}
val f = pushRecursively(Math.min(demand, totalDemand), lastScrollId)
f onComplete {
case Failure(ex) =>
log.error(ex, "Unexpected ScanSource error")
onError(ex)
context.stop(self)
case Success(sidOpt) => sidOpt match {
case None =>
log.info("ScanSource just completed...")
if (isCompleted)
log.warning("ScanSource already completed, I cannot figure out why
this occurs!")
else {
onComplete()
context.stop(self)
}
case Some(sid) =>
lastScrollId = sid
}
}
f.await(600.seconds)
case Cancel =>
context.stop(self)
}
But as you can see, there is "log.warning" sayig that onComplete() was
already called, which can happen only if ActorPublisher wasn't Requested
sequentially.
I think this implementation is correct and valid even though it is blocking
actor's dispatcher thread. But I really cannot figure out how it can be
"completed" twice...
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.