Hi Jakub,

Starting to read your email I definitely thought there must be something
mysterious at work!

>From what I can tell, there are a couple of compounding things here:

1) future.onComplete will be executed on another thread than the actor, or
"concurrently with the actor", this means that you can't close over the
actor and call methods on it from another thread, see:
http://doc.akka.io/docs/akka/2.3.11/additional/faq.html

2) when you call `Await` on the Future, you're only going to await it
having a value, not await its callbacks to finish execute.
So:

1. val f = someFuture(…)
2. f.onComplete { … }
3. Await.result(f, …)

When line 3 executes, onComplete could have already executed, is
(con)currently being executed or will be executed.

Does that make sense?


On Mon, May 25, 2015 at 11:03 AM, Jakub Liska <liska.ja...@gmail.com> wrote:

> Hi,
>
> in other words :
>
> def receive: Receive = {
>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>
>     // can it happen that another Request message comes before this partial 
> function returns (while this one is being processed) ?
>
> }
>
>
> I have an asynchronous ActorProvider that is scanning ElasticSearch index,
> but I'm calling "await" at the end, so it is basically blocking :
>
> private var lastScrollId: String = _
>
>
> def receive: Receive = {
>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>     def pushRecursively(n: Long, scrollId: String): Future[Option[String]] = {
>       require(scrollId != null && scrollId.nonEmpty, "Scroll id must be 
> present!")
>       scroll(scrollId) flatMap {
>         case (sid, recs) if recs.isEmpty => // empty hits means end of 
> scanning/scrolling
>           Future.successful(Option.empty)
>         case (sid, recs) =>
>           onNext(recs)
>           if (n > 1)
>             pushRecursively(n-1, sid)
>           else
>             Future.successful(Option(sid))
>       }
>     }
>
>     val f = pushRecursively(Math.min(demand, totalDemand), lastScrollId)
>     f onComplete {
>       case Failure(ex) =>
>         log.error(ex, "Unexpected ScanSource error")
>         onError(ex)
>         context.stop(self)
>       case Success(sidOpt) => sidOpt match {
>         case None =>
>           log.info("ScanSource just completed...")
>           if (isCompleted)
>             log.warning("ScanSource already completed, I cannot figure out 
> why this occurs!")
>           else {
>             onComplete()
>             context.stop(self)
>           }
>         case Some(sid) =>
>           lastScrollId = sid
>       }
>     }
>     f.await(600.seconds)
>
>   case Cancel =>
>     context.stop(self)
> }
>
>
> But as you can see, there is "log.warning" sayig that onComplete() was
> already called, which can happen only if ActorPublisher wasn't Requested
> sequentially.
>
> I think this implementation is correct and valid even though it is
> blocking actor's dispatcher thread. But I really cannot figure out how it
> can be "completed" twice...
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to